diff --git a/shadowsocks/local.py b/shadowsocks/local.py index 049c8ea..66070aa 100755 --- a/shadowsocks/local.py +++ b/shadowsocks/local.py @@ -21,353 +21,40 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from __future__ import with_statement import sys -if sys.version_info < (2, 6): - import simplejson as json -else: - import json - -try: - import gevent - import gevent.monkey - gevent.monkey.patch_all(dns=gevent.version_info[0] >= 1) -except ImportError: - gevent = None - print >>sys.stderr, 'warning: gevent not found, using threading instead' - import socket -import eventloop -import errno -import select -import SocketServer -import struct import os -import random -import re import logging -import getopt import encrypt import utils +import tcprelay import udprelay -MSG_FASTOPEN = 0x20000000 - - -def send_all(sock, data): - bytes_sent = 0 - while True: - r = sock.send(data[bytes_sent:]) - if r < 0: - return r - bytes_sent += r - if bytes_sent == len(data): - return bytes_sent - - -class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - allow_reuse_address = True - - def get_request(self): - connection = self.socket.accept() - connection[0].settimeout(config_timeout) - return connection - - -class Socks5Server(SocketServer.StreamRequestHandler): - @staticmethod - def get_server(): - a_port = config_server_port - a_server = config_server - if isinstance(config_server_port, list): - # support config like "server_port": [8081, 8082] - a_port = random.choice(config_server_port) - if isinstance(config_server, list): - # support config like "server": ["123.123.123.1", "123.123.123.2"] - a_server = random.choice(config_server) - - r = re.match(r'^(.*):(\d+)$', a_server) - if r: - # support config like "server": "123.123.123.1:8381" - # or "server": ["123.123.123.1:8381", "123.123.123.2:8381"] - a_server = r.group(1) - a_port = int(r.group(2)) - return a_server, a_port - - @staticmethod - def handle_tcp(sock, remote, encryptor, pending_data=None, - server=None, port=None): - connected = False - try: - if config_fast_open: - fdset = [sock] - else: - fdset = [sock, remote] - while True: - should_break = False - r, w, e = select.select(fdset, [], [], config_timeout) - if not r: - logging.warn('read time out') - break - if sock in r: - if not connected and config_fast_open: - data = sock.recv(4096) - data = encryptor.encrypt(pending_data + data) - pending_data = None - logging.info('fast open %s:%d' % (server, port)) - try: - remote.sendto(data, MSG_FASTOPEN, (server, port)) - except (OSError, IOError) as e: - if eventloop.errno_from_exception(e) == errno.EINPROGRESS: - pass - else: - raise e - connected = True - fdset = [sock, remote] - else: - data = sock.recv(4096) - if pending_data: - data = pending_data + data - pending_data = None - data = encryptor.encrypt(data) - if len(data) <= 0: - should_break = True - else: - result = send_all(remote, data) - if result < len(data): - raise Exception('failed to send all data') - - if remote in r: - data = encryptor.decrypt(remote.recv(4096)) - if len(data) <= 0: - should_break = True - else: - result = send_all(sock, data) - if result < len(data): - raise Exception('failed to send all data') - if should_break: - # make sure all data are read before we close the sockets - # TODO: we haven't read ALL the data, actually - # http://cs.ecs.baylor.edu/~donahoo/practical/CSockets/TCPRST.pdf - break - finally: - sock.close() - remote.close() - - def handle(self): - try: - encryptor = encrypt.Encryptor(config_password, config_method) - sock = self.connection - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - data = sock.recv(262) - if not data: - sock.close() - return - if len(data) < 3: - return - method = ord(data[2]) - if method == 2: - logging.warn('client tries to use username/password auth, prete' - 'nding the password is OK') - sock.send('\x05\x02') - try: - ver_ulen = sock.recv(2) - ulen = ord(ver_ulen[1]) - if ulen: - username = sock.recv(ulen) - assert(ulen == len(username)) - plen = ord(sock.recv(1)) - if plen: - _password = sock.recv(plen) - assert(plen == len(_password)) - sock.send('\x01\x00') - except Exception as e: - logging.error(e) - return - elif method == 0: - sock.send("\x05\x00") - else: - logging.error('unsupported method %d' % method) - return - data = self.rfile.read(4) or '\x00' * 4 - mode = ord(data[1]) - if mode == 1: - pass - elif mode == 3: - # UDP - logging.debug('UDP assc request') - if sock.family == socket.AF_INET6: - header = '\x05\x00\x00\x04' - else: - header = '\x05\x00\x00\x01' - addr, port = sock.getsockname() - addr_to_send = socket.inet_pton(sock.family, addr) - port_to_send = struct.pack('>H', port) - sock.send(header + addr_to_send + port_to_send) - while True: - data = sock.recv(4096) - if not data: - break - return - else: - logging.warn('unknown mode %d' % mode) - return - addrtype = ord(data[3]) - addr_to_send = data[3] - if addrtype == 1: - addr_ip = self.rfile.read(4) - addr = socket.inet_ntoa(addr_ip) - addr_to_send += addr_ip - elif addrtype == 3: - addr_len = self.rfile.read(1) - addr = self.rfile.read(ord(addr_len)) - addr_to_send += addr_len + addr - elif addrtype == 4: - addr_ip = self.rfile.read(16) - addr = socket.inet_ntop(socket.AF_INET6, addr_ip) - addr_to_send += addr_ip - else: - logging.warn('addr_type not supported') - # not supported - return - addr_port = self.rfile.read(2) - addr_to_send += addr_port - port = struct.unpack('>H', addr_port) - try: - reply = "\x05\x00\x00\x01" - reply += socket.inet_aton('0.0.0.0') + struct.pack(">H", 2222) - self.wfile.write(reply) - # reply immediately - a_server, a_port = Socks5Server.get_server() - addrs = socket.getaddrinfo(a_server, a_port) - if addrs: - af, socktype, proto, canonname, sa = addrs[0] - if config_fast_open: - remote = socket.socket(af, socktype, proto) - remote.setsockopt(socket.IPPROTO_TCP, - socket.TCP_NODELAY, 1) - Socks5Server.handle_tcp(sock, remote, encryptor, - addr_to_send, a_server, a_port) - else: - logging.info('connecting %s:%d' % (addr, port[0])) - remote = socket.create_connection((a_server, a_port), - timeout=config_timeout) - remote.settimeout(config_timeout) - remote.setsockopt(socket.IPPROTO_TCP, - socket.TCP_NODELAY, 1) - Socks5Server.handle_tcp(sock, remote, encryptor, - addr_to_send) - except (OSError, IOError) as e: - logging.warn(e) - return - except (OSError, IOError) as e: - raise e - logging.warn(e) - - def main(): - global config_server, config_server_port, config_password, config_method,\ - config_fast_open, config_timeout - - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', filemode='a+') + utils.check_python() # fix py2exe if hasattr(sys, "frozen") and sys.frozen in \ ("windows_exe", "console_exe"): p = os.path.dirname(os.path.abspath(sys.executable)) os.chdir(p) - version = '' - try: - import pkg_resources - version = pkg_resources.get_distribution('shadowsocks').version - except: - pass - print 'shadowsocks %s' % version - - config_password = None - config_method = None - config_path = utils.find_config() - try: - optlist, args = getopt.getopt(sys.argv[1:], 's:b:p:k:l:m:c:t:', - ['fast-open']) - for key, value in optlist: - if key == '-c': - config_path = value - - if config_path: - logging.info('loading config from %s' % config_path) - with open(config_path, 'rb') as f: - try: - config = json.load(f) - except ValueError as e: - logging.error('found an error in config.json: %s', - e.message) - sys.exit(1) - else: - config = {} - - optlist, args = getopt.getopt(sys.argv[1:], 's:b:p:k:l:m:c:t:', - ['fast-open']) - for key, value in optlist: - if key == '-p': - config['server_port'] = int(value) - elif key == '-k': - config['password'] = value - elif key == '-l': - config['local_port'] = int(value) - elif key == '-s': - config['server'] = value - elif key == '-m': - config['method'] = value - elif key == '-b': - config['local_address'] = value - elif key == '--fast-open': - config['fast_open'] = True - except getopt.GetoptError as e: - logging.error(e) - utils.print_local_help() - sys.exit(2) + utils.print_shadowsocks() - config_server = config['server'] - config_server_port = config['server_port'] - config_local_port = config['local_port'] - config_password = config['password'] - config_method = config.get('method', None) - config_local_address = config.get('local_address', '127.0.0.1') - config_timeout = int(config.get('timeout', 300)) - config_fast_open = config.get('fast_open', False) + config = utils.get_config(True) - if not config_password and not config_path: - sys.exit('config not specified, please read ' - 'https://github.com/clowwindy/shadowsocks') + encrypt.init_table(config['password'], config['method']) - utils.check_config(config) - - encrypt.init_table(config_password, config_method) - - addrs = socket.getaddrinfo(config_local_address, config_local_port) - if not addrs: - logging.error('cant resolve listen address') - sys.exit(1) - ThreadingTCPServer.address_family = addrs[0][0] try: - udprelay.UDPRelay(config_local_address, int(config_local_port), - config_server, config_server_port, config_password, - config_method, int(config_timeout), True).start() - server = ThreadingTCPServer((config_local_address, config_local_port), - Socks5Server) - server.timeout = int(config_timeout) logging.info("starting local at %s:%d" % - tuple(server.server_address[:2])) - server.serve_forever() - except socket.error, e: - logging.error(e) + (config['local_address'], config['local_port'])) + + udprelay.UDPRelay(config, True).start() + tcprelay.TCPRelay(config, True).start() + while sys.stdin.read(): + pass except KeyboardInterrupt: - server.shutdown() sys.exit(0) if __name__ == '__main__': diff --git a/shadowsocks/server.py b/shadowsocks/server.py index d7fd215..bb64bcc 100755 --- a/shadowsocks/server.py +++ b/shadowsocks/server.py @@ -21,21 +21,9 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from __future__ import with_statement import sys -if sys.version_info < (2, 6): - import simplejson as json -else: - import json - - import socket -import select -import threading -import SocketServer -import struct import logging -import getopt import encrypt import os import utils @@ -43,188 +31,12 @@ import tcprelay import udprelay -def send_all(sock, data): - bytes_sent = 0 - while True: - r = sock.send(data[bytes_sent:]) - if r < 0: - return r - bytes_sent += r - if bytes_sent == len(data): - return bytes_sent - - -class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - allow_reuse_address = True - - def server_activate(self): - if config_fast_open: - try: - self.socket.setsockopt(socket.SOL_TCP, 23, 5) - except socket.error: - logging.error('warning: fast open is not available') - self.socket.listen(self.request_queue_size) - - def get_request(self): - connection = self.socket.accept() - connection[0].settimeout(config_timeout) - return connection - - -class Socks5Server(SocketServer.StreamRequestHandler): - def handle_tcp(self, sock, remote): - try: - fdset = [sock, remote] - while True: - should_break = False - r, w, e = select.select(fdset, [], [], config_timeout) - if not r: - logging.warn('read time out') - break - if sock in r: - data = self.decrypt(sock.recv(4096)) - if len(data) <= 0: - should_break = True - else: - result = send_all(remote, data) - if result < len(data): - raise Exception('failed to send all data') - if remote in r: - data = self.encrypt(remote.recv(4096)) - if len(data) <= 0: - should_break = True - else: - result = send_all(sock, data) - if result < len(data): - raise Exception('failed to send all data') - if should_break: - # make sure all data are read before we close the sockets - # TODO: we haven't read ALL the data, actually - # http://cs.ecs.baylor.edu/~donahoo/practical/CSockets/TCPRST.pdf - break - - finally: - sock.close() - remote.close() - - def encrypt(self, data): - return self.encryptor.encrypt(data) - - def decrypt(self, data): - return self.encryptor.decrypt(data) - - def handle(self): - try: - self.encryptor = encrypt.Encryptor(self.server.key, - self.server.method) - sock = self.connection - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - iv_len = self.encryptor.iv_len() - data = sock.recv(iv_len) - if iv_len > 0 and not data: - sock.close() - return - if iv_len: - self.decrypt(data) - data = sock.recv(1) - if not data: - sock.close() - return - addrtype = ord(self.decrypt(data)) - if addrtype == 1: - addr = socket.inet_ntoa(self.decrypt(self.rfile.read(4))) - elif addrtype == 3: - addr = self.decrypt( - self.rfile.read(ord(self.decrypt(sock.recv(1))))) - elif addrtype == 4: - addr = socket.inet_ntop(socket.AF_INET6, - self.decrypt(self.rfile.read(16))) - else: - # not supported - logging.warn('addr_type not supported, maybe wrong password') - return - port = struct.unpack('>H', self.decrypt(self.rfile.read(2))) - try: - logging.info('connecting %s:%d' % (addr, port[0])) - remote = socket.create_connection((addr, port[0]), - timeout=config_timeout) - remote.settimeout(config_timeout) - remote.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - except socket.error, e: - # Connection refused - logging.warn(e) - return - self.handle_tcp(sock, remote) - except socket.error, e: - logging.warn(e) - - def main(): - global config_server, config_server_port, config_method, config_fast_open, \ - config_timeout - - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', filemode='a+') - - - version = '' - try: - import pkg_resources - version = pkg_resources.get_distribution('shadowsocks').version - except: - pass - print 'shadowsocks %s' % version - - config_path = utils.find_config() - try: - optlist, args = getopt.getopt(sys.argv[1:], 's:p:k:m:c:t:', - ['fast-open', 'workers:']) - for key, value in optlist: - if key == '-c': - config_path = value - - if config_path: - logging.info('loading config from %s' % config_path) - with open(config_path, 'rb') as f: - try: - config = json.load(f) - except ValueError as e: - logging.error('found an error in config.json: %s', - e.message) - sys.exit(1) - else: - config = {} - - optlist, args = getopt.getopt(sys.argv[1:], 's:p:k:m:c:t:', - ['fast-open', 'workers=']) - for key, value in optlist: - if key == '-p': - config['server_port'] = int(value) - elif key == '-k': - config['password'] = value - elif key == '-s': - config['server'] = value - elif key == '-m': - config['method'] = value - elif key == '-t': - config['timeout'] = value - elif key == '--fast-open': - config['fast_open'] = True - elif key == '--workers': - config['workers'] = value - except getopt.GetoptError: - utils.print_server_help() - sys.exit(2) + utils.check_python() - config['password'] = config.get('password', None) - config['method'] = config.get('method', None) - config['port_password'] = config.get('port_password', None) - config['timeout'] = int(config.get('timeout', 300)) - config['fast_open'] = config.get('fast_open', False) - config['workers'] = config.get('workers', 1) + utils.print_shadowsocks() - utils.check_config(config) + config = utils.get_config(True) if config['port_password']: if config['server_port'] or config['password']: @@ -236,11 +48,6 @@ def main(): config['port_password'][str(config['server_port'])] = config['password'] encrypt.init_table(config['password'], config['method']) - addrs = socket.getaddrinfo(config['server'], int(8387)) - if not addrs: - logging.error('cant resolve listen address') - sys.exit(1) - ThreadingTCPServer.address_family = addrs[0][0] tcp_servers = [] udp_servers = [] for port, password in config['port_password'].items(): diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index 88abf30..335849b 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -76,7 +76,7 @@ class TCPRelayHandler(object): self._downstream_status = STATUS_WAIT_READING fd_to_handlers[local_sock.fileno()] = self local_sock.setblocking(False) - loop.add(local_sock, eventloop.POLL_IN) + loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR) def update_stream(self, stream, status): dirty = False @@ -186,7 +186,8 @@ class TCPRelayHandler(object): self._remote_sock.setblocking(False) # TODO support TCP fast open self._remote_sock.connect(sa) - self._loop.add(self._remote_sock, eventloop.POLL_OUT) + self._loop.add(self._remote_sock, + eventloop.POLL_ERR | eventloop.POLL_OUT) if len(data) > header_length: self._data_to_write_to_remote.append(data[header_length:]) @@ -301,7 +302,8 @@ class TCPRelay(object): def _run(self): server_socket = self._server_socket self._eventloop = eventloop.EventLoop() - self._eventloop.add(server_socket, eventloop.POLL_IN) + self._eventloop.add(server_socket, + eventloop.POLL_IN | eventloop.POLL_ERR) last_time = time.time() while not self._closed: try: @@ -315,6 +317,9 @@ class TCPRelay(object): continue for sock, event in events: if sock == self._server_socket: + if event & eventloop.POLL_ERR: + # TODO + raise Exception('server_socket error') try: conn = self._server_socket.accept() TCPRelayHandler(self._fd_to_handlers, self._eventloop, diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index 8950de4..68414e8 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -86,7 +86,7 @@ def client_key(a, b, c, d): class UDPRelay(object): - def __init__(self, config, is_local=True): + def __init__(self, config, is_local): if is_local: self._listen_addr = config['local_address'] self._listen_port = config['local_port'] diff --git a/shadowsocks/utils.py b/shadowsocks/utils.py index 05e45b4..db19bec 100644 --- a/shadowsocks/utils.py +++ b/shadowsocks/utils.py @@ -22,9 +22,29 @@ # SOFTWARE. import os +import json +import sys +import getopt import logging +def check_python(): + info = sys.version_info + if not (info.major == 2 and info.minor >= 6): + print 'Python 2.6 or 2.7 required' + sys.exit(1) + + +def print_shadowsocks(): + version = '' + try: + import pkg_resources + version = pkg_resources.get_distribution('shadowsocks').version + except Exception: + pass + print 'shadowsocks %s' % version + + def find_config(): config_path = 'config.json' if os.path.exists(config_path): @@ -36,6 +56,12 @@ def find_config(): def check_config(config): + config['password'] = config.get('password', None) + config['method'] = config.get('method', None) + config['port_password'] = config.get('port_password', None) + config['timeout'] = int(config.get('timeout', 300)) + config['fast_open'] = config.get('fast_open', False) + config['workers'] = config.get('workers', 1) if config.get('local_address', '') in ['0.0.0.0']: logging.warn('warning: local set to listen 0.0.0.0, which is not safe') if config.get('server', '') in ['127.0.0.1', 'localhost']: @@ -52,6 +78,75 @@ def check_config(config): int(config.get('timeout'))) +def get_config(is_local): + if is_local: + shortopts = 's:b:p:k:l:m:c:t:v' + longopts = ['fast-open'] + else: + shortopts = 's:p:k:m:c:t:' + longopts = ['fast-open', 'workers:'] + try: + config_path = find_config() + optlist, args = getopt.getopt(sys.argv[1:], shortopts, longopts) + for key, value in optlist: + if key == '-c': + config_path = value + + if config_path: + logging.info('loading config from %s' % config_path) + with open(config_path, 'rb') as f: + try: + config = json.load(f) + except ValueError as e: + logging.error('found an error in config.json: %s', + e.message) + sys.exit(1) + else: + config = {} + + optlist, args = getopt.getopt(sys.argv[1:], shortopts, longopts) + for key, value in optlist: + if key == '-p': + config['server_port'] = int(value) + elif key == '-k': + config['password'] = value + elif key == '-l': + config['local_port'] = int(value) + elif key == '-s': + config['server'] = value + elif key == '-m': + config['method'] = value + elif key == '-b': + config['local_address'] = value + elif key == '-v': + config['verbose'] = True + elif key == '--fast-open': + config['fast_open'] = True + elif key == '--workers': + config['workers'] = value + except getopt.GetoptError as e: + logging.error(e) + if is_local: + print_local_help() + else: + print_server_help() + sys.exit(2) + + if not config['password'] and not config_path: + sys.exit('config not specified, please read ' + 'https://github.com/clowwindy/shadowsocks') + + check_config(config) + + if config['verbose']: + level = logging.DEBUG + else: + level = logging.WARNING + logging.basicConfig(level=level, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', filemode='a+') + + def print_local_help(): print '''usage: sslocal [-h] -s SERVER_ADDR -p SERVER_PORT [-b LOCAL_ADDR] -l LOCAL_PORT -k PASSWORD -m METHOD [-t TIMEOUT] [-c CONFIG]