diff --git a/CHANGES b/CHANGES index ef1acb2..a58348a 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,6 @@ +2.6.13 2015-11-02 +- add protocol setting + 2.6.12 2015-10-27 - IPv6 first - Fix mem leaks diff --git a/shadowsocks/asyncdns.py b/shadowsocks/asyncdns.py index e64895c..d958752 100644 --- a/shadowsocks/asyncdns.py +++ b/shadowsocks/asyncdns.py @@ -83,9 +83,11 @@ def detect_ipv6_supprot(): s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) try: s.connect(('ipv6.google.com', 0)) + print('IPv6 support') return True except: pass + print('IPv6 not support') return False IPV6_CONNECTION_SUPPORT = detect_ipv6_supprot() @@ -356,19 +358,34 @@ class DNSResolver(object): answer[2] == QCLASS_IN: ip = answer[0] break - if not ip and self._hostname_status.get(hostname, STATUS_IPV4) \ - == STATUS_IPV6: - self._hostname_status[hostname] = STATUS_IPV4 - self._send_req(hostname, QTYPE_A) + if IPV6_CONNECTION_SUPPORT: + if not ip and self._hostname_status.get(hostname, STATUS_IPV4) \ + == STATUS_IPV6: + self._hostname_status[hostname] = STATUS_IPV4 + self._send_req(hostname, QTYPE_A) + else: + if ip: + self._cache[hostname] = ip + self._call_callback(hostname, ip) + elif self._hostname_status.get(hostname, None) == STATUS_IPV4: + for question in response.questions: + if question[1] == QTYPE_A: + self._call_callback(hostname, None) + break else: - if ip: - self._cache[hostname] = ip - self._call_callback(hostname, ip) - elif self._hostname_status.get(hostname, None) == STATUS_IPV4: - for question in response.questions: - if question[1] == QTYPE_A: - self._call_callback(hostname, None) - break + if not ip and self._hostname_status.get(hostname, STATUS_IPV6) \ + == STATUS_IPV4: + self._hostname_status[hostname] = STATUS_IPV6 + self._send_req(hostname, QTYPE_AAAA) + else: + if ip: + self._cache[hostname] = ip + self._call_callback(hostname, ip) + elif self._hostname_status.get(hostname, None) == STATUS_IPV6: + for question in response.questions: + if question[1] == QTYPE_AAAA: + self._call_callback(hostname, None) + break def handle_event(self, sock, fd, event): if sock != self._sock: diff --git a/shadowsocks/local.py b/shadowsocks/local.py index c6df20c..096283c 100755 --- a/shadowsocks/local.py +++ b/shadowsocks/local.py @@ -43,6 +43,9 @@ def main(): config = shell.get_config(True) + if not config.get('dns_ipv6', False): + asyncdns.IPV6_CONNECTION_SUPPORT = False + daemon.daemon_exec(config) try: diff --git a/shadowsocks/obfs.py b/shadowsocks/obfs.py index 92de548..1752a56 100644 --- a/shadowsocks/obfs.py +++ b/shadowsocks/obfs.py @@ -83,3 +83,7 @@ class obfs(object): def server_post_decrypt(self, buf): return self.obfs.server_post_decrypt(buf) + def dispose(self): + self.obfs.dispose() + del self.obfs + diff --git a/shadowsocks/obfsplugin/http_simple.py b/shadowsocks/obfsplugin/http_simple.py index efd7f2a..8144aa8 100644 --- a/shadowsocks/obfsplugin/http_simple.py +++ b/shadowsocks/obfsplugin/http_simple.py @@ -192,6 +192,9 @@ class http2_simple(plain.plain): return buf self.send_buffer += buf if not self.has_sent_header: + port = b'' + if self.server_info.port != 80: + port = b':' + common.to_bytes(str(self.server_info.port)) self.has_sent_header = True http_head = b"GET / HTTP/1.1\r\n" http_head += b"Host: " + (self.server_info.param or self.server_info.host) + port + b"\r\n" diff --git a/shadowsocks/obfsplugin/plain.py b/shadowsocks/obfsplugin/plain.py index cc602fa..5450e7a 100644 --- a/shadowsocks/obfsplugin/plain.py +++ b/shadowsocks/obfsplugin/plain.py @@ -27,6 +27,7 @@ def create_obfs(method): obfs_map = { 'plain': (create_obfs,), + 'origin': (create_obfs,), } class plain(object): @@ -66,3 +67,6 @@ class plain(object): def server_post_decrypt(self, buf): return buf + def dispose(self): + pass + diff --git a/shadowsocks/obfsplugin/verify_simple.py b/shadowsocks/obfsplugin/verify_simple.py index 6b90a5a..8945d96 100644 --- a/shadowsocks/obfsplugin/verify_simple.py +++ b/shadowsocks/obfsplugin/verify_simple.py @@ -57,59 +57,42 @@ def match_begin(str1, str2): class obfs_verify_data(object): def __init__(self): - self.sub_obfs = None + pass class verify_base(plain.plain): def __init__(self, method): super(verify_base, self).__init__(method) self.method = method - self.sub_obfs = None def init_data(self): return obfs_verify_data() def set_server_info(self, server_info): - try: - if server_info.param: - sub_param = '' - param_list = server_info.param.split(',', 1) - if len(param_list) > 1: - self.sub_obfs = shadowsocks.obfs.obfs(param_list[0]) - sub_param = param_list[1] - else: - self.sub_obfs = shadowsocks.obfs.obfs(server_info.param) - if server_info.data.sub_obfs is None: - server_info.data.sub_obfs = self.sub_obfs.init_data() - _server_info = shadowsocks.obfs.server_info(server_info.data.sub_obfs) - _server_info.host = server_info.host - _server_info.port = server_info.port - _server_info.tcp_mss = server_info.tcp_mss - _server_info.param = sub_param - self.sub_obfs.set_server_info(_server_info) - except Exception as e: - shadowsocks.shell.print_exception(e) self.server_info = server_info def client_encode(self, buf): - if self.sub_obfs is not None: - return self.sub_obfs.client_encode(buf) return buf def client_decode(self, buf): - if self.sub_obfs is not None: - return self.sub_obfs.client_decode(buf) return (buf, False) def server_encode(self, buf): - if self.sub_obfs is not None: - return self.sub_obfs.server_encode(buf) return buf def server_decode(self, buf): - if self.sub_obfs is not None: - return self.sub_obfs.server_decode(buf) return (buf, True, False) + def get_head_size(self, buf, def_value): + if len(buf) < 2: + return def_value + if ord(buf[0]) == 1: + return 7 + if ord(buf[0]) == 4: + return 19 + if ord(buf[0]) == 3: + return 4 + ord(buf[1]) + return def_value + class verify_simple(verify_base): def __init__(self, method): super(verify_simple, self).__init__(method) @@ -336,28 +319,28 @@ class client_queue(object): class obfs_auth_data(object): def __init__(self): - self.sub_obfs = None self.client_id = {} self.startup_time = int(time.time() - 30) & 0xFFFFFFFF self.local_client_id = b'' self.connection_id = 0 + self.max_client = 16 # max active client count + self.max_buffer = max(self.max_client, 256) # max client id buffer size def update(self, client_id, connection_id): if client_id in self.client_id: self.client_id[client_id].update() def insert(self, client_id, connection_id): - max_client = 16 if client_id not in self.client_id or not self.client_id[client_id].enable: active = 0 for c_id in self.client_id: if self.client_id[c_id].is_active(): active += 1 - if active >= max_client: + if active >= self.max_client: logging.warn('auth_simple: max active clients exceeded') return False - if len(self.client_id) < max_client: + if len(self.client_id) < self.max_client: if client_id not in self.client_id: self.client_id[client_id] = client_queue(connection_id) else: @@ -367,7 +350,7 @@ class obfs_auth_data(object): random.shuffle(keys) for c_id in keys: if not self.client_id[c_id].is_active() and self.client_id[c_id].enable: - if len(self.client_id) >= 256: + if len(self.client_id) >= self.max_buffer: del self.client_id[c_id] else: self.client_id[c_id].enable = False @@ -392,6 +375,7 @@ class auth_simple(verify_base): self.has_recv_header = False self.client_id = 0 self.connection_id = 0 + self.max_time_dif = 60 * 5 # time dif (second) setting def init_data(self): return obfs_auth_data() @@ -422,7 +406,8 @@ class auth_simple(verify_base): def client_pre_encrypt(self, buf): ret = b'' if not self.has_sent_header: - datalen = min(len(buf), common.ord(os.urandom(1)[0]) % 32 + 4) + head_size = self.get_head_size(buf, 30) + datalen = min(len(buf), random.randint(0, 31) + head_size) ret += self.pack_data(self.auth_data() + buf[:datalen]) buf = buf[datalen:] self.has_sent_header = True @@ -512,7 +497,8 @@ class auth_simple(verify_base): client_id = struct.unpack(' 60 * 3 or common.int32(utc_time - self.server_info.data.startup_time) < 0: + if time_dif < -self.max_time_dif or time_dif > self.max_time_dif \ + or common.int32(utc_time - self.server_info.data.startup_time) < 0: self.raw_trans = True self.recv_buf = b'' logging.info('auth_simple: wrong timestamp, time_dif %d, data %s' % (time_dif, binascii.hexlify(out_buf),)) diff --git a/shadowsocks/server.py b/shadowsocks/server.py index 68d187f..4c19474 100755 --- a/shadowsocks/server.py +++ b/shadowsocks/server.py @@ -54,6 +54,9 @@ def main(): else: config['port_password'][str(server_port)] = config['password'] + if not config.get('dns_ipv6', False): + asyncdns.IPV6_CONNECTION_SUPPORT = False + if config.get('manager_address', 0): logging.info('entering manager mode') manager.run(config) @@ -65,23 +68,32 @@ def main(): port_password = config['port_password'] del config['port_password'] for port, password_obfs in port_password.items(): + protocol = config.get("protocol", 'origin') + obfs_param = config.get("obfs_param", '') if type(password_obfs) == list: password = password_obfs[0] obfs = password_obfs[1] + elif type(password_obfs) == dict: + password = password_obfs.get('password', 'm') + protocol = password_obfs.get('protocol', 'origin') + obfs = password_obfs.get('obfs', 'plain') + obfs_param = password_obfs.get('obfs_param', '') else: password = password_obfs obfs = config["obfs"] a_config = config.copy() ipv6_ok = False - logging.info("server start with password [%s] method [%s] obfs [%s] obfs_param [%s]" % - (password, a_config['method'], obfs, a_config['obfs_param'])) + logging.info("server start with protocol[%s] password [%s] method [%s] obfs [%s] obfs_param [%s]" % + (protocol, password, a_config['method'], obfs, obfs_param)) if 'server_ipv6' in a_config: try: if len(a_config['server_ipv6']) > 2 and a_config['server_ipv6'][0] == "[" and a_config['server_ipv6'][-1] == "]": a_config['server_ipv6'] = a_config['server_ipv6'][1:-1] a_config['server_port'] = int(port) a_config['password'] = password + a_config['protocol'] = protocol a_config['obfs'] = obfs + a_config['obfs_param'] = obfs_param a_config['server'] = a_config['server_ipv6'] logging.info("starting server at [%s]:%d" % (a_config['server'], int(port))) @@ -96,7 +108,9 @@ def main(): a_config = config.copy() a_config['server_port'] = int(port) a_config['password'] = password + a_config['protocol'] = protocol a_config['obfs'] = obfs + a_config['obfs_param'] = obfs_param logging.info("starting server at %s:%d" % (a_config['server'], int(port))) tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False)) diff --git a/shadowsocks/shell.py b/shadowsocks/shell.py index 66e38bb..38d2432 100644 --- a/shadowsocks/shell.py +++ b/shadowsocks/shell.py @@ -218,6 +218,7 @@ def get_config(is_local): config['password'] = to_bytes(config.get('password', b'')) config['method'] = to_str(config.get('method', 'aes-256-cfb')) + config['protocol'] = to_str(config.get('protocol', 'origin')) config['obfs'] = to_str(config.get('obfs', 'plain')) config['obfs_param'] = to_str(config.get('obfs_param', '')) config['port_password'] = config.get('port_password', None) diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index cce6df5..395ecbf 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -122,6 +122,14 @@ class TCPRelayHandler(object): server_info.param = config['obfs_param'] self._obfs.set_server_info(server_info) + self._protocol = obfs.obfs(config['protocol']) + server_info = obfs.server_info(server.protocol_data) + server_info.host = config['server'] + server_info.port = server._listen_port + server_info.tcp_mss = 1440 + server_info.param = '' + self._protocol.set_server_info(server_info) + self._fastopen_connected = False self._data_to_write_to_local = [] self._data_to_write_to_remote = [] @@ -330,7 +338,7 @@ class TCPRelayHandler(object): def _handle_stage_connecting(self, data): if self._is_local: - data = self._obfs.client_pre_encrypt(data) + data = self._protocol.client_pre_encrypt(data) data = self._encryptor.encrypt(data) data = self._obfs.client_encode(data) if data: @@ -428,7 +436,7 @@ class TCPRelayHandler(object): data = b'\x88' + struct.pack('>H', total_len) + chr(rnd_len) + (b' ' * (rnd_len - 1)) + data crc = (0xffffffff - binascii.crc32(data)) & 0xffffffff data += struct.pack(' self.end_id: eid = self.end_id - while eid < pack_id: + while eid < end_id: self.miss_queue.add(eid) eid += 1 self.end_id = end_id @@ -465,9 +465,11 @@ class TCPRelayHandler(object): try: remote_sock.connect((remote_addr, remote_port)) except (OSError, IOError) as e: - if eventloop.errno_from_exception(e) == \ - errno.EINPROGRESS: - pass + if eventloop.errno_from_exception(e) in (errno.EINPROGRESS, + errno.EWOULDBLOCK): + pass # always goto here + else: + raise e self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT, @@ -623,6 +625,7 @@ class TCPRelayHandler(object): for pid in missing: data += struct.pack(">H", pid) rsp_data = self._pack_post_data(CMD_SYN_STATUS, pack_id, data) + addr = self.get_local_address() self._write_to_sock(rsp_data, self._local_sock, addr) def handle_stream_sync_status(self, addr, cmd, request_id, pack_id, max_send_id, data): @@ -829,7 +832,10 @@ class TCPRelayHandler(object): if self._remote_sock: logging.debug('destroying remote') self._loop.remove(self._remote_sock) - del self._fd_to_handlers[self._remote_sock.fileno()] + try: + del self._fd_to_handlers[self._remote_sock.fileno()] + except Exception as e: + pass self._remote_sock.close() self._remote_sock = None if self._sendingqueue.empty(): @@ -844,7 +850,11 @@ class TCPRelayHandler(object): addr = self.get_local_address() self._write_to_sock(rsp_data, self._local_sock, addr) self._local_sock = None - del self._reqid_to_handlers[self._request_id] + try: + del self._reqid_to_handlers[self._request_id] + except Exception as e: + pass + self._server.remove_handler(self) def client_key(source_addr, server_af): @@ -963,9 +973,14 @@ class UDPRelay(object): reqid_str = struct.pack(">H", request_id) return b''.join([CMD_VER_STR, common.chr(cmd), reqid_str, data, _rand_data[:random.randint(0, len(_rand_data))], reqid_str]) + def _handel_protocol_error(self, client_address, ogn_data): + #raise Exception('can not parse header') + logging.warn("Protocol ERROR, UDP ogn data %s from %s:%d" % (binascii.hexlify(ogn_data), client_address[0], client_address[1])) + def _handle_server(self): server = self._server_socket data, r_addr = server.recvfrom(BUF_SIZE) + ogn_data = data if not data: logging.debug('UDP handle_server: data is empty') if self._stat_callback: @@ -1056,8 +1071,14 @@ class UDPRelay(object): logging.error(trace) return - header_result = parse_header(data) + try: + header_result = parse_header(data) + except: + self._handel_protocol_error(r_addr, ogn_data) + return + if header_result is None: + self._handel_protocol_error(r_addr, ogn_data) return connecttype, dest_addr, dest_port, header_length = header_result