#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright (c) 2014 clowwindy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Event-loop driven TCP relay.

``TCPRelay`` owns a listening socket and a polling thread; every accepted
connection is wrapped in a ``TCPRelayHandler`` that shuttles bytes between
the accepted ("local") socket and an outgoing ("remote") socket, encrypting
or decrypting them depending on which side of the tunnel this process is.
"""

import time
import socket
import logging
import encrypt
import errno
import threading
import traceback
import eventloop
from common import parse_header


CMD_CONNECT = 1

# local:
# stage 0 init
# stage 1 hello received, hello sent
# stage 4 addr received, reply sent
# stage 5 remote connected

# remote:
# stage 0 init
# stage 4 addr received, reply sent
# stage 5 remote connected

STAGE_INIT = 0
STAGE_HELLO = 1
STAGE_REPLY = 4
STAGE_STREAM = 5

# stream direction
STREAM_UP = 0
STREAM_DOWN = 1

# stream status
STATUS_WAIT_INIT = 0
STATUS_WAIT_READING = 1
STATUS_WAIT_WRITING = 2
STATUS_WAIT_READWRITING = STATUS_WAIT_READING | STATUS_WAIT_WRITING

BUF_SIZE = 8 * 1024


class TCPRelayHandler(object):
    """Relay one accepted connection to one outgoing connection.

    "Upstream" is the local -> remote direction and "downstream" is the
    remote -> local direction.  Each direction has a status bitmask
    (``STATUS_WAIT_*``) that ``update_stream`` translates into the poll
    events registered for the two sockets.
    """

    def __init__(self, fd_to_handlers, loop, local_sock, config, is_local):
        """Register ``local_sock`` with ``loop`` and start waiting for input.

        fd_to_handlers: shared dict mapping file descriptor -> handler; this
            handler adds (and on ``destroy`` removes) its own sockets.
        loop: event loop object providing ``add``/``modify``/``remove``.
        local_sock: the accepted client socket.
        config: mapping read for 'password', 'method', 'server' and
            'server_port'.
        is_local: True when running on the client side of the tunnel.
        """
        self._fd_to_handlers = fd_to_handlers
        self._loop = loop
        self._local_sock = local_sock
        self._remote_sock = None
        self._config = config
        self._is_local = is_local
        self._stage = STAGE_INIT
        self._encryptor = encrypt.Encryptor(config['password'],
                                            config['method'])
        # chunks that could not be sent yet, flushed on the next POLL_OUT
        self._data_to_write_to_local = []
        self._data_to_write_to_remote = []
        self._upstream_status = STATUS_WAIT_READING
        self._downstream_status = STATUS_WAIT_INIT
        fd_to_handlers[local_sock.fileno()] = self
        local_sock.setblocking(False)
        local_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR)

    def update_stream(self, stream, status):
        """Set the status of one direction and re-register poll events.

        stream: ``STREAM_UP`` or ``STREAM_DOWN``.
        status: one of the ``STATUS_WAIT_*`` values.

        Nothing is sent to the event loop when the status did not change.
        """
        dirty = False
        if stream == STREAM_DOWN:
            if self._downstream_status != status:
                self._downstream_status = status
                dirty = True
        elif stream == STREAM_UP:
            if self._upstream_status != status:
                self._upstream_status = status
                dirty = True
        if not dirty:
            return
        if self._local_sock:
            # local socket: written by downstream, read by upstream
            event = eventloop.POLL_ERR
            if self._downstream_status & STATUS_WAIT_WRITING:
                event |= eventloop.POLL_OUT
            if self._upstream_status & STATUS_WAIT_READING:
                event |= eventloop.POLL_IN
            self._loop.modify(self._local_sock, event)
        if self._remote_sock:
            # remote socket: read by downstream, written by upstream
            event = eventloop.POLL_ERR
            if self._downstream_status & STATUS_WAIT_READING:
                event |= eventloop.POLL_IN
            if self._upstream_status & STATUS_WAIT_WRITING:
                event |= eventloop.POLL_OUT
            self._loop.modify(self._remote_sock, event)

    def write_all_to_sock(self, data, sock):
        """Send ``data`` on ``sock``, queueing whatever could not be sent.

        A short write, EAGAIN or EINPROGRESS queues the unsent bytes and
        switches the matching direction to ``STATUS_WAIT_WRITING``; a
        complete write switches it to ``STATUS_WAIT_READING``.  Any other
        send error is logged and destroys the handler.  Does nothing when
        ``data`` or ``sock`` is empty/None.
        """
        if not data or not sock:
            return
        uncomplete = False
        try:
            total = len(data)
            sent = sock.send(data)
            if sent < total:
                data = data[sent:]
                uncomplete = True
        except (OSError, IOError) as e:
            error_no = eventloop.errno_from_exception(e)
            if error_no in (errno.EAGAIN, errno.EINPROGRESS):
                uncomplete = True
            else:
                logging.error(e)
                self.destroy()
                # Both sockets are gone now: falling through would log a
                # bogus 'unknown socket' error for a socket we just closed.
                return
        if uncomplete:
            if sock == self._local_sock:
                self._data_to_write_to_local.append(data)
                self.update_stream(STREAM_DOWN, STATUS_WAIT_WRITING)
            elif sock == self._remote_sock:
                self._data_to_write_to_remote.append(data)
                self.update_stream(STREAM_UP, STATUS_WAIT_WRITING)
            else:
                logging.error('write_all_to_sock:unknown socket')
        else:
            if sock == self._local_sock:
                self.update_stream(STREAM_DOWN, STATUS_WAIT_READING)
            elif sock == self._remote_sock:
                self.update_stream(STREAM_UP, STATUS_WAIT_READING)
            else:
                logging.error('write_all_to_sock:unknown socket')

    def on_local_read(self):
        """Handle readable data on the local socket according to the stage.

        An empty read, or a recv error other than ETIMEDOUT/EAGAIN, destroys
        the handler.  Depending on ``self._stage`` the data is relayed,
        answered with the handshake reply, queued until the remote socket is
        connected, or parsed as the address header that triggers the
        outgoing connection.
        """
        # TODO update timeout
        if not self._local_sock:
            return
        is_local = self._is_local
        data = None
        try:
            data = self._local_sock.recv(BUF_SIZE)
        except (OSError, IOError) as e:
            if eventloop.errno_from_exception(e) in \
                    (errno.ETIMEDOUT, errno.EAGAIN):
                return
            # any other error leaves data as None and falls into destroy()
        if not data:
            self.destroy()
            return
        if not is_local:
            data = self._encryptor.decrypt(data)
            if not data:
                return
        if self._stage == STAGE_STREAM:
            if self._is_local:
                data = self._encryptor.encrypt(data)
            self.write_all_to_sock(data, self._remote_sock)
            return
        elif is_local and self._stage == STAGE_INIT:
            # TODO check auth method
            self.write_all_to_sock('\x05\00', self._local_sock)
            self._stage = STAGE_HELLO
            return
        elif self._stage == STAGE_REPLY:
            # remote not connected yet: buffer until on_remote_write flushes
            if is_local:
                data = self._encryptor.encrypt(data)
            self._data_to_write_to_remote.append(data)
        elif (is_local and self._stage == STAGE_HELLO) or \
                (not is_local and self._stage == STAGE_INIT):
            try:
                if is_local:
                    cmd = ord(data[1])
                    # Explicit check instead of assert: this is untrusted
                    # client input and asserts vanish under `python -O`.
                    if cmd != CMD_CONNECT:
                        raise Exception('unsupported command %d' % cmd)
                    # just trim VER CMD RSV
                    data = data[3:]
                header_result = parse_header(data)
                if header_result is None:
                    raise Exception('can not parse header')
                addrtype, remote_addr, remote_port, header_length =\
                    header_result
                logging.info('connecting %s:%d' % (remote_addr, remote_port))
                if is_local:
                    # forward address to remote
                    self.write_all_to_sock('\x05\x00\x00\x01' +
                                           '\x00\x00\x00\x00\x10\x10',
                                           self._local_sock)
                    if not self._local_sock:
                        # the reply write failed and destroyed this handler;
                        # do not open a remote connection for a dead client
                        return
                    data_to_send = self._encryptor.encrypt(data)
                    self._data_to_write_to_remote.append(data_to_send)
                    remote_addr = self._config['server']
                    remote_port = self._config['server_port']
                else:
                    if len(data) > header_length:
                        self._data_to_write_to_remote.append(
                            data[header_length:])

                # TODO async DNS
                addrs = socket.getaddrinfo(remote_addr, remote_port, 0,
                                           socket.SOCK_STREAM, socket.SOL_TCP)
                if not addrs:
                    raise Exception("can't get addrinfo for %s:%d" %
                                    (remote_addr, remote_port))
                af, socktype, proto, canonname, sa = addrs[0]
                remote_sock = socket.socket(af, socktype, proto)
                self._remote_sock = remote_sock
                self._fd_to_handlers[remote_sock.fileno()] = self
                remote_sock.setblocking(False)
                remote_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
                # TODO support TCP fast open
                try:
                    remote_sock.connect(sa)
                except (OSError, IOError) as e:
                    # EINPROGRESS is the normal result of a non-blocking
                    # connect.  Other errors are deliberately ignored here
                    # as well (original best-effort behaviour).
                    # NOTE(review): presumably a failed connect surfaces
                    # later as POLL_ERR on the socket -- confirm.
                    if eventloop.errno_from_exception(e) == errno.EINPROGRESS:
                        pass
                self._loop.add(remote_sock,
                               eventloop.POLL_ERR | eventloop.POLL_OUT)

                self._stage = STAGE_REPLY
                self.update_stream(STREAM_UP, STATUS_WAIT_READWRITING)
                self.update_stream(STREAM_DOWN, STATUS_WAIT_READING)
                return
            except Exception:
                traceback.print_exc()
                # TODO use logging when debug completed
                self.destroy()

    def on_remote_read(self):
        """Relay data read from the remote socket to the local socket.

        Data is decrypted on the local side and encrypted on the server
        side.  An empty read, or a recv error other than ETIMEDOUT/EAGAIN,
        destroys the handler.
        """
        # TODO update timeout
        if not self._remote_sock:
            # already destroyed by an earlier callback
            return
        data = None
        try:
            data = self._remote_sock.recv(BUF_SIZE)
        except (OSError, IOError) as e:
            if eventloop.errno_from_exception(e) in \
                    (errno.ETIMEDOUT, errno.EAGAIN):
                return
        if not data:
            self.destroy()
            return
        if self._is_local:
            data = self._encryptor.decrypt(data)
        else:
            data = self._encryptor.encrypt(data)
        try:
            self.write_all_to_sock(data, self._local_sock)
        except Exception:
            traceback.print_exc()
            # TODO use logging when debug completed
            self.destroy()

    def on_local_write(self):
        """Flush queued downstream data once the local socket is writable."""
        if self._data_to_write_to_local:
            data = ''.join(self._data_to_write_to_local)
            self._data_to_write_to_local = []
            self.write_all_to_sock(data, self._local_sock)
        else:
            self.update_stream(STREAM_DOWN, STATUS_WAIT_READING)

    def on_remote_write(self):
        """Enter the streaming stage and flush queued upstream data."""
        self._stage = STAGE_STREAM
        if self._data_to_write_to_remote:
            data = ''.join(self._data_to_write_to_remote)
            self._data_to_write_to_remote = []
            self.write_all_to_sock(data, self._remote_sock)
        else:
            self.update_stream(STREAM_UP, STATUS_WAIT_READING)

    def on_local_error(self):
        """Log the pending error of the local socket and tear down."""
        if self._local_sock:
            logging.error(eventloop.get_sock_error(self._local_sock))
        self.destroy()

    def on_remote_error(self):
        """Log the pending error of the remote socket and tear down."""
        if self._remote_sock:
            logging.error(eventloop.get_sock_error(self._remote_sock))
        self.destroy()

    def handle_event(self, sock, event):
        """Dispatch a poll ``event`` bitmask for ``sock`` to the callbacks.

        Callbacks run in read, write, error order.  Any callback may destroy
        the handler (which sets both sockets to None), so the later ones are
        skipped once the socket is gone.
        """
        # order is important
        if sock == self._remote_sock:
            if event & eventloop.POLL_IN:
                self.on_remote_read()
            if event & eventloop.POLL_OUT and self._remote_sock:
                self.on_remote_write()
            if event & eventloop.POLL_ERR and self._remote_sock:
                self.on_remote_error()
        elif sock == self._local_sock:
            if event & eventloop.POLL_IN:
                self.on_local_read()
            if event & eventloop.POLL_OUT and self._local_sock:
                self.on_local_write()
            if event & eventloop.POLL_ERR and self._local_sock:
                self.on_local_error()
        else:
            logging.warning('unknown socket')

    def destroy(self):
        """Unregister and close both sockets; safe to call more than once."""
        if self._remote_sock:
            self._loop.remove(self._remote_sock)
            del self._fd_to_handlers[self._remote_sock.fileno()]
            self._remote_sock.close()
            self._remote_sock = None
        if self._local_sock:
            self._loop.remove(self._local_sock)
            del self._fd_to_handlers[self._local_sock.fileno()]
            self._local_sock.close()
            self._local_sock = None


class TCPRelay(object):
    """Listening socket plus the thread that polls it and its connections."""

    def __init__(self, config, is_local):
        """Bind and listen on the configured address.

        Listens on ``local_address``/``local_port`` when ``is_local`` is
        true, otherwise on ``server``/``server_port``.

        Raises Exception when the listen address cannot be resolved; socket
        errors from bind/listen propagate unchanged.
        """
        self._config = config
        self._is_local = is_local
        self._closed = False
        self._thread = None
        self._fd_to_handlers = {}

        if is_local:
            listen_addr = config['local_address']
            listen_port = config['local_port']
        else:
            listen_addr = config['server']
            listen_port = config['server_port']

        addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                   socket.SOCK_STREAM, socket.SOL_TCP)
        if not addrs:
            raise Exception("can't get addrinfo for %s:%d" %
                            (listen_addr, listen_port))
        af, socktype, proto, canonname, sa = addrs[0]
        server_socket = socket.socket(af, socktype, proto)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(sa)
        server_socket.setblocking(False)
        server_socket.listen(1024)
        self._server_socket = server_socket

    def _run(self):
        """Thread body: poll until ``close`` is called and dispatch events.

        Events on the listening socket accept a new connection and wrap it
        in a ``TCPRelayHandler``; events on any other socket are routed to
        the handler registered for its file descriptor.
        """
        server_socket = self._server_socket
        self._eventloop = eventloop.EventLoop()
        logging.debug('using event model: %s', self._eventloop.model)
        self._eventloop.add(server_socket,
                            eventloop.POLL_IN | eventloop.POLL_ERR)
        last_time = time.time()
        while not self._closed:
            try:
                events = self._eventloop.poll(1)
            except (OSError, IOError) as e:
                # EPIPE happens when the client closes the connection and is
                # not worth logging; everything else is logged.  Either way
                # keep polling.
                if eventloop.errno_from_exception(e) != errno.EPIPE:
                    logging.error(e)
                continue
            for sock, event in events:
                if sock:
                    # NOTE(review): this indexes EVENT_NAMES with the raw
                    # event value and calls fileno() on a socket an earlier
                    # event in this batch may have closed -- verify both are
                    # safe against eventloop's implementation.
                    logging.debug('fd %d %s', sock.fileno(),
                                  eventloop.EVENT_NAMES[event])
                if sock == self._server_socket:
                    if event & eventloop.POLL_ERR:
                        # TODO
                        raise Exception('server_socket error')
                    try:
                        conn = self._server_socket.accept()
                        TCPRelayHandler(self._fd_to_handlers, self._eventloop,
                                        conn[0], self._config, self._is_local)
                    except (OSError, IOError) as e:
                        error_no = eventloop.errno_from_exception(e)
                        if error_no in (errno.EAGAIN, errno.EINPROGRESS):
                            continue
                        else:
                            logging.error(e)
                else:
                    if sock:
                        handler = self._fd_to_handlers.get(sock.fileno(), None)
                        if handler:
                            handler.handle_event(sock, event)
                        else:
                            logging.warning('can not find handler for fd %d',
                                            sock.fileno())
                            self._eventloop.remove(sock)
                    else:
                        logging.warning('poll removed fd')
            now = time.time()
            if now - last_time > 5:
                # TODO sweep timeouts
                last_time = now

    def start(self):
        """Start the polling thread (non-daemon, named 'TCPThread').

        Raises Exception('closed') when the relay was already closed.
        """
        # TODO combine loops on multiple ports into one single loop
        if self._closed:
            raise Exception('closed')
        t = threading.Thread(target=self._run, name='TCPThread')
        t.daemon = False
        t.start()
        self._thread = t

    def close(self):
        """Ask the polling loop to stop and close the listening socket."""
        self._closed = True
        self._server_socket.close()

    def thread(self):
        """Return the polling thread, or None if ``start`` was not called."""
        return self._thread