Python port of ShadowsocksR
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

222 lines
6.5 KiB

#!/usr/bin/env python
#
# Copyright 2015 clowwindy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function, \
with_statement
from ctypes import c_char_p, c_int, c_long, byref, \
create_string_buffer, c_void_p
from shadowsocks import common
from shadowsocks.crypto import util
__all__ = ['ciphers']
libcrypto = None
loaded = False
buf_size = 2048
ctx_cleanup = None
def load_openssl():
global loaded, libcrypto, buf, ctx_cleanup
libcrypto = util.find_library(('crypto', 'eay32'),
'EVP_get_cipherbyname',
'libcrypto')
if libcrypto is None:
raise Exception('libcrypto(OpenSSL) not found')
libcrypto.EVP_get_cipherbyname.restype = c_void_p
libcrypto.EVP_CIPHER_CTX_new.restype = c_void_p
libcrypto.EVP_CipherInit_ex.argtypes = (c_void_p, c_void_p, c_char_p,
c_char_p, c_char_p, c_int)
libcrypto.EVP_CipherUpdate.argtypes = (c_void_p, c_void_p, c_void_p,
c_char_p, c_int)
if hasattr(libcrypto, "EVP_CIPHER_CTX_cleanup"):
libcrypto.EVP_CIPHER_CTX_cleanup.argtypes = (c_void_p,)
ctx_cleanup = libcrypto.EVP_CIPHER_CTX_cleanup
else:
libcrypto.EVP_CIPHER_CTX_reset.argtypes = (c_void_p,)
ctx_cleanup = libcrypto.EVP_CIPHER_CTX_reset
libcrypto.EVP_CIPHER_CTX_free.argtypes = (c_void_p,)
libcrypto.RAND_bytes.restype = c_int
libcrypto.RAND_bytes.argtypes = (c_void_p, c_int)
if hasattr(libcrypto, 'OpenSSL_add_all_ciphers'):
libcrypto.OpenSSL_add_all_ciphers()
buf = create_string_buffer(buf_size)
loaded = True
def load_cipher(cipher_name):
func_name = 'EVP_' + cipher_name.replace('-', '_')
cipher = getattr(libcrypto, func_name, None)
if cipher:
cipher.restype = c_void_p
return cipher()
return None
def rand_bytes(length):
if not loaded:
load_openssl()
buf = create_string_buffer(length)
r = libcrypto.RAND_bytes(buf, length)
if r <= 0:
raise Exception('RAND_bytes return error')
return buf.raw
class OpenSSLCrypto(object):
def __init__(self, cipher_name, key, iv, op):
self._ctx = None
if not loaded:
load_openssl()
cipher = libcrypto.EVP_get_cipherbyname(common.to_bytes(cipher_name))
if not cipher:
cipher = load_cipher(cipher_name)
if not cipher:
raise Exception('cipher %s not found in libcrypto' % cipher_name)
key_ptr = c_char_p(key)
iv_ptr = c_char_p(iv)
self._ctx = libcrypto.EVP_CIPHER_CTX_new()
if not self._ctx:
raise Exception('can not create cipher context')
r = libcrypto.EVP_CipherInit_ex(self._ctx, cipher, None,
key_ptr, iv_ptr, c_int(op))
if not r:
self.clean()
raise Exception('can not initialize cipher context')
def update(self, data):
global buf_size, buf
cipher_out_len = c_long(0)
l = len(data)
if buf_size < l:
buf_size = l * 2
buf = create_string_buffer(buf_size)
libcrypto.EVP_CipherUpdate(self._ctx, byref(buf),
byref(cipher_out_len), c_char_p(data), l)
# buf is copied to a str object when we access buf.raw
return buf.raw[:cipher_out_len.value]
def __del__(self):
self.clean()
def clean(self):
if self._ctx:
ctx_cleanup(self._ctx)
libcrypto.EVP_CIPHER_CTX_free(self._ctx)
self._ctx = None
ciphers = {
# CBC mode need a special use way that different from other.
# CBC mode encrypt message with 16n length, and need 16n+1 length space to decrypt it , otherwise don't decrypt it
'aes-128-cbc': (16, 16, OpenSSLCrypto),
'aes-192-cbc': (24, 16, OpenSSLCrypto),
'aes-256-cbc': (32, 16, OpenSSLCrypto),
'aes-128-gcm': (16, 16, OpenSSLCrypto),
'aes-192-gcm': (24, 16, OpenSSLCrypto),
'aes-256-gcm': (32, 16, OpenSSLCrypto),
'aes-128-cfb': (16, 16, OpenSSLCrypto),
'aes-192-cfb': (24, 16, OpenSSLCrypto),
'aes-256-cfb': (32, 16, OpenSSLCrypto),
'aes-128-ofb': (16, 16, OpenSSLCrypto),
'aes-192-ofb': (24, 16, OpenSSLCrypto),
'aes-256-ofb': (32, 16, OpenSSLCrypto),
'aes-128-ctr': (16, 16, OpenSSLCrypto),
'aes-192-ctr': (24, 16, OpenSSLCrypto),
'aes-256-ctr': (32, 16, OpenSSLCrypto),
'aes-128-cfb8': (16, 16, OpenSSLCrypto),
'aes-192-cfb8': (24, 16, OpenSSLCrypto),
'aes-256-cfb8': (32, 16, OpenSSLCrypto),
'aes-128-cfb1': (16, 16, OpenSSLCrypto),
'aes-192-cfb1': (24, 16, OpenSSLCrypto),
'aes-256-cfb1': (32, 16, OpenSSLCrypto),
'bf-cfb': (16, 8, OpenSSLCrypto),
'camellia-128-cfb': (16, 16, OpenSSLCrypto),
'camellia-192-cfb': (24, 16, OpenSSLCrypto),
'camellia-256-cfb': (32, 16, OpenSSLCrypto),
'cast5-cfb': (16, 8, OpenSSLCrypto),
'des-cfb': (8, 8, OpenSSLCrypto),
'idea-cfb': (16, 8, OpenSSLCrypto),
'rc2-cfb': (16, 8, OpenSSLCrypto),
'rc4': (16, 0, OpenSSLCrypto),
'seed-cfb': (16, 16, OpenSSLCrypto),
}
def run_method(method):
cipher = OpenSSLCrypto(method, b'k' * 32, b'i' * 16, 1)
decipher = OpenSSLCrypto(method, b'k' * 32, b'i' * 16, 0)
util.run_cipher(cipher, decipher)
def test_aes_128_cfb():
run_method('aes-128-cfb')
def test_aes_256_cfb():
run_method('aes-256-cfb')
def test_aes_128_cfb8():
run_method('aes-128-cfb8')
def test_aes_256_ofb():
run_method('aes-256-ofb')
def test_aes_256_ctr():
run_method('aes-256-ctr')
def test_bf_cfb():
run_method('bf-cfb')
def test_rc4():
run_method('rc4')
def test_all():
for k, v in ciphers.items():
print(k)
try:
run_method(k)
except AssertionError as e:
eprint("AssertionError===========" + k)
eprint(e)
def eprint(*args, **kwargs):
import sys
print(*args, file=sys.stderr, **kwargs)
if __name__ == '__main__':
test_all()