#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sys import copy import time import _thread import argparse import compileall from Utils import Constant from Utils.Exception import checkException def mainArgParse(rawArgs: list) -> argparse.Namespace: mainParser = argparse.ArgumentParser(description = 'Start running API server') mainParser.add_argument('--log', type = str, default = Constant.LogLevel, help = 'output log level') mainParser.add_argument('--dns', type = str, default = Constant.DnsServer, nargs = '+', help = 'specify dns server') mainParser.add_argument('--port', type = int, default = Constant.ApiPort, help = 'port for running') mainParser.add_argument('--path', type = str, default = Constant.ApiPath, help = 'root path for api server') mainParser.add_argument('--token', type = str, default = Constant.ApiToken, help = 'token for api server') mainParser.add_argument('--thread', type = int, default = Constant.CheckThread, help = 'number of check thread') mainParser.add_argument('-v', '--version', help = 'show version', action = 'store_true') return mainParser.parse_args(rawArgs) def testArgParse(rawArgs: list) -> argparse.Namespace: testParser = argparse.ArgumentParser(description = 'Test that each function is working properly') testParser.add_argument('PROTOCOL', type = str, help = 'test protocol name') testParser.add_argument('-a', '--all', help = 'test extra shadowsocks items', action = 'store_true') testParser.add_argument('-6', '--ipv6', help = 'test on ipv6 network', action = 'store_true') testParser.add_argument('--debug', help = 'enable debug log level', action = 'store_true') testParser.add_argument('--url', type = str, default = 'http://baidu.com', help = 'http request url') testParser.add_argument('--cert', type = str, default = '', help = 'specify the certificate id') testParser.add_argument('--thread', type = int, default = 16, help = 'thread number in check process') testParser.add_argument('--select', type = str, nargs = '+', help = 'select id list for test') return testParser.parse_args(rawArgs) testArgs = None testMode = False inputArgs = copy.copy(sys.argv) if len(inputArgs) >= 0: # remove first arg (normally file name) inputArgs.pop(0) if len(inputArgs) != 0 and inputArgs[0].lower() == 'test': # test mode inputArgs.pop(0) # remove `test` if len(inputArgs) == 0 or inputArgs[0].startswith('-'): # no protocol is specified inputArgs = ['all'] + inputArgs testArgs = testArgParse(inputArgs) Constant.LogLevel = 'debug' if testArgs.debug else 'warning' testMode = True else: mainArgs = mainArgParse(inputArgs) if mainArgs.version: # output version and exit print('ProxyC version %s' % Constant.Version) sys.exit(0) Constant.LogLevel = mainArgs.log # overwrite global options Constant.DnsServer = mainArgs.dns Constant.ApiPort = mainArgs.port Constant.ApiPath = mainArgs.path Constant.ApiToken = mainArgs.token Constant.CheckThread = mainArgs.thread from Tester import testEntry from Utils.Check import Check # from Utils import Api, DnsProxy from Utils import DnsProxy from Utils.Logger import logger from Utils.Manager import Manager from Utils.Tester import Test, loadBind, loadCert from concurrent.futures import ThreadPoolExecutor def pythonCompile(dirRange: str = '/') -> None: # python optimize compile for optimize in [-1, 1, 2]: compileall.compile_dir(dirRange, quiet = 1, optimize = optimize) logger.warning('Python optimize compile -> %s (level = %i)' % (dirRange, optimize)) def runCheck(taskId: str, taskInfo: dict) -> None: success = True checkResult = {} try: checkResult = Check(taskId, taskInfo) # check by task info logger.warning('[%s] Task finish' % taskId) except checkException as exp: success = False logger.error('[%s] Task error -> %s' % (taskId, exp)) except: success = False logger.error('[%s] Task error -> Unknown error' % taskId) finally: if not success: # got some error in check process taskInfo.pop('check') checkResult = { **taskInfo, 'success': False, } Manager.finishTask(taskId, checkResult) # commit check result def loop(threadNum: int) -> None: logger.warning('Loop check start -> %i threads' % threadNum) threadPool = ThreadPoolExecutor(max_workers = threadNum) # init thread pool while True: try: taskId, taskInfo = Manager.popTask() # pop a task logger.warning('[%s] Load new task' % taskId) except: # no more task time.sleep(2) continue threadPool.submit(runCheck, taskId, taskInfo) # submit into thread pool if testMode: # test mode loadBind(serverV6 = testArgs.ipv6, clientV6 = testArgs.ipv6) # ipv4 / ipv6 (127.0.0.1 / ::1) loadCert(certId = testArgs.cert) # cert config logger.critical('TEST ITEM: %s' % testArgs.PROTOCOL) logger.critical('SELECT: ' + str(testArgs.select)) logger.critical('URL: %s' % testArgs.url) logger.critical('THREAD NUMBER: %i' % testArgs.thread) logger.critical('-' * 32 + ' TEST START ' + '-' * 32) if testArgs.PROTOCOL == 'all': # run all test items for item in testEntry: if item == ('ss' if testArgs.all else 'ss-all'): # skip ss / ss-all continue logger.critical('TEST ITEM -> ' + item) Test(testEntry[item], testArgs.thread, testArgs.url, testArgs.select) else: # run single item if testArgs.PROTOCOL == 'ss' and testArgs.all: # test shadowsocks extra items testItem = 'ss-all' Test(testEntry[testArgs.PROTOCOL], testArgs.thread, testArgs.url, testArgs.select) logger.critical('-' * 32 + ' TEST COMPLETE ' + '-' * 32) sys.exit(0) # test complete logger.warning('ProxyC starts running (%s)' % Constant.Version) _thread.start_new_thread(pythonCompile, ('/usr',)) # python compile (generate .pyc file) _thread.start_new_thread(DnsProxy.start, (Constant.DnsServer, 53)) # start dns server _thread.start_new_thread(loop, (Constant.CheckThread, )) # start check loop # Api.startServer() # start api server