From d07c92c882da9509281c71e322b0fdfad4dce97b Mon Sep 17 00:00:00 2001 From: dnomd343 Date: Tue, 2 Aug 2022 11:15:35 +0800 Subject: [PATCH] update: new inspection process --- Basis/Api.py | 10 ++--- Basis/Check.py | 57 +++++++++++++--------------- Basis/Constant.py | 2 + Basis/Functions.py | 2 +- Basis/Manage.py | 86 ------------------------------------------ Basis/Manager.py | 92 +++++++++++++++++++++++++++++++++++++++++++++ Basis/Process.py | 7 +++- Builder/__init__.py | 42 ++++++++++----------- Checker/Http.py | 34 +++++++++++++++++ Checker/__init__.py | 18 +++++++++ main.py | 20 +++++----- manageDemo.py | 56 --------------------------- 12 files changed, 215 insertions(+), 211 deletions(-) delete mode 100644 Basis/Manage.py create mode 100644 Basis/Manager.py create mode 100644 Checker/Http.py create mode 100644 Checker/__init__.py delete mode 100755 manageDemo.py diff --git a/Basis/Api.py b/Basis/Api.py index 8761b17..c577417 100644 --- a/Basis/Api.py +++ b/Basis/Api.py @@ -3,8 +3,8 @@ import json from gevent import pywsgi -from Basis.Manage import Manage from Basis.Logger import logging +from Basis.Manager import Manager from Basis.Constant import Version from flask import Flask, Response, request @@ -39,7 +39,7 @@ def getTaskList() -> Response: if not tokenCheck(): # token check return tokenError() try: - taskList = Manage.listTask() + taskList = Manager.listUnion() logging.debug('api get task list -> %s' % taskList) return jsonResponse({ 'success': True, @@ -75,7 +75,7 @@ def createTask() -> Response: tasks = [] for proxy in proxyList: tasks.append({**proxy, 'check': checkList}) - checkId = Manage.addTask(tasks) + checkId = Manager.addUnion(tasks) logging.debug('api return check id %s' % checkId) return jsonResponse({ @@ -91,14 +91,14 @@ def getTaskInfo(taskId: str) -> Response: if not tokenCheck(): # token check return tokenError() logging.critical('API get task %s info' % taskId) - if not Manage.isTask(taskId): + if not Manager.isUnion(taskId): return jsonResponse({ 'success': False, 'message': 'task id not found', }) return jsonResponse({ 'success': True, - **Manage.getTask(taskId) + **Manager.getUnion(taskId) }) diff --git a/Basis/Check.py b/Basis/Check.py index 4fc3ec2..57b6ec9 100644 --- a/Basis/Check.py +++ b/Basis/Check.py @@ -1,50 +1,47 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- - +import copy import time +from Checker import Checker from Basis.Logger import logging from Builder import Builder, clientEntry -from ProxyChecker import httpCheck # TODO: refactor in the future - -def Check(proxyType: str, proxyInfo: dict, checkInfo: dict) -> dict: - # TODO: checkInfo -> [...] (only check http for now) - if proxyType not in clientEntry: - logging.error('Unknown proxy type %s' % proxyType) +def Check(taskId: str, taskInfo: dict) -> dict: + logging.info('[%s] Start checking process -> %s' % (taskId, taskInfo)) + if taskInfo['type'] not in clientEntry: + logging.error('[%s] Unknown proxy type %s' % (taskId, taskInfo['type'])) raise RuntimeError('Unknown proxy type') + try: - client = Builder(proxyType, proxyInfo) + client = Builder( + proxyType = taskInfo['type'], + proxyInfo = taskInfo['info'], + bindAddr = '127.0.0.1', # socks5 exposed host + taskId = taskId, + ) except Exception as reason: - logging.error('Client build error -> %s' % reason) + logging.error('[%s] Client build error -> %s' % (taskId, reason)) raise RuntimeError('Client build error') + logging.info('[%s] Client loaded successfully') - # TODO: debug combine output - logging.debug(client.id) - logging.debug(client.proxyType) - logging.debug(client.proxyInfo) - logging.debug(client.socksAddr) - logging.debug(client.socksPort) - - # TODO: wait port occupied + # TODO: wait port occupied (client.socksPort) time.sleep(1) if not client.status(): # client unexpected exit + logging.warning('[%s] Client unexpected exit') client.destroy() # remove file and kill sub process - logging.error('Client unexpected exit\n%s', client.output) + logging.debug('[%s] Client output\n%s', client.output) raise RuntimeError('Client unexpected exit') - # TODO: check process - status, _ = httpCheck(client.socksPort) # TODO: add socks5 addr - - logging.critical('http check status -> %s' % status) - + checkResult = Checker(taskId, taskInfo['check'], { + 'addr': client.socksAddr, + 'port': client.socksPort, + }) + logging.info('[%s] Client check result -> %s' % (taskId, checkResult)) client.destroy() # clean up the client - + taskInfo = copy.deepcopy(taskInfo) + taskInfo.pop('check') return { - 'http': { - 'status': status, - # TODO: more http check info - }, - # TODO: more check items (from checkInfo list) + **taskInfo, + 'result': checkResult, } - diff --git a/Basis/Constant.py b/Basis/Constant.py index 4fc0eb6..2f44b3f 100644 --- a/Basis/Constant.py +++ b/Basis/Constant.py @@ -2,6 +2,8 @@ # -*- coding: utf-8 -*- Version = 'dev' +WorkDir = '/tmp/ProxyC' +PathEnv = '/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin' # Shadowsocks Info ssMethods = { # methods support of different Shadowsocks project diff --git a/Basis/Functions.py b/Basis/Functions.py index 19ab99e..437fbfd 100644 --- a/Basis/Functions.py +++ b/Basis/Functions.py @@ -10,7 +10,7 @@ from IPy import IP from Basis.Logger import logging -def md5Sum(data: str, encode: str = 'UTF-8') -> str: +def md5Sum(data: str, encode: str = 'utf-8') -> str: return hashlib.md5(data.encode(encoding = encode)).hexdigest() diff --git a/Basis/Manage.py b/Basis/Manage.py deleted file mode 100644 index ea59be4..0000000 --- a/Basis/Manage.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import copy -from Basis.Logger import logging -from Basis.Functions import genFlag - - -class Task(object): - """ Manage global check task. - """ - __tasks = {} - __subTasks = {} # sub task status -> loaded / running / complete - - def __init__(self): - logging.info('task manager start') - - def addTask(self, tasks: list) -> str: # add task to manager (multi sub tasks) - subTasks = {} - subTaskIds = [] - for subTask in tasks: - subTaskId = genFlag(length = 24) # generate sub task id (24 bytes) - subTaskIds.append(subTaskId) - subTasks[subTaskId] = { - 'status': 'loaded', - 'data': copy.deepcopy(subTask) - } - logging.info('add sub task %s -> %s' % (subTaskId, subTasks[subTaskId]['data'])) - taskId = genFlag(length = 16) # generate task id (16 bytes) - self.__tasks[taskId] = { # load task - 'sub': subTaskIds - } - self.__subTasks.update(subTasks) # load sub tasks - logging.info('task %s loaded' % taskId) - return taskId - - def isTask(self, taskId: str) -> bool: # check if the task id exist - return taskId in self.__tasks - - def getTask(self, taskId: str) -> dict: # get task status (remove sub tasks when all completed) - if taskId not in self.__tasks: - logging.error('task id %s not found' % taskId) - raise RuntimeError('task id not found') - subList = self.__tasks[taskId]['sub'] - completed = 0 - for subTaskId in subList: - if self.__subTasks[subTaskId]['status'] == 'complete': # get number of completed sub task - completed += 1 - logging.debug('[%s] statistics sub task status -> %i/%i' % (taskId, completed, len(subList))) - if completed < len(subList): # some sub tasks are not completed - logging.debug('[%s] task still running' % taskId) - return { - 'finish': False, - 'percent': '%i%%' % (completed / len(subList)) - } - logging.debug('[%s] task work complete' % taskId) # all sub tasks completed - result = [] - for subTaskId in subList: - subTask = self.__subTasks[subTaskId] - self.__subTasks.pop(subTaskId) - result.append(subTask['data']) - logging.debug('release sub tasks -> %s' % result) - return { - 'finish': True, - 'result': result - } - - def listTask(self) -> list: # get all task ids - return [x for x in self.__tasks] - - def popSubTask(self) -> tuple[str or None, any]: # fetch a loaded sub task - for subTaskId, subTask in self.__subTasks.items(): - if subTask['status'] != 'loaded': continue # only get loaded sub task - subTask['status'] = 'running' # set status as running - return subTaskId, copy.deepcopy(subTask['data']) - return None, None - - def updateSubTask(self, subTaskId: str, subTaskData: dict) -> None: # update sub task data when completed - if subTaskId not in self.__subTasks: - logging.error('sub task id %s not found' % subTaskId) - raise RuntimeError('sub task id not found') - self.__subTasks[subTaskId]['data'] = copy.deepcopy(subTaskData) - self.__subTasks[subTaskId]['status'] = 'complete' - - -Manage = Task() # global task manager diff --git a/Basis/Manager.py b/Basis/Manager.py new file mode 100644 index 0000000..6360ce2 --- /dev/null +++ b/Basis/Manager.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import copy +from Basis.Logger import logging +from Basis.Functions import genFlag + + +class Task(object): + """ Manage global check task. + """ + __tasks = {} # task status -> loaded / running / complete + __unions = {} # one union include multi tasks + __TASK_LOADED = -1 + __TASK_RUNNING = 0 + __TASK_FINISH = 1 + + def __init__(self): + logging.info('Manager start') + + def listUnion(self) -> list: # get all union ids + return [x for x in self.__unions] + + def isUnion(self, unionId: str) -> bool: # check if the union id exist + return unionId in self.__unions + + def addUnion(self, union: list) -> str: # add union to manager (include multi tasks) + tasks = {} # temporary task storage + taskIds = [] # task id list for manage union + unionId = genFlag(length = 16) # generate union id (16 bytes) + logging.debug('Manager start to load union [%s]' % unionId) + for task in union: + taskId = genFlag(length = 24) # generate task id (24 bytes) + taskIds.append(taskId) + tasks[taskId] = { + 'status': self.__TASK_LOADED, # task status -> loaded + 'data': copy.deepcopy(task) # save task info + } + logging.info('Manager add task [%s] -> %s' % (taskId, task)) + self.__tasks.update(tasks) # load into task list + self.__unions[unionId] = { + 'items': taskIds # record task items + } + logging.info('Manager add union [%s] -> %s' % (unionId, taskIds)) + return unionId + + def getUnion(self, unionId: str) -> dict: # get union status (remove tasks when all completed) + if unionId not in self.__unions: + logging.error('Manager union [%s] not found' % unionId) + raise RuntimeError('Union id not found') + tasks = self.__unions[unionId]['items'] + finishNum = 0 + for taskId in tasks: + if self.__tasks[taskId]['status'] == self.__TASK_FINISH: # get number of completed task + finishNum += 1 + logging.info('Manager statistics union [%s] -> %i/%i' % (unionId, finishNum, len(tasks))) + if finishNum < len(tasks): # some tasks are not yet completed + logging.debug('Manager union [%s] still working' % unionId) + return { + 'finish': False, + 'percent': '%f' % (round(finishNum / len(tasks), 2)) + } + self.__unions.pop(unionId) # remove from union list + unionResult = [] # temporary storage + for taskId in tasks: + task = self.__tasks[taskId] + self.__tasks.pop(taskId) # remove from task list + unionResult.append(task['data']) + logging.info('Manager release union [%s] -> %s' % (unionId, unionResult)) + return { + 'finish': True, + 'result': unionResult + } + + def popTask(self) -> tuple[str or None, any]: # fetch a loaded task + for taskId, task in self.__tasks.items(): + if task['status'] != self.__TASK_LOADED: continue # only get loaded task + task['status'] = self.__TASK_RUNNING # set task status as running + logging.info('Manager pop task [%s] -> %s' % (taskId, task['data'])) + return taskId, copy.deepcopy(task['data']) + logging.debug('Manager has no more loaded tasks') + raise RuntimeError('No more tasks') + + def finishTask(self, taskId: str, taskData: dict) -> None: # update task data when completed + if taskId not in self.__tasks: + logging.error('Manager task [%s] not found' % taskId) + raise RuntimeError('Task id not found') + self.__tasks[taskId]['data'] = copy.deepcopy(taskData) + self.__tasks[taskId]['status'] = self.__TASK_FINISH # set task status as completed + + +Manager = Task() # global task manager diff --git a/Basis/Process.py b/Basis/Process.py index f050e19..2df40d3 100644 --- a/Basis/Process.py +++ b/Basis/Process.py @@ -153,8 +153,11 @@ class Process(object): else: # discard all the output of sub process stdout = DEVNULL stderr = DEVNULL - self.__process = Popen(self.cmd, env = self.env, stdout = stdout, - stderr = stderr, preexec_fn = None if libcPath is None else Process.__preExec) + self.__process = Popen( + self.cmd, env = self.env, + stdout = stdout, stderr = stderr, + preexec_fn = None if libcPath is None else Process.__preExec + ) logging.info('[%s] Process running -> PID = %i' % (self.id, self.__process.pid)) def signal(self, signalNum: int) -> None: # send specified signal to sub process diff --git a/Builder/__init__.py b/Builder/__init__.py index e0e1e7c..a0de0e4 100644 --- a/Builder/__init__.py +++ b/Builder/__init__.py @@ -3,6 +3,10 @@ import os import copy +from Basis.Logger import logging +from Basis.Process import Process +from Basis.Constant import WorkDir, PathEnv +from Basis.Functions import hostFormat, genFlag, getAvailablePort from Builder import Brook from Builder import VMess @@ -13,11 +17,6 @@ from Builder import Hysteria from Builder import Shadowsocks from Builder import ShadowsocksR -from Basis.Logger import logging -from Basis.Process import Process -from Basis.Functions import genFlag, getAvailablePort - -pathEnv = '/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin' clientEntry = { 'ss': [Shadowsocks.load, '.json'], 'ssr': [ShadowsocksR.load, '.json'], @@ -38,9 +37,7 @@ class Builder(object): proxyInfo: Proxy node information. - bind: Socks5 proxy bind address. - - workDir: A directory for storing configuration files. + bindAddr: Socks5 proxy bind address. taskId: Task ID, defaults to 12 random characters length. @@ -50,32 +47,32 @@ class Builder(object): output = None def __loadClient(self): - logging.info('[%s] Load %s proxy client at %s -> %s' % (self.id, self.proxyType, ( - (('[%s]' if ':' in self.socksAddr else '%s') + ':%i') % (self.socksAddr, self.socksPort) - ), str(self.proxyInfo))) + logging.info('[%s] Builder load %s proxy client at %s -> %s' % ( + self.id, self.proxyType, + 'socks5://%s:%i' % (hostFormat(self.socksAddr, v6Bracket = True), self.socksPort), self.proxyInfo + )) configFile = os.path.join( # config file path - self.__workDir, self.id + clientEntry[self.proxyType][1] # workDir + taskId + suffix + WorkDir, self.id + clientEntry[self.proxyType][1] # workDir + taskId + file suffix ) + logging.debug('[%s] Builder config file -> %s' % (self.id, configFile)) command, fileContent, envVar = clientEntry[self.proxyType][0](self.proxyInfo, { # load client boot info 'addr': self.socksAddr, 'port': self.socksPort, }, configFile) - fileObject = { # add config file settings + envVar['PATH'] = PathEnv # add PATH env (some programs need it) + self.__process = Process(WorkDir, taskId = self.id, cmd = command, env = envVar, file = { # start process 'path': configFile, 'content': fileContent - } - envVar['PATH'] = pathEnv # add PATH env (some programs need it) - self.__process = Process(self.__workDir, taskId = self.id, cmd = command, env = envVar, file = fileObject) + }) - def __init__(self, proxyType: str, proxyInfo: dict, taskId: str = '', - bind: str = '127.0.0.1', workDir: str = '/tmp/ProxyC') -> None: # init proxy client + def __init__(self, proxyType: str, proxyInfo: dict, bindAddr: str, taskId: str = '') -> None: # init proxy client + self.id = genFlag(length = 12) if taskId == '' else taskId # load task ID if proxyType not in clientEntry: + logging.error('[%s] Builder receive unknown proxy type %s' % (self.id, proxyType)) raise RuntimeError('Unknown proxy type') - self.id = genFlag(length = 12) if taskId == '' else taskId # load task ID - self.__workDir = workDir self.proxyType = proxyType # proxy type -> ss / ssr / vmess ... - self.proxyInfo = copy.copy(proxyInfo) # proxy object -> contain connection info - self.socksAddr = bind + self.proxyInfo = copy.copy(proxyInfo) # connection info + self.socksAddr = bindAddr self.socksPort = getAvailablePort() # random port for socks5 exposed self.__loadClient() @@ -83,5 +80,6 @@ class Builder(object): return self.__process.status() def destroy(self) -> None: # kill sub process and remove config file + logging.debug('[%s] Builder destroy' % self.id) self.__process.quit() self.output = self.__process.output diff --git a/Checker/Http.py b/Checker/Http.py new file mode 100644 index 0000000..c62571c --- /dev/null +++ b/Checker/Http.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import requests +from Basis.Logger import logging +from Basis.Functions import hostFormat + + +def httpPing(taskId: str, url: str, socksAddr: str, socksPort: int, timeout: int) -> float: + try: + startTime = time.time_ns() # request start time + socksProxy = 'socks5://%s:%i' % (hostFormat(socksAddr, v6Bracket = True), socksPort) + logging.debug('[%s] Http ping -> request %s via %s' % (taskId, url, socksProxy)) + httpRequest = requests.get(url, proxies = { # send http request by socks5 proxy + 'http': socksProxy, + 'https': socksProxy, + }, timeout = timeout) + except: # something error on request process (timeout or proxy not working) + logging.debug('[%s] Http ping -> request error' % taskId) + return -1 + if httpRequest.status_code not in range(200, 300): # http code not 2xx + logging.debug('[%s] Http ping -> status code %i not expected' % (taskId, httpRequest.status_code)) + return -1 + delay = (time.time_ns() - startTime) / (10 ** 6) # ns -> ms + logging.debug('[%s] Http ping -> delay %f ms' % (taskId, delay)) + return round(delay, 2) # two decimal places + + +def check(taskId: str, socksInfo: dict, options: dict) -> dict: + # TODO: multi check + return { # TODO: just demo + 'delay': httpPing(taskId, options['url'], socksInfo['addr'], socksInfo['port'], options['timeout']) + } diff --git a/Checker/__init__.py b/Checker/__init__.py new file mode 100644 index 0000000..df894c3 --- /dev/null +++ b/Checker/__init__.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from Checker import Http + +def Checker(taskId: str, checkInfo: dict, socksInfo: dict) -> dict: + + # TODO: ignore checkInfo for now + + httpRet = Http.check(taskId, socksInfo, { + 'url': 'http://baidu.com/', + 'timeout': 20, + }) + return { + 'http': httpRet # TODO: just check http delay for now + } + + # TODO: return check result diff --git a/main.py b/main.py index 76d78e2..21d00f5 100755 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- + import os import time import _thread @@ -36,24 +37,25 @@ def daemonDnsproxy(servers: list or None, port: int = 53, cache: int = 4194304) time.sleep(2) # daemon time gap if dnsproxy.poll() is not None: # unexpected exit logging.warning('dnsproxy unexpected exit') - logging.debug('output of dnsproxy\n%s' % dnsproxy.stdout.read().decode('UTF-8')) + logging.debug('output of dnsproxy\n%s' % dnsproxy.stdout.read().decode('utf-8')) dnsproxy = startDnsproxy(dnsCommand) from Basis.Check import Check -from Basis.Manage import Manage +from Basis.Manager import Manager def loopCheck() -> None: while True: time.sleep(2) # TODO: thread pool working - subTaskId, subTask = Manage.popSubTask() - if subTaskId is None: continue - logging.info('new sub task -> %s', subTask) - ret = Check(subTask['type'], subTask['info'], {}) + try: + taskId, taskInfo = Manager.popTask() + except: + logging.debug('no more task') + continue + logging.info('new task %s -> %s' % (taskId, taskInfo)) + ret = Check(taskId, taskInfo) logging.info('check result -> %s' % ret) - subTask.pop('check') - subTask['result'] = ret - Manage.updateSubTask(subTaskId, subTask) + Manager.finishTask(taskId, ret) logging.warning('ProxyC starts running (%s)' % Version) diff --git a/manageDemo.py b/manageDemo.py deleted file mode 100755 index fb71a1d..0000000 --- a/manageDemo.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -from Basis.Manage import Manage -from Basis.Logger import logging - - -taskId_1 = Manage.addTask([ - {'test': 1}, - {'test': 2}, - {'test': 3}, -]) -logging.critical('task id 1 -> %s' % taskId_1) - -taskId_2 = Manage.addTask([ - {'demo': 1}, - {'demo': 2}, - {'demo': 3}, -]) -logging.critical('task id 2 -> %s' % taskId_2) - -logging.critical('list task -> %s' % Manage.listTask()) - -logging.critical('is task 1234 -> %s' % Manage.isTask('1234')) -logging.critical('is task %s -> %s' % (taskId_1, Manage.isTask(taskId_1))) - -logging.critical('get task %s -> %s' % (taskId_1, Manage.getTask(taskId_1))) - -subTaskId_1, subTask_1 = Manage.popSubTask() -logging.critical('pop sub task %s -> %s' % (subTaskId_1, subTask_1)) -subTaskId_2, subTask_2 = Manage.popSubTask() -logging.critical('pop sub task %s -> %s' % (subTaskId_2, subTask_2)) -subTaskId_3, subTask_3 = Manage.popSubTask() -logging.critical('pop sub task %s -> %s' % (subTaskId_3, subTask_3)) - -Manage.updateSubTask(subTaskId_3, {'test': 33}) -logging.critical('update sub task %s' % subTaskId_3) -Manage.updateSubTask(subTaskId_2, {'test': 22}) -logging.critical('update sub task %s' % subTaskId_2) -Manage.updateSubTask(subTaskId_1, {'test': 11}) -logging.critical('update sub task %s' % subTaskId_1) - -logging.critical('get task %s -> %s' % (taskId_1, Manage.getTask(taskId_1))) - -subTaskId_4, subTask_4 = Manage.popSubTask() -logging.critical('pop sub task %s -> %s' % (subTaskId_4, subTask_4)) - -Manage.updateSubTask(subTaskId_4, {'demo': 2333}) -logging.critical('update sub task %s' % subTaskId_4) - -logging.critical('get task %s -> %s' % (taskId_2, Manage.getTask(taskId_2))) - -logging.critical('sub task pop -> ' + str(Manage.popSubTask())) -logging.critical('sub task pop -> ' + str(Manage.popSubTask())) -logging.critical('sub task pop -> ' + str(Manage.popSubTask())) -