Python port of ShadowsocksR
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
10 KiB

11 years ago
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2014 clowwindy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# SOCKS5 UDP Request
# +----+------+------+----------+----------+----------+
# |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA |
# +----+------+------+----------+----------+----------+
# | 2 | 1 | 1 | Variable | 2 | Variable |
# +----+------+------+----------+----------+----------+
# SOCKS5 UDP Response
# +----+------+------+----------+----------+----------+
# |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA |
# +----+------+------+----------+----------+----------+
# | 2 | 1 | 1 | Variable | 2 | Variable |
# +----+------+------+----------+----------+----------+
# shadowsocks UDP Request (before encrypted)
# +------+----------+----------+----------+
# | ATYP | DST.ADDR | DST.PORT | DATA |
# +------+----------+----------+----------+
# | 1 | Variable | 2 | Variable |
# +------+----------+----------+----------+
# shadowsocks UDP Response (before encrypted)
# +------+----------+----------+----------+
# | ATYP | DST.ADDR | DST.PORT | DATA |
# +------+----------+----------+----------+
# | 1 | Variable | 2 | Variable |
# +------+----------+----------+----------+
# shadowsocks UDP Request and Response (after encrypted)
# +-------+--------------+
# | IV | PAYLOAD |
# +-------+--------------+
# | Fixed | Variable |
# +-------+--------------+
# HOW TO NAME THINGS
# ------------------
# `dest` means destination server, which is from DST fields in the SOCKS5
# request
# `local` means local server of shadowsocks
# `remote` means remote server of shadowsocks
# `client` means UDP clients that connect to other servers
11 years ago
# `server` means the UDP server that handles user requests
11 years ago
from __future__ import absolute_import, division, print_function, \
with_statement
11 years ago
11 years ago
import time
11 years ago
import socket
11 years ago
import logging
import struct
10 years ago
import errno
import random
from shadowsocks import encrypt, eventloop, lru_cache, common
from shadowsocks.common import parse_header, pack_addr
11 years ago
11 years ago
BUF_SIZE = 65536
def client_key(a, b, c, d):
    """Return the four values joined by colons as a single string key.

    UDPRelay calls this with (source host, source port, destination
    address, destination port) to index its client-socket cache.
    """
    parts = (a, b, c, d)
    return '%s:%s:%s:%s' % parts
