From e5e3102591c0016cdb114aecb19ae5a43b1d1b44 Mon Sep 17 00:00:00 2001 From: breakwa11 Date: Mon, 10 Aug 2015 12:47:37 +0800 Subject: [PATCH] remove debug code add AddressMap for udp --- shadowsocks/udprelay.py | 80 +++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index 018a6a6..34608b1 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -97,8 +97,9 @@ WAIT_STATUS_READWRITING = WAIT_STATUS_READING | WAIT_STATUS_WRITING BUF_SIZE = 65536 DOUBLE_SEND_BEG_IDS = 16 -POST_MTU_MIN = 1000 +POST_MTU_MIN = 500 POST_MTU_MAX = 1400 +SENDING_WINDOW_SIZE = 8192 STAGE_INIT = 0 STAGE_RSP_ID = 1 @@ -173,9 +174,6 @@ class SendingQueue(object): while self.begin_id < begin_id: self.begin_id += 1 del self.queue[self.begin_id] - #while len(self.queue) > 0 and self.queue[0][0] <= begin_id: - # del self.queue[0] - # self.begin_id += 1 class RecvQueue(object): def __init__(self): @@ -229,6 +227,38 @@ class RecvQueue(object): missing.append(i - begin_id) return (begin_id, missing) +class AddressMap(object): + def __init__(self): + self._queue = [] + self._addr_map = {} + + def add(self, addr): + if addr in self._addr_map: + self._addr_map[addr] = UDPLocalAddress(addr) + else: + self._addr_map[addr] = UDPLocalAddress(addr) + self._queue.append(addr) + + def keys(self): + return self._queue + + def get(self): + if self._queue: + while True: + if len(self._queue) == 1: + return self._queue[0] + index = random.randint(0, len(self._queue) - 1) + addr = self._queue[index] + if self._addr_map[addr].is_timeout(): + self._queue[index] = self._queue[len(self._queue) - 1] + del self._queue[len(self._queue) - 1] + del self._addr_map[addr] + else: + break + return addr + else: + return None + class TCPRelayHandler(object): def __init__(self, server, reqid_to_handlers, fd_to_handlers, loop, local_sock, local_id, client_param, config, @@ -254,7 +284,7 @@ class TCPRelayHandler(object): self._upstream_status = WAIT_STATUS_READING self._downstream_status = WAIT_STATUS_INIT self._request_id = 0 - self._client_address = {} + self._client_address = AddressMap() self._remote_address = None self._sendingqueue = SendingQueue() self._recvqueue = RecvQueue() @@ -282,7 +312,10 @@ class TCPRelayHandler(object): return self._remote_address def add_local_address(self, addr): - self._client_address[addr] = UDPLocalAddress(addr) + self._client_address.add(addr) + + def get_local_address(self): + return self._client_address.get() def _update_activity(self): # tell the TCP Relay we have activities recently @@ -367,8 +400,6 @@ class TCPRelayHandler(object): return False if uncomplete: if sock == self._local_sock: - #if data is not None and retry < 10: - # self._data_to_write_to_local.append([(data, addr), retry]) self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING) elif sock == self._remote_sock: self._data_to_write_to_remote.append(data) @@ -377,15 +408,12 @@ class TCPRelayHandler(object): logging.error('write_all_to_sock:unknown socket') else: if sock == self._local_sock: - if self._sendingqueue.size() > 8192: + if self._sendingqueue.size() > SENDING_WINDOW_SIZE: self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING) else: self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) elif sock == self._remote_sock: - if self._sendingqueue.size() > 8192: - self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING) - else: - self._update_stream(STREAM_UP, WAIT_STATUS_READING) + self._update_stream(STREAM_UP, WAIT_STATUS_READING) else: logging.error('write_all_to_sock:unknown socket') return True @@ -439,9 +467,7 @@ class TCPRelayHandler(object): self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) self._stage = STAGE_STREAM - for it_addr in self._client_address: - addr = it_addr - break + addr = self.get_local_address() for i in xrange(2): rsp_data = self._pack_rsp_data(CMD_RSP_CONNECT_REMOTE, "\x02") @@ -508,13 +534,11 @@ class TCPRelayHandler(object): pack_id = self._sendingqueue.append(data) post_data = self._pack_post_data(CMD_POST, pack_id, data) - for it_addr in self._client_address: - addr = it_addr - break + addr = self.get_local_address() self._write_to_sock(post_data, self._local_sock, addr) - #if pack_id <= DOUBLE_SEND_BEG_IDS: - # post_data = self._pack_post_data(CMD_POST, pack_id, data) - # self._write_to_sock(post_data, self._local_sock, addr) + if pack_id <= DOUBLE_SEND_BEG_IDS: + post_data = self._pack_post_data(CMD_POST, pack_id, data) + self._write_to_sock(post_data, self._local_sock, addr) except Exception as e: shell.print_exception(e) @@ -620,9 +644,9 @@ class TCPRelayHandler(object): for post_pack_id, post_data in send_list: rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data) self._write_to_sock(rsp_data, self._local_sock, addr) - #if post_pack_id <= DOUBLE_SEND_BEG_IDS: - # rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data) - # self._write_to_sock(rsp_data, self._local_sock, addr) + if post_pack_id <= DOUBLE_SEND_BEG_IDS: + rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data) + self._write_to_sock(rsp_data, self._local_sock, addr) def handle_client(self, addr, cmd, request_id, data): self.add_local_address(addr) @@ -732,13 +756,11 @@ class TCPRelayHandler(object): pack_id = struct.unpack(">I", data[0:4])[0] max_send_id = struct.unpack(">I", data[4:8])[0] data = data[8:] - logging.info('handle_client STAGE_DESTROYED send %d %d' % (request_id, pack_id)) self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data) elif cmd == CMD_SYN_STATUS_64: pack_id = struct.unpack(">Q", data[0:8])[0] max_send_id = struct.unpack(">Q", data[8:16])[0] data = data[16:] - logging.info('handle_client STAGE_DESTROYED send %d %d' % (request_id, pack_id)) self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data) def handle_event(self, sock, event): @@ -810,9 +832,7 @@ class TCPRelayHandler(object): logging.debug('disconnect local') rsp_data = self._pack_rsp_data(CMD_DISCONNECT, "") addr = None - for it_addr in self._client_address: - addr = it_addr - break + addr = self.get_local_address() self._write_to_sock(rsp_data, self._local_sock, addr) self._local_sock = None del self._reqid_to_handlers[self._request_id]