Browse Source

remove debug code

add AddressMap for udp
dev
breakwa11 10 years ago
parent
commit
e5e3102591
  1. 80
      shadowsocks/udprelay.py

80
shadowsocks/udprelay.py

@ -97,8 +97,9 @@ WAIT_STATUS_READWRITING = WAIT_STATUS_READING | WAIT_STATUS_WRITING
BUF_SIZE = 65536 BUF_SIZE = 65536
DOUBLE_SEND_BEG_IDS = 16 DOUBLE_SEND_BEG_IDS = 16
POST_MTU_MIN = 1000 POST_MTU_MIN = 500
POST_MTU_MAX = 1400 POST_MTU_MAX = 1400
SENDING_WINDOW_SIZE = 8192
STAGE_INIT = 0 STAGE_INIT = 0
STAGE_RSP_ID = 1 STAGE_RSP_ID = 1
@ -173,9 +174,6 @@ class SendingQueue(object):
while self.begin_id < begin_id: while self.begin_id < begin_id:
self.begin_id += 1 self.begin_id += 1
del self.queue[self.begin_id] del self.queue[self.begin_id]
#while len(self.queue) > 0 and self.queue[0][0] <= begin_id:
# del self.queue[0]
# self.begin_id += 1
class RecvQueue(object): class RecvQueue(object):
def __init__(self): def __init__(self):
@ -229,6 +227,38 @@ class RecvQueue(object):
missing.append(i - begin_id) missing.append(i - begin_id)
return (begin_id, missing) return (begin_id, missing)
class AddressMap(object):
def __init__(self):
self._queue = []
self._addr_map = {}
def add(self, addr):
if addr in self._addr_map:
self._addr_map[addr] = UDPLocalAddress(addr)
else:
self._addr_map[addr] = UDPLocalAddress(addr)
self._queue.append(addr)
def keys(self):
return self._queue
def get(self):
if self._queue:
while True:
if len(self._queue) == 1:
return self._queue[0]
index = random.randint(0, len(self._queue) - 1)
addr = self._queue[index]
if self._addr_map[addr].is_timeout():
self._queue[index] = self._queue[len(self._queue) - 1]
del self._queue[len(self._queue) - 1]
del self._addr_map[addr]
else:
break
return addr
else:
return None
class TCPRelayHandler(object): class TCPRelayHandler(object):
def __init__(self, server, reqid_to_handlers, fd_to_handlers, loop, def __init__(self, server, reqid_to_handlers, fd_to_handlers, loop,
local_sock, local_id, client_param, config, local_sock, local_id, client_param, config,
@ -254,7 +284,7 @@ class TCPRelayHandler(object):
self._upstream_status = WAIT_STATUS_READING self._upstream_status = WAIT_STATUS_READING
self._downstream_status = WAIT_STATUS_INIT self._downstream_status = WAIT_STATUS_INIT
self._request_id = 0 self._request_id = 0
self._client_address = {} self._client_address = AddressMap()
self._remote_address = None self._remote_address = None
self._sendingqueue = SendingQueue() self._sendingqueue = SendingQueue()
self._recvqueue = RecvQueue() self._recvqueue = RecvQueue()
@ -282,7 +312,10 @@ class TCPRelayHandler(object):
return self._remote_address return self._remote_address
def add_local_address(self, addr): def add_local_address(self, addr):
self._client_address[addr] = UDPLocalAddress(addr) self._client_address.add(addr)
def get_local_address(self):
return self._client_address.get()
def _update_activity(self): def _update_activity(self):
# tell the TCP Relay we have activities recently # tell the TCP Relay we have activities recently
@ -367,8 +400,6 @@ class TCPRelayHandler(object):
return False return False
if uncomplete: if uncomplete:
if sock == self._local_sock: if sock == self._local_sock:
#if data is not None and retry < 10:
# self._data_to_write_to_local.append([(data, addr), retry])
self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING) self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING)
elif sock == self._remote_sock: elif sock == self._remote_sock:
self._data_to_write_to_remote.append(data) self._data_to_write_to_remote.append(data)
@ -377,15 +408,12 @@ class TCPRelayHandler(object):
logging.error('write_all_to_sock:unknown socket') logging.error('write_all_to_sock:unknown socket')
else: else:
if sock == self._local_sock: if sock == self._local_sock:
if self._sendingqueue.size() > 8192: if self._sendingqueue.size() > SENDING_WINDOW_SIZE:
self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING) self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING)
else: else:
self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
elif sock == self._remote_sock: elif sock == self._remote_sock:
if self._sendingqueue.size() > 8192: self._update_stream(STREAM_UP, WAIT_STATUS_READING)
self._update_stream(STREAM_DOWN, WAIT_STATUS_WRITING)
else:
self._update_stream(STREAM_UP, WAIT_STATUS_READING)
else: else:
logging.error('write_all_to_sock:unknown socket') logging.error('write_all_to_sock:unknown socket')
return True return True
@ -439,9 +467,7 @@ class TCPRelayHandler(object):
self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
self._stage = STAGE_STREAM self._stage = STAGE_STREAM
for it_addr in self._client_address: addr = self.get_local_address()
addr = it_addr
break
for i in xrange(2): for i in xrange(2):
rsp_data = self._pack_rsp_data(CMD_RSP_CONNECT_REMOTE, "\x02") rsp_data = self._pack_rsp_data(CMD_RSP_CONNECT_REMOTE, "\x02")
@ -508,13 +534,11 @@ class TCPRelayHandler(object):
pack_id = self._sendingqueue.append(data) pack_id = self._sendingqueue.append(data)
post_data = self._pack_post_data(CMD_POST, pack_id, data) post_data = self._pack_post_data(CMD_POST, pack_id, data)
for it_addr in self._client_address: addr = self.get_local_address()
addr = it_addr
break
self._write_to_sock(post_data, self._local_sock, addr) self._write_to_sock(post_data, self._local_sock, addr)
#if pack_id <= DOUBLE_SEND_BEG_IDS: if pack_id <= DOUBLE_SEND_BEG_IDS:
# post_data = self._pack_post_data(CMD_POST, pack_id, data) post_data = self._pack_post_data(CMD_POST, pack_id, data)
# self._write_to_sock(post_data, self._local_sock, addr) self._write_to_sock(post_data, self._local_sock, addr)
except Exception as e: except Exception as e:
shell.print_exception(e) shell.print_exception(e)
@ -620,9 +644,9 @@ class TCPRelayHandler(object):
for post_pack_id, post_data in send_list: for post_pack_id, post_data in send_list:
rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data) rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data)
self._write_to_sock(rsp_data, self._local_sock, addr) self._write_to_sock(rsp_data, self._local_sock, addr)
#if post_pack_id <= DOUBLE_SEND_BEG_IDS: if post_pack_id <= DOUBLE_SEND_BEG_IDS:
# rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data) rsp_data = self._pack_post_data(CMD_POST, post_pack_id, post_data)
# self._write_to_sock(rsp_data, self._local_sock, addr) self._write_to_sock(rsp_data, self._local_sock, addr)
def handle_client(self, addr, cmd, request_id, data): def handle_client(self, addr, cmd, request_id, data):
self.add_local_address(addr) self.add_local_address(addr)
@ -732,13 +756,11 @@ class TCPRelayHandler(object):
pack_id = struct.unpack(">I", data[0:4])[0] pack_id = struct.unpack(">I", data[0:4])[0]
max_send_id = struct.unpack(">I", data[4:8])[0] max_send_id = struct.unpack(">I", data[4:8])[0]
data = data[8:] data = data[8:]
logging.info('handle_client STAGE_DESTROYED send %d %d' % (request_id, pack_id))
self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data) self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data)
elif cmd == CMD_SYN_STATUS_64: elif cmd == CMD_SYN_STATUS_64:
pack_id = struct.unpack(">Q", data[0:8])[0] pack_id = struct.unpack(">Q", data[0:8])[0]
max_send_id = struct.unpack(">Q", data[8:16])[0] max_send_id = struct.unpack(">Q", data[8:16])[0]
data = data[16:] data = data[16:]
logging.info('handle_client STAGE_DESTROYED send %d %d' % (request_id, pack_id))
self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data) self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data)
def handle_event(self, sock, event): def handle_event(self, sock, event):
@ -810,9 +832,7 @@ class TCPRelayHandler(object):
logging.debug('disconnect local') logging.debug('disconnect local')
rsp_data = self._pack_rsp_data(CMD_DISCONNECT, "") rsp_data = self._pack_rsp_data(CMD_DISCONNECT, "")
addr = None addr = None
for it_addr in self._client_address: addr = self.get_local_address()
addr = it_addr
break
self._write_to_sock(rsp_data, self._local_sock, addr) self._write_to_sock(rsp_data, self._local_sock, addr)
self._local_sock = None self._local_sock = None
del self._reqid_to_handlers[self._request_id] del self._reqid_to_handlers[self._request_id]

Loading…
Cancel
Save