diff --git a/apiconfig.py b/apiconfig.py index c65577a..8241547 100644 --- a/apiconfig.py +++ b/apiconfig.py @@ -1,10 +1,10 @@ # Config TRANSFER_MUL = 1.0 NODE_ID = 1 -API_INTERFACE = 'sspanelv2' #sspanelv2, sspanelv3, sspanelv3ssr, muapiv2(not support) +API_INTERFACE = 'sspanelv2' #mudbjson, sspanelv2, sspanelv3, sspanelv3ssr, muapiv2(not support) # Mysql -MYSQL_HOST = 'mdss.mengsky.net' +MYSQL_HOST = '127.0.0.1' MYSQL_PORT = 3306 MYSQL_USER = 'ss' MYSQL_PASS = 'ss' @@ -12,7 +12,7 @@ MYSQL_DB = 'shadowsocks' MYSQL_UPDATE_TIME = 60 # API -API_HOST = 'breakwa11.moe' +API_HOST = '127.0.0.1' API_PORT = 80 API_PATH = '/mu/v2/' API_TOKEN = 'abcdef' diff --git a/db_transfer.py b/db_transfer.py index ddb4bcc..12e6e55 100644 --- a/db_transfer.py +++ b/db_transfer.py @@ -6,24 +6,17 @@ import time import sys from server_pool import ServerPool import traceback -from shadowsocks import common +from shadowsocks import common, shell from configloader import load_config, get_config -class DbTransfer(object): - - instance = None +db_instance = None +class DbTransfer(object): def __init__(self): import threading self.last_get_transfer = {} self.event = threading.Event() - @staticmethod - def get_instance(): - if DbTransfer.instance is None: - DbTransfer.instance = DbTransfer() - return DbTransfer.instance - def update_all_user(self, dt_transfer): import cymysql query_head = 'UPDATE user' @@ -83,8 +76,7 @@ class DbTransfer(object): self.update_all_user(dt_transfer) self.last_get_transfer = curr_transfer - @staticmethod - def pull_db_all_user(): + def pull_db_all_user(self): import cymysql #数据库所有用户信息 try: @@ -107,8 +99,7 @@ class DbTransfer(object): conn.close() return rows - @staticmethod - def del_server_out_of_bound_safe(last_rows, rows): + def del_server_out_of_bound_safe(self, last_rows, rows): #停止超流量的服务 #启动没超流量的服务 #需要动态载入switchrule,以便实时修改规则 @@ -180,7 +171,7 @@ class DbTransfer(object): if len(new_servers) > 0: from shadowsocks import eventloop - DbTransfer.get_instance().event.wait(eventloop.TIMEOUT_PRECISION) + self.event.wait(eventloop.TIMEOUT_PRECISION) for port in new_servers.keys(): passwd, cfg = new_servers[port] logging.info('db start server at port [%s] pass [%s]' % (port, passwd)) @@ -196,32 +187,68 @@ class DbTransfer(object): ServerPool.get_instance().cb_del_server(port) @staticmethod - def thread_db(): + def thread_db(obj): import socket import time + global db_instance timeout = 60 socket.setdefaulttimeout(timeout) last_rows = [] + db_instance = obj() try: while True: load_config() try: - DbTransfer.get_instance().push_db_all_user() - rows = DbTransfer.get_instance().pull_db_all_user() - DbTransfer.del_server_out_of_bound_safe(last_rows, rows) + db_instance.push_db_all_user() + rows = db_instance.pull_db_all_user() + db_instance.del_server_out_of_bound_safe(last_rows, rows) last_rows = rows except Exception as e: trace = traceback.format_exc() logging.error(trace) #logging.warn('db thread except:%s' % e) - if DbTransfer.get_instance().event.wait(get_config().MYSQL_UPDATE_TIME) or not ServerPool.get_instance().thread.is_alive(): + if db_instance.event.wait(get_config().MYSQL_UPDATE_TIME) or not ServerPool.get_instance().thread.is_alive(): break except KeyboardInterrupt as e: pass - DbTransfer.del_servers() + db_instance.del_servers() ServerPool.get_instance().stop() + db_instance = None @staticmethod def thread_db_stop(): - DbTransfer.get_instance().event.set() + global db_instance + db_instance.event.set() + +class MuJsonTransfer(DbTransfer): + def __init__(self): + super(MuJsonTransfer, self).__init__() + + def update_all_user(self, dt_transfer): + import json + rows = None + + config_path = "mudb.json" + with open(config_path, 'r+') as f: + rows = shell.parse_json_in_str(f.read().decode('utf8')) + for row in rows: + if "port" in row: + port = row["port"] + if port in dt_transfer: + row["u"] += dt_transfer[port][0] + row["d"] += dt_transfer[port][1] + + if rows: + output = json.dumps(rows, sort_keys=True, indent=4, separators=(',', ': ')) + with open(config_path, 'w') as f: + f.write(output) + + def pull_db_all_user(self): + rows = None + + config_path = "mudb.json" + with open(config_path, 'r+') as f: + rows = shell.parse_json_in_str(f.read().decode('utf8')) + + return rows diff --git a/mudb.json b/mudb.json new file mode 100644 index 0000000..22fad21 --- /dev/null +++ b/mudb.json @@ -0,0 +1,26 @@ +[ + { + "user": "admin", + "port": 443, + "u": 0, + "d": 0, + "transfer_enable": 1125899906842624, + "passwd": "admin", + "method": "aes-128-cfb", + "protocol": "auth_sha1_v2_compatible", + "obfs": "tls1.2_ticket_auth_compatible", + "enable": true + }, + { + "user": "user001", + "port": 10001, + "u": 0, + "d": 0, + "transfer_enable": 1099511627776, + "passwd": "abc123", + "method": "rc4-md5", + "protocol": "auth_sha1_v2_compatible", + "obfs": "http_post_compatible", + "enable": true + } +] diff --git a/server.py b/server.py index db375ad..75ce340 100644 --- a/server.py +++ b/server.py @@ -27,27 +27,28 @@ if __name__ == '__main__': import server_pool import db_transfer from shadowsocks import shell - -#def test(): -# thread.start_new_thread(DbTransfer.thread_db, ()) -# Api.web_server() +from configloader import load_config, get_config class MainThread(threading.Thread): - def __init__(self): + def __init__(self, obj): threading.Thread.__init__(self) + self.obj = obj def run(self): - db_transfer.DbTransfer.thread_db() + self.obj.thread_db(self.obj) def stop(self): - db_transfer.DbTransfer.thread_db_stop() + self.obj.thread_db_stop() def main(): shell.check_python() if False: db_transfer.DbTransfer.thread_db() else: - thread = MainThread() + if get_config().API_INTERFACE == 'mudbjson': + thread = MainThread(db_transfer.MuJsonTransfer) + else: + thread = MainThread(db_transfer.DbTransfer) thread.start() try: while thread.is_alive(): diff --git a/switchrule.py b/switchrule.py index e102c63..5b2d313 100644 --- a/switchrule.py +++ b/switchrule.py @@ -1,7 +1,6 @@ from configloader import load_config, get_config def getKeys(): - load_config() key_list = ['port', 'u', 'd', 'transfer_enable', 'passwd', 'enable' ] if get_config().API_INTERFACE == 'sspanelv3': key_list += ['method']