#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Entry point: run the API server, or run the self-test suite.

Usage (as parsed below):
    <script> [server options]          start DNS proxy, check loop and API server
    <script> test [PROTOCOL] [options] run the test items and exit
"""

import sys
import copy
import time
import _thread
import argparse
import compileall
from Utils import Constant
from Utils.Exception import checkException


def mainArgParse(rawArgs: list) -> argparse.Namespace:
    """Parse the command line options of the normal (server) mode.

    :param rawArgs: argument list without the program name.
    :return: parsed namespace; every option defaults to the matching value in `Constant`.
    """
    mainParser = argparse.ArgumentParser(description = 'Start running API server')
    mainParser.add_argument('--log', type = str, default = Constant.LogLevel, help = 'output log level')
    mainParser.add_argument('--dns', type = str, default = Constant.DnsServer, nargs = '+', help = 'specify dns server')
    mainParser.add_argument('--port', type = int, default = Constant.ApiPort, help = 'port for running')
    mainParser.add_argument('--path', type = str, default = Constant.ApiPath, help = 'root path for api server')
    mainParser.add_argument('--token', type = str, default = Constant.ApiToken, help = 'token for api server')
    mainParser.add_argument('--thread', type = int, default = Constant.CheckThread, help = 'number of check thread')
    mainParser.add_argument('-v', '--version', help = 'show version', action = 'store_true')
    return mainParser.parse_args(rawArgs)


def testArgParse(rawArgs: list) -> argparse.Namespace:
    """Parse the command line options of the test mode.

    :param rawArgs: argument list without the program name and without the leading `test` keyword.
    :return: parsed namespace; `PROTOCOL` is the only positional argument.
    """
    testParser = argparse.ArgumentParser(description = 'Test that each function is working properly')
    testParser.add_argument('PROTOCOL', type = str, help = 'test protocol name')
    testParser.add_argument('-a', '--all', help = 'test extra shadowsocks items', action = 'store_true')
    testParser.add_argument('-6', '--ipv6', help = 'test on ipv6 network', action = 'store_true')
    testParser.add_argument('--debug', help = 'enable debug log level', action = 'store_true')
    testParser.add_argument('--url', type = str, default = 'http://baidu.com', help = 'http request url')
    testParser.add_argument('--cert', type = str, default = '', help = 'specify the certificate id')
    testParser.add_argument('--thread', type = int, default = 16, help = 'thread number in check process')
    testParser.add_argument('--select', type = str, nargs = '+', help = 'select id list for test')
    return testParser.parse_args(rawArgs)


testArgs = None
testMode = False
inputArgs = copy.copy(sys.argv)  # work on a copy, sys.argv itself stays untouched
if inputArgs:  # remove first arg (normally file name); argv may be empty, so never pop blindly
    inputArgs.pop(0)
if inputArgs and inputArgs[0].lower() == 'test':  # test mode
    inputArgs.pop(0)  # remove `test`
    if not inputArgs or inputArgs[0].startswith('-'):  # no protocol is specified
        inputArgs = ['all'] + inputArgs
    testArgs = testArgParse(inputArgs)
    Constant.LogLevel = 'debug' if testArgs.debug else 'warning'
    testMode = True
else:
    mainArgs = mainArgParse(inputArgs)
    if mainArgs.version:  # output version and exit
        print('ProxyC version -> %s' % Constant.Version)
        sys.exit(0)
    Constant.LogLevel = mainArgs.log  # overwrite global options
    Constant.DnsServer = mainArgs.dns
    Constant.ApiPort = mainArgs.port
    Constant.ApiPath = mainArgs.path
    Constant.ApiToken = mainArgs.token
    Constant.CheckThread = mainArgs.thread

# NOTE(review): these imports sit below the `Constant` overrides on purpose in the original
# layout -- presumably the imported modules read `Constant` at import time. Confirm before
# hoisting them to the top of the file.
from Tester import testEntry
from Utils.Check import Check
from Utils import Api, DnsProxy
from Utils.Logger import logger
from Utils.Manager import Manager
from Utils.Test import Test, loadBind, loadCert
from concurrent.futures import ThreadPoolExecutor


def pythonCompile(dirRange: str = '/') -> None:  # python optimize compile
    """Byte-compile every python source below `dirRange` at optimize levels -1, 1 and 2.

    :param dirRange: root directory handed to `compileall.compile_dir`.
    """
    for optimize in [-1, 1, 2]:
        compileall.compile_dir(dirRange, quiet = 1, optimize = optimize)
        logger.warning('Python optimize compile -> %s (level = %i)' % (dirRange, optimize))


def runCheck(taskId: str, taskInfo: dict) -> None:
    """Run one check task and always commit a result through `Manager.finishTask`.

    On any failure the committed result is `taskInfo` (without its `check` field)
    plus `'success': False`.

    :param taskId: identifier of the task, used in log lines and for the commit.
    :param taskInfo: task description handed to `Check`; its `check` key is removed on failure.
    """
    success = False  # only flipped once the check really completed
    checkResult = {}
    try:
        checkResult = Check(taskId, taskInfo)  # check by task info
        logger.warning('[%s] Task finish' % taskId)
        success = True
    except checkException as exp:
        logger.error('[%s] Task error -> %s' % (taskId, exp))
    except Exception:  # unexpected failure, but never swallow SystemExit / KeyboardInterrupt
        logger.error('[%s] Task error -> Unknown error' % taskId)
    finally:
        if not success:  # got some error in check process
            taskInfo.pop('check', None)  # tolerate a missing key so the commit below always happens
            checkResult = {
                **taskInfo,
                'success': False,
            }
        Manager.finishTask(taskId, checkResult)  # commit check result


def loop(threadNum: int) -> None:
    """Poll `Manager` for tasks forever and dispatch each one to a thread pool.

    :param threadNum: maximum number of worker threads of the pool.
    """
    logger.warning('Loop check start -> %i threads' % threadNum)
    threadPool = ThreadPoolExecutor(max_workers = threadNum)  # init thread pool
    while True:
        try:
            taskId, taskInfo = Manager.popTask()  # pop a task
        except Exception:  # no more task
            time.sleep(2)
            continue
        # Outside the `try`: once a task is popped it must be submitted, not silently dropped.
        logger.warning('[%s] Load new task' % taskId)
        threadPool.submit(runCheck, taskId, taskInfo)  # submit into thread pool


if testMode:  # test mode
    loadBind(serverV6 = testArgs.ipv6, clientV6 = testArgs.ipv6)  # ipv4 / ipv6 (127.0.0.1 / ::1)
    loadCert(certId = testArgs.cert)  # cert config
    logger.critical('TEST ITEM: %s' % testArgs.PROTOCOL)
    logger.critical('SELECT: ' + str(testArgs.select))
    logger.critical('URL: %s' % testArgs.url)
    logger.critical('THREAD NUMBER: %i' % testArgs.thread)
    logger.critical('-' * 32 + ' TEST START ' + '-' * 32)
    if testArgs.PROTOCOL == 'all':  # run all test items
        skipItem = 'ss' if testArgs.all else 'ss-all'  # only one of ss / ss-all is run
        for item in testEntry:
            if item == skipItem:
                continue
            logger.critical('TEST ITEM -> ' + item)
            Test(testEntry[item], testArgs.thread, testArgs.url, testArgs.select)
    else:  # run single item
        testItem = testArgs.PROTOCOL
        if testItem == 'ss' and testArgs.all:  # test shadowsocks extra items
            testItem = 'ss-all'
        # Look up the resolved item: `--all` previously had no effect for a single `ss` run.
        Test(testEntry[testItem], testArgs.thread, testArgs.url, testArgs.select)
    logger.critical('-' * 32 + ' TEST COMPLETE ' + '-' * 32)
    sys.exit(0)  # test complete

logger.warning('ProxyC starts running (%s)' % Constant.Version)
_thread.start_new_thread(pythonCompile, ('/usr',))  # python compile (generate .pyc file)
_thread.start_new_thread(DnsProxy.start, (Constant.DnsServer, 53))  # start dns server
_thread.start_new_thread(loop, (Constant.CheckThread, ))  # start check loop
Api.startServer()  # start api server