From 2de2a553d7c0533fd279dcf2adc4d782241ba634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A0=B4=E5=A8=83=E9=85=B1?= Date: Wed, 11 Jan 2017 14:45:37 +0800 Subject: [PATCH] multi-user for mudbjson mode --- db_transfer.py | 67 +++++++++++++++++++++++++++------------- mujson_mgr.py | 6 +++- server_pool.py | 68 ++++++++++++++++++++++++++++++++++++----- shadowsocks/tcprelay.py | 51 ++++++++++++++++++++++++------- shadowsocks/udprelay.py | 49 ++++++++++++++++++++++------- 5 files changed, 188 insertions(+), 53 deletions(-) diff --git a/db_transfer.py b/db_transfer.py index 2ef5829..ec9152b 100644 --- a/db_transfer.py +++ b/db_transfer.py @@ -85,6 +85,8 @@ class TransferBase(object): logging.error('load switchrule.py fail') cur_servers = {} new_servers = {} + allow_users = {} + mu_servers = {} for row in rows: try: allow = switchrule.isTurnOn(row) and row['enable'] == 1 and row['u'] + row['d'] < row['transfer_enable'] @@ -113,34 +115,50 @@ class TransferBase(object): logging.error('more than one user use the same port [%s]' % (port,)) continue - if ServerPool.get_instance().server_is_run(port) > 0: - if not allow: - logging.info('db stop server at port [%s]' % (port,)) - ServerPool.get_instance().cb_del_server(port) - self.force_update_transfer.add(port) - else: - cfgchange = False - if port in ServerPool.get_instance().tcp_servers_pool: - relay = ServerPool.get_instance().tcp_servers_pool[port] - for name in merge_config_keys: - if name in cfg and not self.cmp(cfg[name], relay._config[name]): - cfgchange = True - break; - if not cfgchange and port in ServerPool.get_instance().tcp_ipv6_servers_pool: - relay = ServerPool.get_instance().tcp_ipv6_servers_pool[port] - for name in merge_config_keys: - if name in cfg and not self.cmp(cfg[name], relay._config[name]): - cfgchange = True - break; - #config changed + if allow: + allow_users[port] = 1 + if 'protocol' in cfg and 'protocol_param' in cfg and common.to_str(cfg['protocol']) in ['auth_aes128_md5', 'auth_aes128_sha1']: + if '#' in common.to_str(cfg['protocol_param']): + mu_servers[port] = 1 + + cfgchange = False + if port in ServerPool.get_instance().tcp_servers_pool: + relay = ServerPool.get_instance().tcp_servers_pool[port] + for name in merge_config_keys: + if name in cfg and not self.cmp(cfg[name], relay._config[name]): + cfgchange = True + break + if not cfgchange and port in ServerPool.get_instance().tcp_ipv6_servers_pool: + relay = ServerPool.get_instance().tcp_ipv6_servers_pool[port] + for name in merge_config_keys: + if name in cfg and not self.cmp(cfg[name], relay._config[name]): + cfgchange = True + break + + if port in mu_servers: + if ServerPool.get_instance().server_is_run(port) > 0: if cfgchange: logging.info('db stop server at port [%s] reason: config changed: %s' % (port, cfg)) ServerPool.get_instance().cb_del_server(port) self.force_update_transfer.add(port) new_servers[port] = (passwd, cfg) + else: + self.new_server(port, passwd, cfg) + else: + if ServerPool.get_instance().server_is_run(port) > 0: + if not allow: + logging.info('db stop server at port [%s]' % (port,)) + ServerPool.get_instance().cb_del_server(port) + self.force_update_transfer.add(port) + else: + if cfgchange: + logging.info('db stop server at port [%s] reason: config changed: %s' % (port, cfg)) + ServerPool.get_instance().cb_del_server(port) + self.force_update_transfer.add(port) + new_servers[port] = (passwd, cfg) - elif allow and ServerPool.get_instance().server_run_status(port) is False: - self.new_server(port, passwd, cfg) + elif allow and port > 0 and port < 65536 and ServerPool.get_instance().server_run_status(port) is False: + self.new_server(port, passwd, cfg) for row in last_rows: if row['port'] in cur_servers: @@ -159,6 +177,11 @@ class TransferBase(object): passwd, cfg = new_servers[port] self.new_server(port, passwd, cfg) + if isinstance(self, MuJsonTransfer): # works in MuJsonTransfer only + logging.debug('db allow users %s \nmu_servers %s' % (allow_users, mu_servers)) + for port in mu_servers: + ServerPool.get_instance().update_mu_server(port, None, allow_users) + def clear_cache(self, port): if port in self.force_update_transfer: del self.force_update_transfer[port] if port in self.last_get_transfer: del self.last_get_transfer[port] diff --git a/mujson_mgr.py b/mujson_mgr.py index c01b23d..a880a1c 100644 --- a/mujson_mgr.py +++ b/mujson_mgr.py @@ -73,8 +73,12 @@ class MuMgr(object): def userinfo(self, user): ret = "" + key_list = ['user', 'port', 'method', 'passwd', 'protocol', 'protocol_param', 'obfs', 'obfs_param', 'transfer_enable', 'u', 'd'] for key in sorted(user): - if key in ['enable']: + if key not in key_list: + key_list.append(key) + for key in key_list: + if key in ['enable'] or key not in user: continue ret += '\n' if key in ['transfer_enable', 'u', 'd']: diff --git a/server_pool.py b/server_pool.py index a526bd0..45fe6a1 100644 --- a/server_pool.py +++ b/server_pool.py @@ -23,6 +23,7 @@ import os import logging +import struct import time from shadowsocks import shell, eventloop, tcprelay, udprelay, asyncdns, common import threading @@ -213,23 +214,62 @@ class ServerPool(object): return True + def update_mu_server(self, port, protocol_param, acl): + port = int(port) + if port in self.tcp_servers_pool: + try: + self.tcp_servers_pool[port].update_users(protocol_param, acl) + except Exception as e: + logging.warn(e) + try: + self.udp_servers_pool[port].update_users(protocol_param, acl) + except Exception as e: + logging.warn(e) + if port in self.tcp_ipv6_servers_pool: + try: + self.tcp_ipv6_servers_pool[port].update_users(protocol_param, acl) + except Exception as e: + logging.warn(e) + try: + self.udp_ipv6_servers_pool[port].update_users(protocol_param, acl) + except Exception as e: + logging.warn(e) + def get_server_transfer(self, port): port = int(port) + uid = struct.pack('