You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							196 lines
						
					
					
						
							5.7 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							196 lines
						
					
					
						
							5.7 KiB
						
					
					
				| #!/usr/bin/env python | |
| # | |
| # Copyright 2015 clowwindy | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may | |
| # not use this file except in compliance with the License. You may obtain | |
| # a copy of the License at | |
| # | |
| #     http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
| # License for the specific language governing permissions and limitations | |
| # under the License. | |
| 
 | |
| from __future__ import absolute_import, division, print_function, \ | |
|     with_statement | |
| 
 | |
| from ctypes import c_char_p, c_int, c_long, byref,\ | |
|     create_string_buffer, c_void_p | |
| 
 | |
| from shadowsocks import common | |
| from shadowsocks.crypto import util | |
| 
 | |
| __all__ = ['ciphers'] | |
| 
 | |
| libcrypto = None | |
| loaded = False | |
| 
 | |
| buf_size = 2048 | |
| 
 | |
| 
 | |
| def load_openssl(): | |
|     global loaded, libcrypto, buf | |
| 
 | |
|     libcrypto = util.find_library(('crypto', 'eay32'), | |
|                                   'EVP_get_cipherbyname', | |
|                                   'libcrypto') | |
|     if libcrypto is None: | |
|         raise Exception('libcrypto(OpenSSL) not found') | |
| 
 | |
|     libcrypto.EVP_get_cipherbyname.restype = c_void_p | |
|     libcrypto.EVP_CIPHER_CTX_new.restype = c_void_p | |
| 
 | |
|     libcrypto.EVP_CipherInit_ex.argtypes = (c_void_p, c_void_p, c_char_p, | |
|                                             c_char_p, c_char_p, c_int) | |
| 
 | |
|     libcrypto.EVP_CipherUpdate.argtypes = (c_void_p, c_void_p, c_void_p, | |
|                                            c_char_p, c_int) | |
| 
 | |
|     libcrypto.EVP_CIPHER_CTX_cleanup.argtypes = (c_void_p,) | |
|     libcrypto.EVP_CIPHER_CTX_free.argtypes = (c_void_p,) | |
| 
 | |
|     libcrypto.RAND_bytes.restype = c_int | |
|     libcrypto.RAND_bytes.argtypes = (c_void_p, c_int) | |
| 
 | |
|     if hasattr(libcrypto, 'OpenSSL_add_all_ciphers'): | |
|         libcrypto.OpenSSL_add_all_ciphers() | |
| 
 | |
|     buf = create_string_buffer(buf_size) | |
|     loaded = True | |
| 
 | |
| 
 | |
| def load_cipher(cipher_name): | |
|     func_name = 'EVP_' + cipher_name.replace('-', '_') | |
|     if bytes != str: | |
|         func_name = str(func_name, 'utf-8') | |
|     cipher = getattr(libcrypto, func_name, None) | |
|     if cipher: | |
|         cipher.restype = c_void_p | |
|         return cipher() | |
|     return None | |
| 
 | |
| def rand_bytes(length): | |
|     if not loaded: | |
|         load_openssl() | |
|     buf = create_string_buffer(length) | |
|     r = libcrypto.RAND_bytes(buf, length) | |
|     if r <= 0: | |
|         raise Exception('RAND_bytes return error') | |
|     return buf.raw | |
| 
 | |
| class OpenSSLCrypto(object): | |
|     def __init__(self, cipher_name, key, iv, op): | |
|         self._ctx = None | |
|         if not loaded: | |
|             load_openssl() | |
|         cipher_name = common.to_bytes(cipher_name) | |
|         cipher = libcrypto.EVP_get_cipherbyname(cipher_name) | |
|         if not cipher: | |
|             cipher = load_cipher(cipher_name) | |
|         if not cipher: | |
|             raise Exception('cipher %s not found in libcrypto' % cipher_name) | |
|         key_ptr = c_char_p(key) | |
|         iv_ptr = c_char_p(iv) | |
|         self._ctx = libcrypto.EVP_CIPHER_CTX_new() | |
|         if not self._ctx: | |
|             raise Exception('can not create cipher context') | |
|         r = libcrypto.EVP_CipherInit_ex(self._ctx, cipher, None, | |
|                                         key_ptr, iv_ptr, c_int(op)) | |
|         if not r: | |
|             self.clean() | |
|             raise Exception('can not initialize cipher context') | |
| 
 | |
