diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index e55f9dd..0c4f5de 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -35,35 +35,57 @@ import random from shadowsocks import encrypt, eventloop, utils, common from shadowsocks.common import parse_header - +# we clear at most TIMEOUTS_CLEAN_SIZE timeouts each time TIMEOUTS_CLEAN_SIZE = 512 + +# we check timeouts every TIMEOUT_PRECISION seconds TIMEOUT_PRECISION = 4 MSG_FASTOPEN = 0x20000000 +# SOCKS CMD defination CMD_CONNECT = 1 CMD_BIND = 2 CMD_UDP_ASSOCIATE = 3 +# TCP Relay can be either sslocal or ssserver +# for sslocal it is called is_local=True + +# for each opening port, we have a TCP Relay +# for each connection, we have a TCP Relay Handler to handle the connection + +# for each handler, we have 2 sockets: +# local: connected to the client +# remote: connected to remote server + +# for each handler, we have 2 streams: +# upstream: from client to server direction +# read local and write to remote +# downstream: from server to client direction +# read remote and write to local + +# for each handler, it could be at one of several stages: + # local: -# stage 0 init -# stage 1 hello received, hello sent +# stage 0 SOCKS hello received from local, send hello to local +# stage 1 addr received from local, query DNS for remote # stage 2 UDP assoc -# stage 3 DNS -# stage 4 addr received, reply sent -# stage 5 remote connected +# stage 3 DNS resolved, connect to remote +# stage 4 still connecting, more data from local received +# stage 5 remote connected, piping local and remote # remote: -# stage 0 init -# stage 3 DNS -# stage 4 addr received, reply sent -# stage 5 remote connected +# stage 0 just jump to stage 1 +# stage 1 addr received from local, query DNS for remote +# stage 3 DNS resolved, connect to remote +# stage 4 still connecting, more data from local received +# stage 5 remote connected, piping local and remote STAGE_INIT = 0 -STAGE_HELLO = 1 +STAGE_ADDR = 1 STAGE_UDP_ASSOC = 2 STAGE_DNS = 3 -STAGE_REPLY = 4 +STAGE_CONNECTING = 4 STAGE_STREAM = 5 STAGE_DESTROYED = -1 @@ -71,7 +93,7 @@ STAGE_DESTROYED = -1 STREAM_UP = 0 STREAM_DOWN = 1 -# stream wait status +# stream wait status, indicating it's waiting for reading, etc WAIT_STATUS_INIT = 0 WAIT_STATUS_READING = 1 WAIT_STATUS_WRITING = 2 @@ -128,9 +150,15 @@ class TCPRelayHandler(object): return server, server_port def _update_activity(self): + # tell the TCP Relay we have activities recently + # else it will think we are inactive and timed out self._server.update_activity(self) def _update_stream(self, stream, status): + # update a stream to a new waiting status + + # check if status is changed + # only update if dirty dirty = False if stream == STREAM_DOWN: if self._downstream_status != status: @@ -157,6 +185,9 @@ class TCPRelayHandler(object): self._loop.modify(self._remote_sock, event) def _write_to_sock(self, data, sock): + # write data to sock + # if only some of the data are written, put remaining in the buffer + # and update the stream to wait for writing if not data or not sock: return False uncomplete = False @@ -195,13 +226,16 @@ class TCPRelayHandler(object): logging.error('write_all_to_sock:unknown socket') return True - def _handle_stage_reply(self, data): + def _handle_stage_connecting(self, data): if self._is_local: data = self._encryptor.encrypt(data) self._data_to_write_to_remote.append(data) if self._is_local and not self._fastopen_connected and \ self._config['fast_open']: + # for sslocal and fastopen, we basically wait for data and use + # sendto to connect try: + # only connect once self._fastopen_connected = True remote_sock = \ self._create_remote_socket(self._chosen_server[0], @@ -231,7 +265,7 @@ class TCPRelayHandler(object): traceback.print_exc() self.destroy() - def _handle_stage_hello(self, data): + def _handle_stage_addr(self, data): try: if self._is_local: cmd = common.ord(data[1]) @@ -312,7 +346,7 @@ class TCPRelayHandler(object): ip = result[1] if ip: try: - self._stage = STAGE_REPLY + self._stage = STAGE_CONNECTING remote_addr = ip if self._is_local: remote_port = self._chosen_server[1] @@ -320,11 +354,15 @@ class TCPRelayHandler(object): remote_port = self._remote_address[1] if self._is_local and self._config['fast_open']: + # for fastopen: # wait for more data to arrive and send them in one SYN - self._stage = STAGE_REPLY + self._stage = STAGE_CONNECTING + # we don't have to wait for remote since it's not + # created self._update_stream(STREAM_UP, WAIT_STATUS_READING) # TODO when there is already data in this packet else: + # else do connect remote_sock = self._create_remote_socket(remote_addr, remote_port) try: @@ -335,7 +373,7 @@ class TCPRelayHandler(object): pass self._loop.add(remote_sock, eventloop.POLL_ERR | eventloop.POLL_OUT) - self._stage = STAGE_REPLY + self._stage = STAGE_CONNECTING self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) return @@ -346,6 +384,8 @@ class TCPRelayHandler(object): self.destroy() def _on_local_read(self): + # handle all local read events and dispatch them to methods for + # each stage self._update_activity() if not self._local_sock: return @@ -372,15 +412,16 @@ class TCPRelayHandler(object): elif is_local and self._stage == STAGE_INIT: # TODO check auth method self._write_to_sock(b'\x05\00', self._local_sock) - self._stage = STAGE_HELLO + self._stage = STAGE_ADDR return - elif self._stage == STAGE_REPLY: - self._handle_stage_reply(data) - elif (is_local and self._stage == STAGE_HELLO) or \ + elif self._stage == STAGE_CONNECTING: + self._handle_stage_connecting(data) + elif (is_local and self._stage == STAGE_ADDR) or \ (not is_local and self._stage == STAGE_INIT): - self._handle_stage_hello(data) + self._handle_stage_addr(data) def _on_remote_read(self): + # handle all remote read events self._update_activity() data = None try: @@ -406,6 +447,7 @@ class TCPRelayHandler(object): self.destroy() def _on_local_write(self): + # handle local writable event if self._data_to_write_to_local: data = b''.join(self._data_to_write_to_local) self._data_to_write_to_local = [] @@ -414,6 +456,7 @@ class TCPRelayHandler(object): self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) def _on_remote_write(self): + # handle remote writable event self._stage = STAGE_STREAM if self._data_to_write_to_remote: data = b''.join(self._data_to_write_to_remote) @@ -435,6 +478,7 @@ class TCPRelayHandler(object): self.destroy() def handle_event(self, sock, event): + # handle all events in this handler and dispatch them to methods if self._stage == STAGE_DESTROYED: logging.debug('ignore handle_event: destroyed') return @@ -465,7 +509,15 @@ class TCPRelayHandler(object): logging.warn('unknown socket') def destroy(self): + # destroy the handler and release any resources + # promises: + # 1. destroy won't make another destroy() call inside + # 2. destroy releases resources so it prevents future call to destroy + # 3. destroy won't raise any exceptions + # if any of the promises are broken, it indicates a bug have been + # introduced! mostly likely memory leaks, etc if self._stage == STAGE_DESTROYED: + # this couldn't happen logging.debug('already destroyed') return self._stage = STAGE_DESTROYED @@ -552,7 +604,7 @@ class TCPRelay(object): del self._handler_to_timeouts[hash(handler)] def update_activity(self, handler): - """ set handler to active """ + # set handler to active now = int(time.time()) if now - handler.last_activity < TIMEOUT_PRECISION: # thus we can lower timeout modification frequency @@ -601,6 +653,7 @@ class TCPRelay(object): self._timeout_offset = pos def _handle_events(self, events): + # handle events and dispatch to handlers for sock, fd, event in events: if sock: logging.log(utils.VERBOSE_LEVEL, 'fd %d %s', fd,