|
|
@ -115,6 +115,7 @@ class TCPRelayHandler(object): |
|
|
|
self._fastopen_connected = False |
|
|
|
self._data_to_write_to_local = [] |
|
|
|
self._data_to_write_to_remote = [] |
|
|
|
self._udp_data_send_buffer = '' |
|
|
|
self._upstream_status = WAIT_STATUS_READING |
|
|
|
self._downstream_status = WAIT_STATUS_INIT |
|
|
|
self._client_address = local_sock.getpeername()[:2] |
|
|
@ -128,7 +129,8 @@ class TCPRelayHandler(object): |
|
|
|
fd_to_handlers[local_sock.fileno()] = self |
|
|
|
local_sock.setblocking(False) |
|
|
|
local_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) |
|
|
|
loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR) |
|
|
|
loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR, |
|
|
|
self._server) |
|
|
|
self.last_activity = 0 |
|
|
|
self._update_activity() |
|
|
|
|
|
|
@ -185,6 +187,8 @@ class TCPRelayHandler(object): |
|
|
|
if self._upstream_status & WAIT_STATUS_WRITING: |
|
|
|
event |= eventloop.POLL_OUT |
|
|
|
self._loop.modify(self._remote_sock, event) |
|
|
|
if self._remote_sock_v6: |
|
|
|
self._loop.modify(self._remote_sock_v6, event) |
|
|
|
|
|
|
|
def _write_to_sock(self, data, sock): |
|
|
|
# write data to sock |
|
|
@ -193,51 +197,70 @@ class TCPRelayHandler(object): |
|
|
|
if not data or not sock: |
|
|
|
return False |
|
|
|
#logging.debug("_write_to_sock %s %s %s" % (self._remote_sock, sock, self._remote_udp)) |
|
|
|
if self._remote_udp and self._remote_sock == sock: |
|
|
|
uncomplete = False |
|
|
|
if self._remote_udp and sock == self._remote_sock: |
|
|
|
try: |
|
|
|
frag = common.ord(data[2]) |
|
|
|
if frag != 0: |
|
|
|
logging.warn('drop a message since frag is %d' % (frag,)) |
|
|
|
return False |
|
|
|
else: |
|
|
|
data = data[3:] |
|
|
|
header_result = parse_header(data) |
|
|
|
if header_result is None: |
|
|
|
return False |
|
|
|
connecttype, dest_addr, dest_port, header_length = header_result |
|
|
|
addrs = socket.getaddrinfo(dest_addr, dest_port, 0, |
|
|
|
socket.SOCK_DGRAM, socket.SOL_UDP) |
|
|
|
if addrs: |
|
|
|
af, socktype, proto, canonname, server_addr = addrs[0] |
|
|
|
data = data[header_length:] |
|
|
|
if af == socket.AF_INET6: |
|
|
|
self._remote_sock_v6.sendto(data, (server_addr[0], dest_port)) |
|
|
|
self._udp_data_send_buffer += data |
|
|
|
#logging.info('UDP over TCP sendto %d %s' % (len(data), binascii.hexlify(data))) |
|
|
|
while len(self._udp_data_send_buffer) > 6: |
|
|
|
length = struct.unpack('>H', self._udp_data_send_buffer[:2])[0] |
|
|
|
|
|
|
|
if length > len(self._udp_data_send_buffer): |
|
|
|
break |
|
|
|
|
|
|
|
data = self._udp_data_send_buffer[:length] |
|
|
|
self._udp_data_send_buffer = self._udp_data_send_buffer[length:] |
|
|
|
|
|
|
|
frag = common.ord(data[2]) |
|
|
|
if frag != 0: |
|
|
|
logging.warn('drop a message since frag is %d' % (frag,)) |
|
|
|
continue |
|
|
|
else: |
|
|
|
sock.sendto(data, (server_addr[0], dest_port)) |
|
|
|
data = data[3:] |
|
|
|
header_result = parse_header(data) |
|
|
|
if header_result is None: |
|
|
|
continue |
|
|
|
connecttype, dest_addr, dest_port, header_length = header_result |
|
|
|
addrs = socket.getaddrinfo(dest_addr, dest_port, 0, |
|
|
|
socket.SOCK_DGRAM, socket.SOL_UDP) |
|
|
|
#logging.info('UDP over TCP sendto %s:%d %d bytes from %s:%d' % (dest_addr, dest_port, len(data), self._client_address[0], self._client_address[1])) |
|
|
|
if addrs: |
|
|
|
af, socktype, proto, canonname, server_addr = addrs[0] |
|
|
|
data = data[header_length:] |
|
|
|
if af == socket.AF_INET6: |
|
|
|
self._remote_sock_v6.sendto(data, (server_addr[0], dest_port)) |
|
|
|
else: |
|
|
|
sock.sendto(data, (server_addr[0], dest_port)) |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
#trace = traceback.format_exc() |
|
|
|
#logging.error(trace) |
|
|
|
logging.error(e) |
|
|
|
error_no = eventloop.errno_from_exception(e) |
|
|
|
if error_no in (errno.EAGAIN, errno.EINPROGRESS, |
|
|
|
errno.EWOULDBLOCK): |
|
|
|
uncomplete = True |
|
|
|
else: |
|
|
|
shell.print_exception(e) |
|
|
|
self.destroy() |
|
|
|
return False |
|
|
|
return True |
|
|
|
|
|
|
|
uncomplete = False |
|
|
|
try: |
|
|
|
l = len(data) |
|
|
|
s = sock.send(data) |
|
|
|
if s < l: |
|
|
|
data = data[s:] |
|
|
|
uncomplete = True |
|
|
|
except (OSError, IOError) as e: |
|
|
|
error_no = eventloop.errno_from_exception(e) |
|
|
|
if error_no in (errno.EAGAIN, errno.EINPROGRESS, |
|
|
|
errno.EWOULDBLOCK): |
|
|
|
uncomplete = True |
|
|
|
else: |
|
|
|
#traceback.print_exc() |
|
|
|
shell.print_exception(e) |
|
|
|
self.destroy() |
|
|
|
return False |
|
|
|
else: |
|
|
|
try: |
|
|
|
l = len(data) |
|
|
|
s = sock.send(data) |
|
|
|
if s < l: |
|
|
|
data = data[s:] |
|
|
|
uncomplete = True |
|
|
|
except (OSError, IOError) as e: |
|
|
|
error_no = eventloop.errno_from_exception(e) |
|
|
|
if error_no in (errno.EAGAIN, errno.EINPROGRESS, |
|
|
|
errno.EWOULDBLOCK): |
|
|
|
uncomplete = True |
|
|
|
else: |
|
|
|
#traceback.print_exc() |
|
|
|
shell.print_exception(e) |
|
|
|
self.destroy() |
|
|
|
return False |
|
|
|
if uncomplete: |
|
|
|
if sock == self._local_sock: |
|
|
|
self._data_to_write_to_local.append(data) |
|
|
@ -270,7 +293,7 @@ class TCPRelayHandler(object): |
|
|
|
remote_sock = \ |
|
|
|
self._create_remote_socket(self._chosen_server[0], |
|
|
|
self._chosen_server[1]) |
|
|
|
self._loop.add(remote_sock, eventloop.POLL_ERR) |
|
|
|
self._loop.add(remote_sock, eventloop.POLL_ERR, self._server) |
|
|
|
data = b''.join(self._data_to_write_to_remote) |
|
|
|
l = len(data) |
|
|
|
s = remote_sock.sendto(data, MSG_FASTOPEN, self._chosen_server) |
|
|
@ -382,6 +405,11 @@ class TCPRelayHandler(object): |
|
|
|
remote_sock_v6 = socket.socket(af, socktype, proto) |
|
|
|
self._remote_sock_v6 = remote_sock_v6 |
|
|
|
self._fd_to_handlers[remote_sock_v6.fileno()] = self |
|
|
|
remote_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 32) |
|
|
|
remote_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 32) |
|
|
|
remote_sock_v6.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 32) |
|
|
|
remote_sock_v6.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 32) |
|
|
|
|
|
|
|
|
|
|
|
remote_sock.setblocking(False) |
|
|
|
if self._remote_udp: |
|
|
@ -421,10 +449,12 @@ class TCPRelayHandler(object): |
|
|
|
remote_port) |
|
|
|
if self._remote_udp: |
|
|
|
self._loop.add(remote_sock, |
|
|
|
eventloop.POLL_IN) |
|
|
|
eventloop.POLL_IN, |
|
|
|
self._server) |
|
|
|
if self._remote_sock_v6: |
|
|
|
self._loop.add(self._remote_sock_v6, |
|
|
|
eventloop.POLL_IN) |
|
|
|
eventloop.POLL_IN, |
|
|
|
self._server) |
|
|
|
else: |
|
|
|
try: |
|
|
|
remote_sock.connect((remote_addr, remote_port)) |
|
|
@ -433,10 +463,16 @@ class TCPRelayHandler(object): |
|
|
|
errno.EINPROGRESS: |
|
|
|
pass |
|
|
|
self._loop.add(remote_sock, |
|
|
|
eventloop.POLL_ERR | eventloop.POLL_OUT) |
|
|
|
eventloop.POLL_ERR | eventloop.POLL_OUT, |
|
|
|
self._server) |
|
|
|
self._stage = STAGE_CONNECTING |
|
|
|
self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING) |
|
|
|
self._update_stream(STREAM_DOWN, WAIT_STATUS_READING) |
|
|
|
if self._remote_udp: |
|
|
|
while self._data_to_write_to_remote: |
|
|
|
data = self._data_to_write_to_remote[0] |
|
|
|
del self._data_to_write_to_remote[0] |
|
|
|
self._write_to_sock(data, self._remote_sock) |
|
|
|
return |
|
|
|
except Exception as e: |
|
|
|
shell.print_exception(e) |
|
|
@ -495,11 +531,12 @@ class TCPRelayHandler(object): |
|
|
|
port = struct.pack('>H', addr[1]) |
|
|
|
try: |
|
|
|
ip = socket.inet_aton(addr[0]) |
|
|
|
data = '\x00\x00\x00\x01' + ip + port + data |
|
|
|
data = '\x00\x01' + ip + port + data |
|
|
|
except Exception as e: |
|
|
|
ip = socket.inet_pton(socket.AF_INET6, addr[0]) |
|
|
|
data = '\x00\x00\x00\x04' + ip + port + data |
|
|
|
logging.info('UDP recvfrom %s:%d %d bytes to %s:%d' % (addr[0], addr[1], len(data), self._client_address[0], self._client_address[1])) |
|
|
|
data = '\x00\x04' + ip + port + data |
|
|
|
data = struct.pack('>H', len(data) + 2) + data |
|
|
|
#logging.info('UDP over TCP recvfrom %s:%d %d bytes to %s:%d' % (addr[0], addr[1], len(data), self._client_address[0], self._client_address[1])) |
|
|
|
else: |
|
|
|
data = self._remote_sock.recv(BUF_SIZE) |
|
|
|
except (OSError, IOError) as e: |
|
|
@ -637,7 +674,6 @@ class TCPRelay(object): |
|
|
|
self._closed = False |
|
|
|
self._eventloop = None |
|
|
|
self._fd_to_handlers = {} |
|
|
|
self._last_time = time.time() |
|
|
|
self.server_transfer_ul = 0L |
|
|
|
self.server_transfer_dl = 0L |
|
|
|
|
|
|
@ -680,10 +716,9 @@ class TCPRelay(object): |
|
|
|
if self._closed: |
|
|
|
raise Exception('already closed') |
|
|
|
self._eventloop = loop |
|
|
|
loop.add_handler(self._handle_events) |
|
|
|
|
|
|
|
self._eventloop.add(self._server_socket, |
|
|
|
eventloop.POLL_IN | eventloop.POLL_ERR) |
|
|
|
eventloop.POLL_IN | eventloop.POLL_ERR, self) |
|
|
|
self._eventloop.add_periodic(self.handle_periodic) |
|
|
|
|
|
|
|
def remove_handler(self, handler): |
|
|
|
index = self._handler_to_timeouts.get(hash(handler), -1) |
|
|
@ -695,7 +730,7 @@ class TCPRelay(object): |
|
|
|
def update_activity(self, handler): |
|
|
|
# set handler to active |
|
|
|
now = int(time.time()) |
|
|
|
if now - handler.last_activity < TIMEOUT_PRECISION: |
|
|
|
if now - handler.last_activity < eventloop.TIMEOUT_PRECISION: |
|
|
|
# thus we can lower timeout modification frequency |
|
|
|
return |
|
|
|
handler.last_activity = now |
|
|
@ -741,53 +776,55 @@ class TCPRelay(object): |
|
|
|
pos = 0 |
|
|
|
self._timeout_offset = pos |
|
|
|
|
|
|
|
def _handle_events(self, events): |
|
|
|
def handle_event(self, sock, fd, event): |
|
|
|
# handle events and dispatch to handlers |
|
|
|
for sock, fd, event in events: |
|
|
|
if sock: |
|
|
|
logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd, |
|
|
|
eventloop.EVENT_NAMES.get(event, event)) |
|
|
|
if sock == self._server_socket: |
|
|
|
if event & eventloop.POLL_ERR: |
|
|
|
# TODO |
|
|
|
raise Exception('server_socket error') |
|
|
|
try: |
|
|
|
logging.debug('accept') |
|
|
|
conn = self._server_socket.accept() |
|
|
|
TCPRelayHandler(self, self._fd_to_handlers, |
|
|
|
self._eventloop, conn[0], self._config, |
|
|
|
self._dns_resolver, self._is_local) |
|
|
|
except (OSError, IOError) as e: |
|
|
|
error_no = eventloop.errno_from_exception(e) |
|
|
|
if error_no in (errno.EAGAIN, errno.EINPROGRESS, |
|
|
|
errno.EWOULDBLOCK): |
|
|
|
return |
|
|
|
else: |
|
|
|
shell.print_exception(e) |
|
|
|
if self._config['verbose']: |
|
|
|
traceback.print_exc() |
|
|
|
else: |
|
|
|
if sock: |
|
|
|
logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd, |
|
|
|
eventloop.EVENT_NAMES.get(event, event)) |
|
|
|
if sock == self._server_socket: |
|
|
|
if event & eventloop.POLL_ERR: |
|
|
|
# TODO |
|
|
|
raise Exception('server_socket error') |
|
|
|
try: |
|
|
|
logging.debug('accept') |
|
|
|
conn = self._server_socket.accept() |
|
|
|
TCPRelayHandler(self, self._fd_to_handlers, |
|
|
|
self._eventloop, conn[0], self._config, |
|
|
|
self._dns_resolver, self._is_local) |
|
|
|
except (OSError, IOError) as e: |
|
|
|
error_no = eventloop.errno_from_exception(e) |
|
|
|
if error_no in (errno.EAGAIN, errno.EINPROGRESS, |
|
|
|
errno.EWOULDBLOCK): |
|
|
|
continue |
|
|
|
else: |
|
|
|
shell.print_exception(e) |
|
|
|
if self._config['verbose']: |
|
|
|
traceback.print_exc() |
|
|
|
handler = self._fd_to_handlers.get(fd, None) |
|
|
|
if handler: |
|
|
|
handler.handle_event(sock, event) |
|
|
|
else: |
|
|
|
if sock: |
|
|
|
handler = self._fd_to_handlers.get(fd, None) |
|
|
|
if handler: |
|
|
|
handler.handle_event(sock, event) |
|
|
|
else: |
|
|
|
logging.warn('poll removed fd') |
|
|
|
logging.warn('poll removed fd') |
|
|
|
|
|
|
|
now = time.time() |
|
|
|
if now - self._last_time > TIMEOUT_PRECISION: |
|
|
|
self._sweep_timeout() |
|
|
|
self._last_time = now |
|
|
|
def handle_periodic(self): |
|
|
|
if self._closed: |
|
|
|
if self._server_socket: |
|
|
|
self._eventloop.remove(self._server_socket) |
|
|
|
self._server_socket.close() |
|
|
|
self._server_socket = None |
|
|
|
logging.info('closed listen port %d', self._listen_port) |
|
|
|
logging.info('closed TCP port %d', self._listen_port) |
|
|
|
if not self._fd_to_handlers: |
|
|
|
self._eventloop.remove_handler(self._handle_events) |
|
|
|
logging.info('stopping') |
|
|
|
self._eventloop.stop() |
|
|
|
self._sweep_timeout() |
|
|
|
|
|
|
|
def close(self, next_tick=False): |
|
|
|
logging.debug('TCP close') |
|
|
|
self._closed = True |
|
|
|
if not next_tick: |
|
|
|
if self._eventloop: |
|
|
|
self._eventloop.remove_periodic(self.handle_periodic) |
|
|
|
self._eventloop.remove(self._server_socket) |
|
|
|
self._server_socket.close() |
|
|
|