|     def update(self, data): | |
|         global buf_size, buf | |
|         cipher_out_len = c_long(0) | |
|         l = len(data) | |
|         if buf_size < l: | |
|             buf_size = l * 2 | |
|             buf = create_string_buffer(buf_size) | |
|         libcrypto.EVP_CipherUpdate(self._ctx, byref(buf), | |
|                                    byref(cipher_out_len), c_char_p(data), l) | |
|         # buf is copied to a str object when we access buf.raw | |
|         return buf.raw[:cipher_out_len.value] | |
| 
 | |
|     def __del__(self): | |
|         self.clean() | |
| 
 | |
|     def clean(self): | |
|         if self._ctx: | |
|             libcrypto.EVP_CIPHER_CTX_cleanup(self._ctx) | |
|             libcrypto.EVP_CIPHER_CTX_free(self._ctx) | |
| 
 | |
| 
 | |
| ciphers = { | |
|     'aes-128-cbc': (16, 16, OpenSSLCrypto), | |
|     'aes-192-cbc': (24, 16, OpenSSLCrypto), | |
|     'aes-256-cbc': (32, 16, OpenSSLCrypto), | |
|     'aes-128-cfb': (16, 16, OpenSSLCrypto), | |
|     'aes-192-cfb': (24, 16, OpenSSLCrypto), | |
|     'aes-256-cfb': (32, 16, OpenSSLCrypto), | |
|     'aes-128-ofb': (16, 16, OpenSSLCrypto), | |
|     'aes-192-ofb': (24, 16, OpenSSLCrypto), | |
|     'aes-256-ofb': (32, 16, OpenSSLCrypto), | |
|     'aes-128-ctr': (16, 16, OpenSSLCrypto), | |
|     'aes-192-ctr': (24, 16, OpenSSLCrypto), | |
|     'aes-256-ctr': (32, 16, OpenSSLCrypto), | |
|     'aes-128-cfb8': (16, 16, OpenSSLCrypto), | |
|     'aes-192-cfb8': (24, 16, OpenSSLCrypto), | |
|     'aes-256-cfb8': (32, 16, OpenSSLCrypto), | |
|     'aes-128-cfb1': (16, 16, OpenSSLCrypto), | |
|     'aes-192-cfb1': (24, 16, OpenSSLCrypto), | |
|     'aes-256-cfb1': (32, 16, OpenSSLCrypto), | |
|     'bf-cfb': (16, 8, OpenSSLCrypto), | |
|     'camellia-128-cfb': (16, 16, OpenSSLCrypto), | |
|     'camellia-192-cfb': (24, 16, OpenSSLCrypto), | |
|     'camellia-256-cfb': (32, 16, OpenSSLCrypto), | |
|     'cast5-cfb': (16, 8, OpenSSLCrypto), | |
|     'des-cfb': (8, 8, OpenSSLCrypto), | |
|     'idea-cfb': (16, 8, OpenSSLCrypto), | |
|     'rc2-cfb': (16, 8, OpenSSLCrypto), | |
|     'rc4': (16, 0, OpenSSLCrypto), | |
|     'seed-cfb': (16, 16, OpenSSLCrypto), | |
| } | |
| 
 | |
| 
 | |
| def run_method(method): | |
| 
 | |
|     cipher = OpenSSLCrypto(method, b'k' * 32, b'i' * 16, 1) | |
|     decipher = OpenSSLCrypto(method, b'k' * 32, b'i' * 16, 0) | |
| 
 | |
|     util.run_cipher(cipher, decipher) | |
| 
 | |
| 
 | |
| def test_aes_128_cfb(): | |
|     run_method('aes-128-cfb') | |
| 
 | |
| 
 | |
| def test_aes_256_cfb(): | |
|     run_method('aes-256-cfb') | |
| 
 | |
| 
 | |
| def test_aes_128_cfb8(): | |
|     run_method('aes-128-cfb8') | |
| 
 | |
| 
 | |
| def test_aes_256_ofb(): | |
|     run_method('aes-256-ofb') | |
| 
 | |
| 
 | |
| def test_aes_256_ctr(): | |
|     run_method('aes-256-ctr') | |
| 
 | |
| 
 | |
| def test_bf_cfb(): | |
|     run_method('bf-cfb') | |
| 
 | |
| 
 | |
| def test_rc4(): | |
|     run_method('rc4') | |
| 
 | |
| 
 | |
| if __name__ == '__main__': | |
|     test_aes_128_cfb()
 | |
| 
 |