11 years ago
class UDPRelay(object):
    """Relay UDP datagrams between a listening socket and upstream sockets.

    With ``is_local`` true the relay listens on ``local_address`` /
    ``local_port``, strips the 3-byte RSV/FRAG prefix from SOCKS5 UDP
    requests, encrypts them and forwards them to the configured server.
    Otherwise it listens on ``server`` / ``server_port``, decrypts incoming
    datagrams and forwards the payload to the destination named in the
    header.  Replies travel the reverse path through ``_handle_client``.
    """

    def __init__(self, config, dns_resolver, is_local):
        """Create and bind the non-blocking listening socket.

        :param config: mapping read for 'password', 'method', 'timeout',
            'server', 'server_port' and, when ``is_local``, 'local_address'
            and 'local_port'.
        :param dns_resolver: stored on the instance; not used by the code
            in this class.
        :param is_local: selects which side of the tunnel this relay is.
        :raises Exception: if no address info is returned for the listen
            address.
        """
        self._config = config
        if is_local:
            self._listen_addr = config['local_address']
            self._listen_port = config['local_port']
            self._remote_addr = config['server']
            self._remote_port = config['server_port']
        else:
            self._listen_addr = config['server']
            self._listen_port = config['server_port']
            self._remote_addr = None
            self._remote_port = None
        self._dns_resolver = dns_resolver
        self._password = config['password']
        self._method = config['method']
        self._timeout = config['timeout']
        self._is_local = is_local
        # client_key(...) -> upstream socket; expired entries are handed to
        # _close_client through close_callback.
        self._cache = lru_cache.LRUCache(timeout=config['timeout'],
                                         close_callback=self._close_client)
        # upstream socket fd -> address of the peer that caused the socket
        # to be created (where replies are sent back to).
        self._client_fd_to_server_addr = \
            lru_cache.LRUCache(timeout=config['timeout'])
        self._eventloop = None
        self._closed = False
        self._last_time = time.time()
        # fd -> upstream socket.  Keeping the socket object (not only the
        # fd) lets shutdown actually close every open upstream socket.
        self._sockets = {}

        addrs = socket.getaddrinfo(self._listen_addr, self._listen_port, 0,
                                   socket.SOCK_DGRAM, socket.SOL_UDP)
        if not addrs:
            raise Exception("can't get addrinfo for %s:%d" %
                            (self._listen_addr, self._listen_port))
        af, socktype, proto, canonname, sa = addrs[0]
        server_socket = socket.socket(af, socktype, proto)
        try:
            server_socket.bind((self._listen_addr, self._listen_port))
            server_socket.setblocking(False)
        except Exception:
            # do not leak the descriptor when binding fails
            server_socket.close()
            raise
        self._server_socket = server_socket

    def _get_a_server(self):
        """Return ``(server, server_port)`` from the config.

        When 'server_port' is a list, one port is picked at random.
        """
        server = self._config['server']
        server_port = self._config['server_port']
        if isinstance(server_port, list):
            server_port = random.choice(server_port)
        logging.debug('chosen server: %s:%d', server, server_port)
        # TODO support multiple server IP
        return server, server_port

    def _close_client(self, client):
        """Cache close callback: unregister and close an upstream socket.

        Values without a ``close`` attribute (plain addresses) are ignored.
        """
        if hasattr(client, 'close'):
            # pop() instead of remove(): tolerate an fd that is no longer
            # tracked rather than raising KeyError during a sweep.
            self._sockets.pop(client.fileno(), None)
            self._eventloop.remove(client)
            client.close()
        else:
            # just an address
            pass

    def _handle_server(self):
        """Read one datagram from the listening socket and forward it."""
        server = self._server_socket
        data, r_addr = server.recvfrom(BUF_SIZE)
        if not data:
            logging.debug('UDP handle_server: data is empty')
            return
        if self._is_local:
            # SOCKS5 UDP request: RSV(2) FRAG(1) precede the address header
            if len(data) < 3:
                logging.debug('UDP handle_server: data is too short')
                return
            frag = common.ord(data[2])
            if frag != 0:
                logging.warning('drop a message since frag is not 0')
                return
            data = data[3:]
            if not data:
                logging.debug('UDP handle_server: data is empty')
                return
        else:
            # decrypt data
            data = encrypt.encrypt_all(self._password, self._method, 0, data)
            if not data:
                logging.debug('UDP handle_server: data is empty after decrypt')
                return
        header_result = parse_header(data)
        if header_result is None:
            return
        addrtype, dest_addr, dest_port, header_length = header_result

        if self._is_local:
            server_addr, server_port = self._get_a_server()
        else:
            server_addr, server_port = dest_addr, dest_port

        key = client_key(r_addr[0], r_addr[1], dest_addr, dest_port)
        client = self._cache.get(key, None)
        if not client:
            # TODO async getaddrinfo
            addrs = socket.getaddrinfo(server_addr, server_port, 0,
                                       socket.SOCK_DGRAM, socket.SOL_UDP)
            if not addrs:
                # drop
                return
            af, socktype, proto, canonname, sa = addrs[0]
            client = socket.socket(af, socktype, proto)
            client.setblocking(False)
            self._cache[key] = client
            self._client_fd_to_server_addr[client.fileno()] = r_addr

            self._sockets[client.fileno()] = client
            self._eventloop.add(client, eventloop.POLL_IN)

        if self._is_local:
            data = encrypt.encrypt_all(self._password, self._method, 1, data)
            if not data:
                return
        else:
            # the destination gets the payload only, without the header
            data = data[header_length:]
        if not data:
            return
        try:
            client.sendto(data, (server_addr, server_port))
        except IOError as e:
            err = eventloop.errno_from_exception(e)
            if err in (errno.EINPROGRESS, errno.EAGAIN):
                pass
            else:
                logging.error(e)

    def _handle_client(self, sock):
        """Read one reply from an upstream socket and send it back.

        The reply goes out through the listening socket to the address
        recorded for ``sock`` when it was created.
        """
        data, r_addr = sock.recvfrom(BUF_SIZE)
        if not data:
            logging.debug('UDP handle_client: data is empty')
            return
        if not self._is_local:
            addrlen = len(r_addr[0])
            if addrlen > 255:
                # drop
                return
            # prepend the sender's address and port, then encrypt
            data = pack_addr(r_addr[0]) + struct.pack('>H', r_addr[1]) + data
            response = encrypt.encrypt_all(self._password, self._method, 1,
                                           data)
            if not response:
                return
        else:
            data = encrypt.encrypt_all(self._password, self._method, 0,
                                       data)
            if not data:
                return
            header_result = parse_header(data)
            if header_result is None:
                return
            # addrtype, dest_addr, dest_port, header_length = header_result
            # restore the RSV(2) + FRAG(1) prefix of a SOCKS5 UDP response
            response = b'\x00\x00\x00' + data
        client_addr = self._client_fd_to_server_addr.get(sock.fileno())
        if client_addr:
            self._server_socket.sendto(response, client_addr)
        else:
            # no peer address is recorded for this socket (unknown or
            # expired entry); simply drop the packet
            pass

    def add_to_loop(self, loop):
        """Register the event handler and the listening socket with ``loop``.

        :raises Exception: if already added to a loop or already closed.
        """
        if self._eventloop:
            raise Exception('already add to loop')
        if self._closed:
            raise Exception('already closed')
        self._eventloop = loop
        loop.add_handler(self._handle_events)

        server_socket = self._server_socket
        self._eventloop.add(server_socket,
                            eventloop.POLL_IN | eventloop.POLL_ERR)

    def _handle_events(self, events):
        """Dispatch ready sockets, sweep the caches, finish a pending close.

        :param events: iterable of ``(sock, fd, event)`` tuples.
        """
        for sock, fd, event in events:
            if sock == self._server_socket:
                if event & eventloop.POLL_ERR:
                    logging.error('UDP server_socket err')
                self._handle_server()
            elif sock and (fd in self._sockets):
                if event & eventloop.POLL_ERR:
                    logging.error('UDP client_socket err')
                self._handle_client(sock)
        now = time.time()
        if now - self._last_time > 3:
            # sweep the caches at most once every 3 seconds
            self._cache.sweep()
            self._client_fd_to_server_addr.sweep()
            self._last_time = now
        if self._closed:
            self._server_socket.close()
            # iterate over a copy: the mapping is emptied right after
            for client in list(self._sockets.values()):
                client.close()
            self._sockets.clear()
            self._eventloop.remove_handler(self._handle_events)

    def close(self, next_tick=False):
        """Mark the relay closed.

        With ``next_tick`` false the listening socket is closed right away;
        otherwise the sockets are closed by the next ``_handle_events`` call.
        """
        self._closed = True
        if not next_tick:
            self._server_socket.close()