Browse Source

improve comments

auth
clowwindy 10 years ago
parent
commit
4b0b252953
  1. 101
      shadowsocks/tcprelay.py

101
shadowsocks/tcprelay.py

@ -35,35 +35,57 @@ import random
from shadowsocks import encrypt, eventloop, utils, common
from shadowsocks.common import parse_header
# we clear at most TIMEOUTS_CLEAN_SIZE timeouts each time
TIMEOUTS_CLEAN_SIZE = 512
# we check timeouts every TIMEOUT_PRECISION seconds
TIMEOUT_PRECISION = 4
MSG_FASTOPEN = 0x20000000
# SOCKS CMD defination
CMD_CONNECT = 1
CMD_BIND = 2
CMD_UDP_ASSOCIATE = 3
# TCP Relay can be either sslocal or ssserver
# for sslocal it is called is_local=True
# for each opening port, we have a TCP Relay
# for each connection, we have a TCP Relay Handler to handle the connection
# for each handler, we have 2 sockets:
# local: connected to the client
# remote: connected to remote server
# for each handler, we have 2 streams:
# upstream: from client to server direction
# read local and write to remote
# downstream: from server to client direction
# read remote and write to local
# for each handler, it could be at one of several stages:
# local:
# stage 0 init
# stage 1 hello received, hello sent
# stage 0 SOCKS hello received from local, send hello to local
# stage 1 addr received from local, query DNS for remote
# stage 2 UDP assoc
# stage 3 DNS
# stage 4 addr received, reply sent
# stage 5 remote connected
# stage 3 DNS resolved, connect to remote
# stage 4 still connecting, more data from local received
# stage 5 remote connected, piping local and remote
# remote:
# stage 0 init
# stage 3 DNS
# stage 4 addr received, reply sent
# stage 5 remote connected
# stage 0 just jump to stage 1
# stage 1 addr received from local, query DNS for remote
# stage 3 DNS resolved, connect to remote
# stage 4 still connecting, more data from local received
# stage 5 remote connected, piping local and remote
STAGE_INIT = 0
STAGE_HELLO = 1
STAGE_ADDR = 1
STAGE_UDP_ASSOC = 2
STAGE_DNS = 3
STAGE_REPLY = 4
STAGE_CONNECTING = 4
STAGE_STREAM = 5
STAGE_DESTROYED = -1
@ -71,7 +93,7 @@ STAGE_DESTROYED = -1
STREAM_UP = 0
STREAM_DOWN = 1
# stream wait status
# stream wait status, indicating it's waiting for reading, etc
WAIT_STATUS_INIT = 0
WAIT_STATUS_READING = 1
WAIT_STATUS_WRITING = 2
@ -128,9 +150,15 @@ class TCPRelayHandler(object):
return server, server_port
def _update_activity(self):
# tell the TCP Relay we have activities recently
# else it will think we are inactive and timed out
self._server.update_activity(self)
def _update_stream(self, stream, status):
# update a stream to a new waiting status
# check if status is changed
# only update if dirty
dirty = False
if stream == STREAM_DOWN:
if self._downstream_status != status:
@ -157,6 +185,9 @@ class TCPRelayHandler(object):
self._loop.modify(self._remote_sock, event)
def _write_to_sock(self, data, sock):
# write data to sock
# if only some of the data are written, put remaining in the buffer
# and update the stream to wait for writing
if not data or not sock:
return False
uncomplete = False
@ -195,13 +226,16 @@ class TCPRelayHandler(object):
logging.error('write_all_to_sock:unknown socket')
return True
def _handle_stage_reply(self, data):
def _handle_stage_connecting(self, data):
if self._is_local:
data = self._encryptor.encrypt(data)
self._data_to_write_to_remote.append(data)
if self._is_local and not self._fastopen_connected and \
self._config['fast_open']:
# for sslocal and fastopen, we basically wait for data and use
# sendto to connect
try:
# only connect once
self._fastopen_connected = True
remote_sock = \
self._create_remote_socket(self._chosen_server[0],
@ -231,7 +265,7 @@ class TCPRelayHandler(object):
traceback.print_exc()
self.destroy()
def _handle_stage_hello(self, data):
def _handle_stage_addr(self, data):
try:
if self._is_local:
cmd = common.ord(data[1])
@ -312,7 +346,7 @@ class TCPRelayHandler(object):
ip = result[1]
if ip:
try:
self._stage = STAGE_REPLY
self._stage = STAGE_CONNECTING
remote_addr = ip
if self._is_local:
remote_port = self._chosen_server[1]
@ -320,11 +354,15 @@ class TCPRelayHandler(object):
remote_port = self._remote_address[1]
if self._is_local and self._config['fast_open']:
# for fastopen:
# wait for more data to arrive and send them in one SYN
self._stage = STAGE_REPLY
self._stage = STAGE_CONNECTING
# we don't have to wait for remote since it's not
# created
self._update_stream(STREAM_UP, WAIT_STATUS_READING)
# TODO when there is already data in this packet
else:
# else do connect
remote_sock = self._create_remote_socket(remote_addr,
remote_port)
try:
@ -335,7 +373,7 @@ class TCPRelayHandler(object):
pass
self._loop.add(remote_sock,
eventloop.POLL_ERR | eventloop.POLL_OUT)
self._stage = STAGE_REPLY
self._stage = STAGE_CONNECTING
self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING)
self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
return
@ -346,6 +384,8 @@ class TCPRelayHandler(object):
self.destroy()
def _on_local_read(self):
# handle all local read events and dispatch them to methods for
# each stage
self._update_activity()
if not self._local_sock:
return
@ -372,15 +412,16 @@ class TCPRelayHandler(object):
elif is_local and self._stage == STAGE_INIT:
# TODO check auth method
self._write_to_sock(b'\x05\00', self._local_sock)
self._stage = STAGE_HELLO
self._stage = STAGE_ADDR
return
elif self._stage == STAGE_REPLY:
self._handle_stage_reply(data)
elif (is_local and self._stage == STAGE_HELLO) or \
elif self._stage == STAGE_CONNECTING:
self._handle_stage_connecting(data)
elif (is_local and self._stage == STAGE_ADDR) or \
(not is_local and self._stage == STAGE_INIT):
self._handle_stage_hello(data)
self._handle_stage_addr(data)
def _on_remote_read(self):
# handle all remote read events
self._update_activity()
data = None
try:
@ -406,6 +447,7 @@ class TCPRelayHandler(object):
self.destroy()
def _on_local_write(self):
# handle local writable event
if self._data_to_write_to_local:
data = b''.join(self._data_to_write_to_local)
self._data_to_write_to_local = []
@ -414,6 +456,7 @@ class TCPRelayHandler(object):
self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
def _on_remote_write(self):
# handle remote writable event
self._stage = STAGE_STREAM
if self._data_to_write_to_remote:
data = b''.join(self._data_to_write_to_remote)
@ -435,6 +478,7 @@ class TCPRelayHandler(object):
self.destroy()
def handle_event(self, sock, event):
# handle all events in this handler and dispatch them to methods
if self._stage == STAGE_DESTROYED:
logging.debug('ignore handle_event: destroyed')
return
@ -465,7 +509,15 @@ class TCPRelayHandler(object):
logging.warn('unknown socket')
def destroy(self):
# destroy the handler and release any resources
# promises:
# 1. destroy won't make another destroy() call inside
# 2. destroy releases resources so it prevents future call to destroy
# 3. destroy won't raise any exceptions
# if any of the promises are broken, it indicates a bug have been
# introduced! mostly likely memory leaks, etc
if self._stage == STAGE_DESTROYED:
# this couldn't happen
logging.debug('already destroyed')
return
self._stage = STAGE_DESTROYED
@ -552,7 +604,7 @@ class TCPRelay(object):
del self._handler_to_timeouts[hash(handler)]
def update_activity(self, handler):
""" set handler to active """
# set handler to active
now = int(time.time())
if now - handler.last_activity < TIMEOUT_PRECISION:
# thus we can lower timeout modification frequency
@ -601,6 +653,7 @@ class TCPRelay(object):
self._timeout_offset = pos
def _handle_events(self, events):
# handle events and dispatch to handlers
for sock, fd, event in events:
if sock:
logging.log(utils.VERBOSE_LEVEL, 'fd %d %s', fd,

Loading…
Cancel
Save