From c3a96129196ef15b14c41e4ff3e5c7916f567bbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A0=B4=E5=A8=83=E9=85=B1?= Date: Tue, 22 Nov 2016 15:27:12 +0800 Subject: [PATCH] bump 2.9.7 manage client with LRUCache --- CHANGES | 8 +++++ shadowsocks/lru_cache.py | 7 ++-- shadowsocks/tcprelay.py | 74 +++++++++----------------------------- shadowsocks/udprelay.py | 76 ++++++++++------------------------------ shadowsocks/version.py | 2 +- 5 files changed, 49 insertions(+), 118 deletions(-) diff --git a/CHANGES b/CHANGES index 116c222..e865e96 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +2.9.7 2016-11-22 +- manage client with LRUCache +- catch bind error +- fix import error of resource on windows +- print RLIMIT_NOFILE +- always close cymysql objects +- add init script + 2.9.6 2016-10-17 - tls1.2_ticket_auth random packet size diff --git a/shadowsocks/lru_cache.py b/shadowsocks/lru_cache.py index 9017f81..d748d58 100644 --- a/shadowsocks/lru_cache.py +++ b/shadowsocks/lru_cache.py @@ -74,6 +74,9 @@ class LRUCache(collections.MutableMapping): del self._store[key] del self._keys_to_last_time[key] + def __contains__(self, key): + return key in self._store + def __iter__(self): return iter(self._store) @@ -98,10 +101,10 @@ class LRUCache(collections.MutableMapping): if now - last_t <= self.timeout: break value = self._store[key] - if self.close_callback is not None: - self.close_callback(value) del self._store[key] del self._keys_to_last_time[key] + if self.close_callback is not None: + self.close_callback(value) c += 1 if c: logging.debug('%d keys swept' % c) diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index 441931d..6678301 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -28,7 +28,7 @@ import traceback import random import platform -from shadowsocks import encrypt, obfs, eventloop, shell, common +from shadowsocks import encrypt, obfs, eventloop, shell, common, lru_cache from shadowsocks.common import pre_parse_header, parse_header # we clear at most TIMEOUTS_CLEAN_SIZE timeouts each time @@ -961,10 +961,8 @@ class TCPRelay(object): common.connect_log = logging.info self._timeout = config['timeout'] - self._timeouts = [] # a list for all the handlers - # we trim the timeouts once a while - self._timeout_offset = 0 # last checked position for timeout - self._handler_to_timeouts = {} # key: handler value: index in timeouts + self._timeout_cache = lru_cache.LRUCache(timeout=self._timeout, + close_callback=self._close_tcp_client) if is_local: listen_addr = config['local_address'] @@ -1005,12 +1003,9 @@ class TCPRelay(object): eventloop.POLL_IN | eventloop.POLL_ERR, self) self._eventloop.add_periodic(self.handle_periodic) - def remove_handler(self, handler): - index = self._handler_to_timeouts.get(hash(handler), -1) - if index >= 0: - # delete is O(n), so we just set it to None - self._timeouts[index] = None - del self._handler_to_timeouts[hash(handler)] + def remove_handler(self, client): + if hash(client) in self._timeout_cache: + del self._timeout_cache[hash(client)] def add_connection(self, val): self.server_connections += val @@ -1052,57 +1047,22 @@ class TCPRelay(object): logging.info('Total connections down to %d' % newval) self._stat_counter[-1] = self._stat_counter.get(-1, 0) - connections_step - def update_activity(self, handler, data_len): + def update_activity(self, client, data_len): if data_len and self._stat_callback: self._stat_callback(self._listen_port, data_len) - # set handler to active - now = int(time.time()) - if now - handler.last_activity < eventloop.TIMEOUT_PRECISION: - # thus we can lower timeout modification frequency - return - handler.last_activity = now - index = self._handler_to_timeouts.get(hash(handler), -1) - if index >= 0: - # delete is O(n), so we just set it to None - self._timeouts[index] = None - length = len(self._timeouts) - self._timeouts.append(handler) - self._handler_to_timeouts[hash(handler)] = length + self._timeout_cache[hash(client)] = client def _sweep_timeout(self): - # tornado's timeout memory management is more flexible than we need - # we just need a sorted last_activity queue and it's faster than heapq - # in fact we can do O(1) insertion/remove so we invent our own - if self._timeouts: - logging.log(shell.VERBOSE_LEVEL, 'sweeping timeouts') - now = time.time() - length = len(self._timeouts) - pos = self._timeout_offset - while pos < length: - handler = self._timeouts[pos] - if handler: - if now - handler.last_activity < self._timeout: - break - else: - if handler.remote_address: - logging.debug('timed out: %s:%d' % - handler.remote_address) - else: - logging.debug('timed out') - handler.destroy() - self._timeouts[pos] = None # free memory - pos += 1 - else: - pos += 1 - if pos > TIMEOUTS_CLEAN_SIZE and pos > length >> 1: - # clean up the timeout queue when it gets larger than half - # of the queue - self._timeouts = self._timeouts[pos:] - for key in self._handler_to_timeouts: - self._handler_to_timeouts[key] -= pos - pos = 0 - self._timeout_offset = pos + self._timeout_cache.sweep() + + def _close_tcp_client(self, client): + if client.remote_address: + logging.debug('timed out: %s:%d' % + client.remote_address) + else: + logging.debug('timed out') + client.destroy() def handle_event(self, sock, fd, event): # handle events and dispatch to handlers diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index 78b915d..9d62127 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -918,10 +918,8 @@ class UDPRelay(object): self._reqid_to_hd = {} self._data_to_write_to_server_socket = [] - self._timeouts = [] # a list for all the handlers - # we trim the timeouts once a while - self._timeout_offset = 0 # last checked position for timeout - self._handler_to_timeouts = {} # key: handler value: index in timeouts + self._timeout_cache = lru_cache.LRUCache(timeout=self._timeout, + close_callback=self._close_tcp_client) self._bind = config.get('out_bind', '') self._bindv6 = config.get('out_bindv6', '') @@ -1317,62 +1315,24 @@ class UDPRelay(object): eventloop.POLL_IN | eventloop.POLL_ERR, self) loop.add_periodic(self.handle_periodic) - def remove_handler(self, handler): - index = self._handler_to_timeouts.get(hash(handler), -1) - if index >= 0: - # delete is O(n), so we just set it to None - self._timeouts[index] = None - del self._handler_to_timeouts[hash(handler)] - - def update_activity(self, handler): - # set handler to active - now = int(time.time()) - if now - handler.last_activity < eventloop.TIMEOUT_PRECISION: - # thus we can lower timeout modification frequency - return - handler.last_activity = now - index = self._handler_to_timeouts.get(hash(handler), -1) - if index >= 0: - # delete is O(n), so we just set it to None - self._timeouts[index] = None - length = len(self._timeouts) - self._timeouts.append(handler) - self._handler_to_timeouts[hash(handler)] = length + def remove_handler(self, client): + if hash(client) in self._timeout_cache: + del self._timeout_cache[hash(client)] + + def update_activity(self, client): + self._timeout_cache[hash(client)] = client def _sweep_timeout(self): - # tornado's timeout memory management is more flexible than we need - # we just need a sorted last_activity queue and it's faster than heapq - # in fact we can do O(1) insertion/remove so we invent our own - if self._timeouts: - logging.log(shell.VERBOSE_LEVEL, 'sweeping timeouts') - now = time.time() - length = len(self._timeouts) - pos = self._timeout_offset - while pos < length: - handler = self._timeouts[pos] - if handler: - if now - handler.last_activity < self._timeout: - break - else: - if handler.remote_address: - logging.debug('timed out: %s:%d' % - handler.remote_address) - else: - logging.debug('timed out') - handler.destroy() - handler.destroy_local() - self._timeouts[pos] = None # free memory - pos += 1 - else: - pos += 1 - if pos > TIMEOUTS_CLEAN_SIZE and pos > length >> 1: - # clean up the timeout queue when it gets larger than half - # of the queue - self._timeouts = self._timeouts[pos:] - for key in self._handler_to_timeouts: - self._handler_to_timeouts[key] -= pos - pos = 0 - self._timeout_offset = pos + self._timeout_cache.sweep() + + def _close_tcp_client(self, client): + if client.remote_address: + logging.debug('timed out: %s:%d' % + client.remote_address) + else: + logging.debug('timed out') + client.destroy() + client.destroy_local() def handle_event(self, sock, fd, event): if sock == self._server_socket: diff --git a/shadowsocks/version.py b/shadowsocks/version.py index a2abded..d0befd0 100644 --- a/shadowsocks/version.py +++ b/shadowsocks/version.py @@ -16,5 +16,5 @@ # under the License. def version(): - return '2.9.6 2016-10-17' + return '2.9.7 2016-11-22'