Browse Source

update: new inspection process

master
dnomd343 2 years ago
parent
commit
d07c92c882
  1. 10
      Basis/Api.py
  2. 57
      Basis/Check.py
  3. 2
      Basis/Constant.py
  4. 2
      Basis/Functions.py
  5. 86
      Basis/Manage.py
  6. 92
      Basis/Manager.py
  7. 7
      Basis/Process.py
  8. 42
      Builder/__init__.py
  9. 34
      Checker/Http.py
  10. 18
      Checker/__init__.py
  11. 20
      main.py
  12. 56
      manageDemo.py

10
Basis/Api.py

@ -3,8 +3,8 @@
import json
from gevent import pywsgi
from Basis.Manage import Manage
from Basis.Logger import logging
from Basis.Manager import Manager
from Basis.Constant import Version
from flask import Flask, Response, request
@ -39,7 +39,7 @@ def getTaskList() -> Response:
if not tokenCheck(): # token check
return tokenError()
try:
taskList = Manage.listTask()
taskList = Manager.listUnion()
logging.debug('api get task list -> %s' % taskList)
return jsonResponse({
'success': True,
@ -75,7 +75,7 @@ def createTask() -> Response:
tasks = []
for proxy in proxyList:
tasks.append({**proxy, 'check': checkList})
checkId = Manage.addTask(tasks)
checkId = Manager.addUnion(tasks)
logging.debug('api return check id %s' % checkId)
return jsonResponse({
@ -91,14 +91,14 @@ def getTaskInfo(taskId: str) -> Response:
if not tokenCheck(): # token check
return tokenError()
logging.critical('API get task %s info' % taskId)
if not Manage.isTask(taskId):
if not Manager.isUnion(taskId):
return jsonResponse({
'success': False,
'message': 'task id not found',
})
return jsonResponse({
'success': True,
**Manage.getTask(taskId)
**Manager.getUnion(taskId)
})

57
Basis/Check.py

@ -1,50 +1,47 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import copy
import time
from Checker import Checker
from Basis.Logger import logging
from Builder import Builder, clientEntry
from ProxyChecker import httpCheck # TODO: refactor in the future
def Check(proxyType: str, proxyInfo: dict, checkInfo: dict) -> dict:
# TODO: checkInfo -> [...] (only check http for now)
if proxyType not in clientEntry:
logging.error('Unknown proxy type %s' % proxyType)
def Check(taskId: str, taskInfo: dict) -> dict:
logging.info('[%s] Start checking process -> %s' % (taskId, taskInfo))
if taskInfo['type'] not in clientEntry:
logging.error('[%s] Unknown proxy type %s' % (taskId, taskInfo['type']))
raise RuntimeError('Unknown proxy type')
try:
client = Builder(proxyType, proxyInfo)
client = Builder(
proxyType = taskInfo['type'],
proxyInfo = taskInfo['info'],
bindAddr = '127.0.0.1', # socks5 exposed host
taskId = taskId,
)
except Exception as reason:
logging.error('Client build error -> %s' % reason)
logging.error('[%s] Client build error -> %s' % (taskId, reason))
raise RuntimeError('Client build error')
logging.info('[%s] Client loaded successfully')
# TODO: debug combine output
logging.debug(client.id)
logging.debug(client.proxyType)
logging.debug(client.proxyInfo)
logging.debug(client.socksAddr)
logging.debug(client.socksPort)
# TODO: wait port occupied
# TODO: wait port occupied (client.socksPort)
time.sleep(1)
if not client.status(): # client unexpected exit
logging.warning('[%s] Client unexpected exit')
client.destroy() # remove file and kill sub process
logging.error('Client unexpected exit\n%s', client.output)
logging.debug('[%s] Client output\n%s', client.output)
raise RuntimeError('Client unexpected exit')
# TODO: check process
status, _ = httpCheck(client.socksPort) # TODO: add socks5 addr
logging.critical('http check status -> %s' % status)
checkResult = Checker(taskId, taskInfo['check'], {
'addr': client.socksAddr,
'port': client.socksPort,
})
logging.info('[%s] Client check result -> %s' % (taskId, checkResult))
client.destroy() # clean up the client
taskInfo = copy.deepcopy(taskInfo)
taskInfo.pop('check')
return {
'http': {
'status': status,
# TODO: more http check info
},
# TODO: more check items (from checkInfo list)
**taskInfo,
'result': checkResult,
}

2
Basis/Constant.py

@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
Version = 'dev'
WorkDir = '/tmp/ProxyC'
PathEnv = '/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin'
# Shadowsocks Info
ssMethods = { # methods support of different Shadowsocks project

2
Basis/Functions.py

@ -10,7 +10,7 @@ from IPy import IP
from Basis.Logger import logging
def md5Sum(data: str, encode: str = 'UTF-8') -> str:
def md5Sum(data: str, encode: str = 'utf-8') -> str:
return hashlib.md5(data.encode(encoding = encode)).hexdigest()

86
Basis/Manage.py

@ -1,86 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import copy
from Basis.Logger import logging
from Basis.Functions import genFlag
class Task(object):
""" Manage global check task.
"""
__tasks = {}
__subTasks = {} # sub task status -> loaded / running / complete
def __init__(self):
logging.info('task manager start')
def addTask(self, tasks: list) -> str: # add task to manager (multi sub tasks)
subTasks = {}
subTaskIds = []
for subTask in tasks:
subTaskId = genFlag(length = 24) # generate sub task id (24 bytes)
subTaskIds.append(subTaskId)
subTasks[subTaskId] = {
'status': 'loaded',
'data': copy.deepcopy(subTask)
}
logging.info('add sub task %s -> %s' % (subTaskId, subTasks[subTaskId]['data']))
taskId = genFlag(length = 16) # generate task id (16 bytes)
self.__tasks[taskId] = { # load task
'sub': subTaskIds
}
self.__subTasks.update(subTasks) # load sub tasks
logging.info('task %s loaded' % taskId)
return taskId
def isTask(self, taskId: str) -> bool: # check if the task id exist
return taskId in self.__tasks
def getTask(self, taskId: str) -> dict: # get task status (remove sub tasks when all completed)
if taskId not in self.__tasks:
logging.error('task id %s not found' % taskId)
raise RuntimeError('task id not found')
subList = self.__tasks[taskId]['sub']
completed = 0
for subTaskId in subList:
if self.__subTasks[subTaskId]['status'] == 'complete': # get number of completed sub task
completed += 1
logging.debug('[%s] statistics sub task status -> %i/%i' % (taskId, completed, len(subList)))
if completed < len(subList): # some sub tasks are not completed
logging.debug('[%s] task still running' % taskId)
return {
'finish': False,
'percent': '%i%%' % (completed / len(subList))
}
logging.debug('[%s] task work complete' % taskId) # all sub tasks completed
result = []
for subTaskId in subList:
subTask = self.__subTasks[subTaskId]
self.__subTasks.pop(subTaskId)
result.append(subTask['data'])
logging.debug('release sub tasks -> %s' % result)
return {
'finish': True,
'result': result
}
def listTask(self) -> list: # get all task ids
return [x for x in self.__tasks]
def popSubTask(self) -> tuple[str or None, any]: # fetch a loaded sub task
for subTaskId, subTask in self.__subTasks.items():
if subTask['status'] != 'loaded': continue # only get loaded sub task
subTask['status'] = 'running' # set status as running
return subTaskId, copy.deepcopy(subTask['data'])
return None, None
def updateSubTask(self, subTaskId: str, subTaskData: dict) -> None: # update sub task data when completed
if subTaskId not in self.__subTasks:
logging.error('sub task id %s not found' % subTaskId)
raise RuntimeError('sub task id not found')
self.__subTasks[subTaskId]['data'] = copy.deepcopy(subTaskData)
self.__subTasks[subTaskId]['status'] = 'complete'
Manage = Task() # global task manager

92
Basis/Manager.py

@ -0,0 +1,92 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import copy
from Basis.Logger import logging
from Basis.Functions import genFlag
class Task(object):
""" Manage global check task.
"""
__tasks = {} # task status -> loaded / running / complete
__unions = {} # one union include multi tasks
__TASK_LOADED = -1
__TASK_RUNNING = 0
__TASK_FINISH = 1
def __init__(self):
logging.info('Manager start')
def listUnion(self) -> list: # get all union ids
return [x for x in self.__unions]
def isUnion(self, unionId: str) -> bool: # check if the union id exist
return unionId in self.__unions
def addUnion(self, union: list) -> str: # add union to manager (include multi tasks)
tasks = {} # temporary task storage
taskIds = [] # task id list for manage union
unionId = genFlag(length = 16) # generate union id (16 bytes)
logging.debug('Manager start to load union [%s]' % unionId)
for task in union:
taskId = genFlag(length = 24) # generate task id (24 bytes)
taskIds.append(taskId)
tasks[taskId] = {
'status': self.__TASK_LOADED, # task status -> loaded
'data': copy.deepcopy(task) # save task info
}
logging.info('Manager add task [%s] -> %s' % (taskId, task))
self.__tasks.update(tasks) # load into task list
self.__unions[unionId] = {
'items': taskIds # record task items
}
logging.info('Manager add union [%s] -> %s' % (unionId, taskIds))
return unionId
def getUnion(self, unionId: str) -> dict: # get union status (remove tasks when all completed)
if unionId not in self.__unions:
logging.error('Manager union [%s] not found' % unionId)
raise RuntimeError('Union id not found')
tasks = self.__unions[unionId]['items']
finishNum = 0
for taskId in tasks:
if self.__tasks[taskId]['status'] == self.__TASK_FINISH: # get number of completed task
finishNum += 1
logging.info('Manager statistics union [%s] -> %i/%i' % (unionId, finishNum, len(tasks)))
if finishNum < len(tasks): # some tasks are not yet completed
logging.debug('Manager union [%s] still working' % unionId)
return {
'finish': False,
'percent': '%f' % (round(finishNum / len(tasks), 2))
}
self.__unions.pop(unionId) # remove from union list
unionResult = [] # temporary storage
for taskId in tasks:
task = self.__tasks[taskId]
self.__tasks.pop(taskId) # remove from task list
unionResult.append(task['data'])
logging.info('Manager release union [%s] -> %s' % (unionId, unionResult))
return {
'finish': True,
'result': unionResult
}
def popTask(self) -> tuple[str or None, any]: # fetch a loaded task
for taskId, task in self.__tasks.items():
if task['status'] != self.__TASK_LOADED: continue # only get loaded task
task['status'] = self.__TASK_RUNNING # set task status as running
logging.info('Manager pop task [%s] -> %s' % (taskId, task['data']))
return taskId, copy.deepcopy(task['data'])
logging.debug('Manager has no more loaded tasks')
raise RuntimeError('No more tasks')
def finishTask(self, taskId: str, taskData: dict) -> None: # update task data when completed
if taskId not in self.__tasks:
logging.error('Manager task [%s] not found' % taskId)
raise RuntimeError('Task id not found')
self.__tasks[taskId]['data'] = copy.deepcopy(taskData)
self.__tasks[taskId]['status'] = self.__TASK_FINISH # set task status as completed
Manager = Task() # global task manager

7
Basis/Process.py

@ -153,8 +153,11 @@ class Process(object):
else: # discard all the output of sub process
stdout = DEVNULL
stderr = DEVNULL
self.__process = Popen(self.cmd, env = self.env, stdout = stdout,
stderr = stderr, preexec_fn = None if libcPath is None else Process.__preExec)
self.__process = Popen(
self.cmd, env = self.env,
stdout = stdout, stderr = stderr,
preexec_fn = None if libcPath is None else Process.__preExec
)
logging.info('[%s] Process running -> PID = %i' % (self.id, self.__process.pid))
def signal(self, signalNum: int) -> None: # send specified signal to sub process

42
Builder/__init__.py

@ -3,6 +3,10 @@
import os
import copy
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import WorkDir, PathEnv
from Basis.Functions import hostFormat, genFlag, getAvailablePort
from Builder import Brook
from Builder import VMess
@ -13,11 +17,6 @@ from Builder import Hysteria
from Builder import Shadowsocks
from Builder import ShadowsocksR
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import genFlag, getAvailablePort
pathEnv = '/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin'
clientEntry = {
'ss': [Shadowsocks.load, '.json'],
'ssr': [ShadowsocksR.load, '.json'],
@ -38,9 +37,7 @@ class Builder(object):
proxyInfo: Proxy node information.
bind: Socks5 proxy bind address.
workDir: A directory for storing configuration files.
bindAddr: Socks5 proxy bind address.
taskId: Task ID, defaults to 12 random characters length.
@ -50,32 +47,32 @@ class Builder(object):
output = None
def __loadClient(self):
logging.info('[%s] Load %s proxy client at %s -> %s' % (self.id, self.proxyType, (
(('[%s]' if ':' in self.socksAddr else '%s') + ':%i') % (self.socksAddr, self.socksPort)
), str(self.proxyInfo)))
logging.info('[%s] Builder load %s proxy client at %s -> %s' % (
self.id, self.proxyType,
'socks5://%s:%i' % (hostFormat(self.socksAddr, v6Bracket = True), self.socksPort), self.proxyInfo
))
configFile = os.path.join( # config file path
self.__workDir, self.id + clientEntry[self.proxyType][1] # workDir + taskId + suffix
WorkDir, self.id + clientEntry[self.proxyType][1] # workDir + taskId + file suffix
)
logging.debug('[%s] Builder config file -> %s' % (self.id, configFile))
command, fileContent, envVar = clientEntry[self.proxyType][0](self.proxyInfo, { # load client boot info
'addr': self.socksAddr,
'port': self.socksPort,
}, configFile)
fileObject = { # add config file settings
envVar['PATH'] = PathEnv # add PATH env (some programs need it)
self.__process = Process(WorkDir, taskId = self.id, cmd = command, env = envVar, file = { # start process
'path': configFile,
'content': fileContent
}
envVar['PATH'] = pathEnv # add PATH env (some programs need it)
self.__process = Process(self.__workDir, taskId = self.id, cmd = command, env = envVar, file = fileObject)
})
def __init__(self, proxyType: str, proxyInfo: dict, taskId: str = '',
bind: str = '127.0.0.1', workDir: str = '/tmp/ProxyC') -> None: # init proxy client
def __init__(self, proxyType: str, proxyInfo: dict, bindAddr: str, taskId: str = '') -> None: # init proxy client
self.id = genFlag(length = 12) if taskId == '' else taskId # load task ID
if proxyType not in clientEntry:
logging.error('[%s] Builder receive unknown proxy type %s' % (self.id, proxyType))
raise RuntimeError('Unknown proxy type')
self.id = genFlag(length = 12) if taskId == '' else taskId # load task ID
self.__workDir = workDir
self.proxyType = proxyType # proxy type -> ss / ssr / vmess ...
self.proxyInfo = copy.copy(proxyInfo) # proxy object -> contain connection info
self.socksAddr = bind
self.proxyInfo = copy.copy(proxyInfo) # connection info
self.socksAddr = bindAddr
self.socksPort = getAvailablePort() # random port for socks5 exposed
self.__loadClient()
@ -83,5 +80,6 @@ class Builder(object):
return self.__process.status()
def destroy(self) -> None: # kill sub process and remove config file
logging.debug('[%s] Builder destroy' % self.id)
self.__process.quit()
self.output = self.__process.output

34
Checker/Http.py

@ -0,0 +1,34 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import requests
from Basis.Logger import logging
from Basis.Functions import hostFormat
def httpPing(taskId: str, url: str, socksAddr: str, socksPort: int, timeout: int) -> float:
try:
startTime = time.time_ns() # request start time
socksProxy = 'socks5://%s:%i' % (hostFormat(socksAddr, v6Bracket = True), socksPort)
logging.debug('[%s] Http ping -> request %s via %s' % (taskId, url, socksProxy))
httpRequest = requests.get(url, proxies = { # send http request by socks5 proxy
'http': socksProxy,
'https': socksProxy,
}, timeout = timeout)
except: # something error on request process (timeout or proxy not working)
logging.debug('[%s] Http ping -> request error' % taskId)
return -1
if httpRequest.status_code not in range(200, 300): # http code not 2xx
logging.debug('[%s] Http ping -> status code %i not expected' % (taskId, httpRequest.status_code))
return -1
delay = (time.time_ns() - startTime) / (10 ** 6) # ns -> ms
logging.debug('[%s] Http ping -> delay %f ms' % (taskId, delay))
return round(delay, 2) # two decimal places
def check(taskId: str, socksInfo: dict, options: dict) -> dict:
# TODO: multi check
return { # TODO: just demo
'delay': httpPing(taskId, options['url'], socksInfo['addr'], socksInfo['port'], options['timeout'])
}

18
Checker/__init__.py

@ -0,0 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from Checker import Http
def Checker(taskId: str, checkInfo: dict, socksInfo: dict) -> dict:
# TODO: ignore checkInfo for now
httpRet = Http.check(taskId, socksInfo, {
'url': 'http://baidu.com/',
'timeout': 20,
})
return {
'http': httpRet # TODO: just check http delay for now
}
# TODO: return check result

20
main.py

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import _thread
@ -36,24 +37,25 @@ def daemonDnsproxy(servers: list or None, port: int = 53, cache: int = 4194304)
time.sleep(2) # daemon time gap
if dnsproxy.poll() is not None: # unexpected exit
logging.warning('dnsproxy unexpected exit')
logging.debug('output of dnsproxy\n%s' % dnsproxy.stdout.read().decode('UTF-8'))
logging.debug('output of dnsproxy\n%s' % dnsproxy.stdout.read().decode('utf-8'))
dnsproxy = startDnsproxy(dnsCommand)
from Basis.Check import Check
from Basis.Manage import Manage
from Basis.Manager import Manager
def loopCheck() -> None:
while True:
time.sleep(2) # TODO: thread pool working
subTaskId, subTask = Manage.popSubTask()
if subTaskId is None: continue
logging.info('new sub task -> %s', subTask)
ret = Check(subTask['type'], subTask['info'], {})
try:
taskId, taskInfo = Manager.popTask()
except:
logging.debug('no more task')
continue
logging.info('new task %s -> %s' % (taskId, taskInfo))
ret = Check(taskId, taskInfo)
logging.info('check result -> %s' % ret)
subTask.pop('check')
subTask['result'] = ret
Manage.updateSubTask(subTaskId, subTask)
Manager.finishTask(taskId, ret)
logging.warning('ProxyC starts running (%s)' % Version)

56
manageDemo.py

@ -1,56 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from Basis.Manage import Manage
from Basis.Logger import logging
taskId_1 = Manage.addTask([
{'test': 1},
{'test': 2},
{'test': 3},
])
logging.critical('task id 1 -> %s' % taskId_1)
taskId_2 = Manage.addTask([
{'demo': 1},
{'demo': 2},
{'demo': 3},
])
logging.critical('task id 2 -> %s' % taskId_2)
logging.critical('list task -> %s' % Manage.listTask())
logging.critical('is task 1234 -> %s' % Manage.isTask('1234'))
logging.critical('is task %s -> %s' % (taskId_1, Manage.isTask(taskId_1)))
logging.critical('get task %s -> %s' % (taskId_1, Manage.getTask(taskId_1)))
subTaskId_1, subTask_1 = Manage.popSubTask()
logging.critical('pop sub task %s -> %s' % (subTaskId_1, subTask_1))
subTaskId_2, subTask_2 = Manage.popSubTask()
logging.critical('pop sub task %s -> %s' % (subTaskId_2, subTask_2))
subTaskId_3, subTask_3 = Manage.popSubTask()
logging.critical('pop sub task %s -> %s' % (subTaskId_3, subTask_3))
Manage.updateSubTask(subTaskId_3, {'test': 33})
logging.critical('update sub task %s' % subTaskId_3)
Manage.updateSubTask(subTaskId_2, {'test': 22})
logging.critical('update sub task %s' % subTaskId_2)
Manage.updateSubTask(subTaskId_1, {'test': 11})
logging.critical('update sub task %s' % subTaskId_1)
logging.critical('get task %s -> %s' % (taskId_1, Manage.getTask(taskId_1)))
subTaskId_4, subTask_4 = Manage.popSubTask()
logging.critical('pop sub task %s -> %s' % (subTaskId_4, subTask_4))
Manage.updateSubTask(subTaskId_4, {'demo': 2333})
logging.critical('update sub task %s' % subTaskId_4)
logging.critical('get task %s -> %s' % (taskId_2, Manage.getTask(taskId_2)))
logging.critical('sub task pop -> ' + str(Manage.popSubTask()))
logging.critical('sub task pop -> ' + str(Manage.popSubTask()))
logging.critical('sub task pop -> ' + str(Manage.popSubTask()))
Loading…
Cancel
Save