Browse Source

refactor: multiple refactorings

master
Dnomd343 3 years ago
parent
commit
f273f0d757
  1. 41
      Checker.py
  2. 7
      Loop.py
  3. 2
      ProxyFilter/Plugin.py
  4. 29
      Run.py
  5. 148
      Web.py

41
Checker.py

@ -7,7 +7,7 @@ import time
import ProxyBuilder as Builder import ProxyBuilder as Builder
import ProxyChecker as Checker import ProxyChecker as Checker
def __loadDir(folderPath): # 创建文件夹 def __loadDir(folderPath: str) -> bool: # 创建文件夹
try: try:
if os.path.exists(folderPath): # 文件 / 文件夹 存在 if os.path.exists(folderPath): # 文件 / 文件夹 存在
if not os.path.isdir(folderPath): # 文件 if not os.path.isdir(folderPath): # 文件
@ -18,14 +18,14 @@ def __loadDir(folderPath): # 创建文件夹
except: except:
return False return False
def __proxyHttpCheck(socksPort, httpCheckUrl, httpCheckTimeout): # Http检测 def __proxyHttpCheck(socksPort: int, httpCheckUrl: str, httpCheckTimeout: float) -> dict or None: # Http检测
try: try:
health, httpDelay = Checker.httpCheck( health, httpDelay = Checker.httpCheck(
socksPort, socksPort,
url = httpCheckUrl, url = httpCheckUrl,
timeout = httpCheckTimeout timeout = httpCheckTimeout
) )
if health == None: # 连接失败 if health is None: # 连接失败
return None return None
return { return {
'delay': httpDelay, 'delay': httpDelay,
@ -35,13 +35,12 @@ def __proxyHttpCheck(socksPort, httpCheckUrl, httpCheckTimeout): # Http检测
return None return None
def proxyTest( def proxyTest(
rawInfo, rawInfo: dict,
startDelay = 1, startDelay: float = 1,
workDir = '/tmp/ProxyC', workDir: str = '/tmp/ProxyC',
httpCheckUrl = 'http://gstatic.com/generate_204', httpCheckUrl: str = 'http://gstatic.com/generate_204',
httpCheckTimeout = 20, httpCheckTimeout: float = 20) -> dict or None:
): """
'''
代理检测入口 代理检测入口
程序异常: 程序异常:
@ -58,37 +57,38 @@ def proxyTest(
'result': checkResult 'result': checkResult
} }
''' """
if __loadDir(workDir) == False: # 工作文件夹无效 if not __loadDir(workDir): # 工作文件夹无效
return None return None
if not 'info' in rawInfo: # 缺少代理服务器信息 if 'info' not in rawInfo: # 缺少代理服务器信息
return None return None
client = None
try: try:
status, client = Builder.build(rawInfo['info'], workDir) status, client = Builder.build(rawInfo['info'], workDir)
except: # 构建发生未知错误 except: # 构建发生未知错误
Builder.destroy(client) Builder.destroy(client)
return None return None
if status == None: # 构建错误 if status is None: # 构建错误
Builder.destroy(client) Builder.destroy(client)
return None return None
elif status == False: # 节点信息有误 elif not status: # 节点信息有误
return { return {
'success': False 'success': False
} }
time.sleep(startDelay) # 延迟等待客户端启动 time.sleep(startDelay) # 延迟等待客户端启动
status = Builder.check(client) # 检查客户端状态 status = Builder.check(client) # 检查客户端状态
if status == None: # 检测失败 if status is None: # 检测失败
Builder.destroy(client) Builder.destroy(client)
return None return None
elif status == False: # 客户端异常退出 elif not status: # 客户端异常退出
Builder.destroy(client) Builder.destroy(client)
return { return {
'success': False 'success': False
} }
if not 'check' in rawInfo: # 缺少检测项目 if 'check' not in rawInfo: # 缺少检测项目
return None return None
checkItem = rawInfo['check'] checkItem = rawInfo['check']
checkResult = {} checkResult = {}
@ -97,7 +97,7 @@ def proxyTest(
result = __proxyHttpCheck(client['port'], httpCheckUrl, httpCheckTimeout) result = __proxyHttpCheck(client['port'], httpCheckUrl, httpCheckTimeout)
else: # 未知检测项目 else: # 未知检测项目
result = None result = None
if result == None: # 检测出错 if result is None: # 检测出错
Builder.destroy(client) Builder.destroy(client)
return None return None
checkResult[item] = result checkResult[item] = result
@ -105,5 +105,6 @@ def proxyTest(
Builder.destroy(client) # 销毁客户端 Builder.destroy(client) # 销毁客户端
return { return {
'success': True, 'success': True,
'result': checkResult 'check': checkResult,
'proxy': rawInfo['info']
} }

7
Loop.py

@ -17,9 +17,10 @@ redisObject = redis.StrictRedis(
port = redisPort port = redisPort
) )
runScript = '/usr/local/share/ProxyC/Run.py'
processList = [] processList = []
runScript = '/root/ProxyC/Run.py'
# runScript = '/usr/local/share/ProxyC/Run.py'
while True: while True:
spareNum = min( spareNum = min(
maxThread - len(processList), # 空余进程数 maxThread - len(processList), # 空余进程数
@ -34,7 +35,7 @@ while True:
time.sleep(0.2) time.sleep(0.2)
for process in processList: # 遍历子进程 for process in processList: # 遍历子进程
if process.poll() != None: # 进程已退出 if process.poll() is not None: # 进程已退出
processList.remove(process) processList.remove(process)
time.sleep(0.5) time.sleep(0.5)

2
ProxyFilter/Plugin.py

@ -112,7 +112,7 @@ pluginAlias = { # 插件别名
] ]
} }
def pluginFormat(plugin): # 插件格式化 def pluginFormat(plugin: str) -> str: # 插件格式化
plugin = plugin.replace('_', '-').lower().strip() plugin = plugin.replace('_', '-').lower().strip()
if plugin not in pluginList: # 非标插件名 if plugin not in pluginList: # 非标插件名
for pluginName in pluginAlias: for pluginName in pluginAlias:

29
Run.py

@ -5,11 +5,12 @@ import json
import redis import redis
import Checker import Checker
def __loadRedis(redisHost = 'localhost', redisPort = 6379): # 连接Redis数据库 redisPort = 6379
return redis.StrictRedis(host = redisHost, port = redisPort, db = 0) redisHost = 'localhost'
redisPrefix = 'proxyc-'
def __getCheckInfo(redisObject, redisPrefix): def __getCheckInfo() -> tuple[str or None, dict or None]:
''' """
获取检测任务 获取检测任务
无任务或发生异常: 无任务或发生异常:
@ -20,7 +21,7 @@ def __getCheckInfo(redisObject, redisPrefix):
任务获取成功: 任务获取成功:
return tag, {...} return tag, {...}
''' """
try: try:
checkList = redisObject.keys(redisPrefix + 'check-a-*') # 优先级排序 checkList = redisObject.keys(redisPrefix + 'check-a-*') # 优先级排序
if len(checkList) == 0: if len(checkList) == 0:
@ -44,7 +45,7 @@ def __getCheckInfo(redisObject, redisPrefix):
except: # JSON解码失败 except: # JSON解码失败
return tag, None return tag, None
def __setCheckResult(checkTag, checkResult, redisObject, redisPrefix): # 写入检测结果 def __setCheckResult(checkTag: str, checkResult: dict) -> bool: # 写入检测结果
try: try:
key = redisPrefix + 'result-' + checkTag key = redisPrefix + 'result-' + checkTag
redisObject.set(key, json.dumps(checkResult)) redisObject.set(key, json.dumps(checkResult))
@ -52,11 +53,9 @@ def __setCheckResult(checkTag, checkResult, redisObject, redisPrefix): # 写入
except: except:
return False return False
def main(startDelay, httpCheckUrl, httpCheckTimeout): def main(startDelay: float, httpCheckUrl: str, httpCheckTimeout: int) -> None:
redisPrefix = 'proxyc-' checkTag, checkInfo = __getCheckInfo() # 获取检测任务
redisObject = __loadRedis() if checkTag is None:
checkTag, checkInfo = __getCheckInfo(redisObject, redisPrefix) # 获取检测任务
if checkTag == None:
print("no task found") print("no task found")
return return
print(checkInfo) print(checkInfo)
@ -66,15 +65,15 @@ def main(startDelay, httpCheckUrl, httpCheckTimeout):
httpCheckUrl = httpCheckUrl, httpCheckUrl = httpCheckUrl,
httpCheckTimeout = httpCheckTimeout httpCheckTimeout = httpCheckTimeout
) )
if checkResult == None: if checkResult is None:
print("some bad things happen") print("some bad things happen")
return return
elif checkResult['success'] == False: elif not checkResult['success']:
print("error proxy info") print("error proxy info")
return return
print(checkTag + ' -> ', end = '') print(checkTag + ' -> ', end = '')
print(checkResult) print(checkResult)
if __setCheckResult(checkTag, checkResult, redisObject, redisPrefix) == False: if not __setCheckResult(checkTag, checkResult):
print("redis write error") print("redis write error")
return return
@ -82,4 +81,6 @@ defaultStartDelay = 1.5
defaultHttpCheckTimeout = 20 defaultHttpCheckTimeout = 20
defaultHttpCheckUrl = 'http://gstatic.com/generate_204' defaultHttpCheckUrl = 'http://gstatic.com/generate_204'
redisObject = redis.StrictRedis(host = redisHost, port = redisPort, db = 0) # 连接Redis数据库
main(defaultStartDelay, defaultHttpCheckUrl, defaultHttpCheckTimeout) main(defaultStartDelay, defaultHttpCheckUrl, defaultHttpCheckTimeout)

148
Web.py

@ -15,19 +15,19 @@ redisPrefix = 'proxyc-'
accessToken = 'dnomd343' accessToken = 'dnomd343'
def httpPostArg(field: str): # 获取HTTP POST参数 def httpPostArg(field: str) -> dict or str or None: # 获取HTTP POST参数
try: try:
if request.values.get(field) != None: # application/x-www-form-urlencoded if request.values.get(field) is not None: # application/x-www-form-urlencoded
return request.values.get(field) return request.values.get(field)
elif request.json.get(field) != None: # application/json elif request.json.get(field) is not None: # application/json
return request.json.get(field) return request.json.get(field)
elif request.form.get(field) != None: # multipart/form-data elif request.form.get(field) is not None: # multipart/form-data
return request.form.get(field) return request.form.get(field)
except: except:
pass pass
return None return None
def genRandomId(length: int = 24): # 生成随机ID def genRandomId(length: int = 24) -> str: # 生成随机ID
tag = '' tag = ''
for i in range(0, length): for i in range(0, length):
tmp = random.randint(0, 15) tmp = random.randint(0, 15)
@ -37,17 +37,17 @@ def genRandomId(length: int = 24): # 生成随机ID
tag += str(tmp) # 0 ~ 9 tag += str(tmp) # 0 ~ 9
return tag return tag
def genError(message: str): # 生成错误回复 def genError(message: str) -> dict: # 生成错误回复
return { return {
'success': False, 'success': False,
'message': message 'message': message
} }
def genSuccess(data: dict): # 生成成功返回 def genSuccess(data: dict) -> dict: # 生成成功返回
data['success'] = True data['success'] = True
return data return data
def getCheckList(userId: str): # 获取检测任务列表 def getCheckList(userId: str) -> list or None: # 获取检测任务列表
try: try:
taskList = [] taskList = []
rawTaskList = redisObject.keys(redisPrefix + 'task-' + userId + '*') rawTaskList = redisObject.keys(redisPrefix + 'task-' + userId + '*')
@ -57,22 +57,22 @@ def getCheckList(userId: str): # 获取检测任务列表
except: except:
return None return None
def addCheckTask(checkList, proxyList, priority: str, userId: str): # 新增检测任务 def addCheckTask(checkList: dict, proxyList: dict, priority: str, userId: str) -> dict: # 新增检测任务
try: try:
import ProxyDecoder as Decoder import ProxyDecoder as Decoder
import ProxyFilter as Filter import ProxyFilter as Filter
checkList = list(set(checkList)) # 检测方式去重 checkList = list(set(checkList)) # 检测方式去重
for checkMethod in checkList: for checkMethod in checkList:
if not checkMethod in ['http']: if checkMethod not in ['http']:
return genError('unknown check method `' + checkMethod + '`') return genError('unknown check method `' + checkMethod + '`')
for i in range(0, len(proxyList)): for i in range(0, len(proxyList)):
proxyList[i] = Decoder.decode(proxyList[i]) # 解码分享链接 proxyList[i] = Decoder.decode(proxyList[i]) # 解码分享链接
if proxyList[i] == None: if proxyList[i] is None:
return genError('could not decode index ' + str(i)) return genError('could not decode index ' + str(i))
status, proxyList[i] = Filter.filter(proxyList[i]) # 节点信息检查 status, proxyList[i] = Filter.filte(proxyList[i]) # 节点信息检查
if status == False: # 节点不合法 if not status: # 节点不合法
return genError('index ' + str(i) + ': ' + proxyList[i]) return genError('index ' + str(i) + ': ' + proxyList[i])
tagList = [] tagList = []
@ -105,16 +105,16 @@ def addCheckTask(checkList, proxyList, priority: str, userId: str): # 新增检
except: except:
return genError('server error') return genError('server error')
def getTaskInfo(checkId): # 获取任务详情 def getTaskInfo(checkId: str) -> dict: # 获取任务详情
try: try:
taskKey = redisObject.keys(redisPrefix + 'task-' + checkId) taskKey = redisObject.keys(redisPrefix + 'task-' + checkId)
if taskKey == []: # 任务ID不存在 if not taskKey: # 任务ID不存在
return genError('invalid check id') return genError('invalid check id')
taskKey = str(taskKey[0], encoding = 'utf-8') taskKey = str(taskKey[0], encoding = 'utf-8')
taskInfo = json.loads( taskInfo = json.loads(
redisObject.get(taskKey) redisObject.get(taskKey)
) )
if taskInfo['complete'] == True: # 任务已完成 if taskInfo['complete']: # 任务已完成
return { return {
'success': True, 'success': True,
'complete': True, 'complete': True,
@ -125,7 +125,7 @@ def getTaskInfo(checkId): # 获取任务详情
completeNum = 0 # 测试完成数目 completeNum = 0 # 测试完成数目
for tag in taskInfo['proxy']: for tag in taskInfo['proxy']:
if redisObject.keys(redisPrefix + 'result-' + tag) != []: # 暂未测试 if redisObject.keys(redisPrefix + 'result-' + tag): # 暂未测试
completeNum += 1 completeNum += 1
if completeNum < len(taskInfo['proxy']): # 测试未完成 if completeNum < len(taskInfo['proxy']): # 测试未完成
return { return {
@ -157,16 +157,16 @@ def getTaskInfo(checkId): # 获取任务详情
except: except:
return genError('server error') return genError('server error')
def deleteTask(checkId): # 删除任务 def deleteTask(checkId: str) -> dict: # 删除任务
try: try:
taskKey = redisObject.keys(redisPrefix + 'task-' + checkId) taskKey = redisObject.keys(redisPrefix + 'task-' + checkId)
if taskKey == []: # 任务ID不存在 if not taskKey: # 任务ID不存在
return genError('invalid check id') return genError('invalid check id')
taskKey = str(taskKey[0], encoding = 'utf-8') taskKey = str(taskKey[0], encoding = 'utf-8')
taskInfo = json.loads( taskInfo = json.loads(
redisObject.get(taskKey) redisObject.get(taskKey)
) )
if taskInfo['complete'] != True: # 任务未完成 if not taskInfo['complete']: # 任务未完成
return genError('task not complete') return genError('task not complete')
redisObject.delete(taskKey) redisObject.delete(taskKey)
return { return {
@ -177,7 +177,7 @@ def deleteTask(checkId): # 删除任务
return genError('server error') return genError('server error')
def isAdminToken(token: str) -> bool: def isAdminToken(token: str) -> bool:
''' """
是否为管理员token 是否为管理员token
验证成功: 验证成功:
@ -185,12 +185,12 @@ def isAdminToken(token: str) -> bool:
验证失败: 验证失败:
return False return False
''' """
adminToken = accessToken adminToken = accessToken
return (token == adminToken) return token == adminToken
def isUserToken(token: str) -> bool: def isUserToken(token: str) -> bool:
''' """
是否为有效token 是否为有效token
token有效: token有效:
@ -198,7 +198,7 @@ def isUserToken(token: str) -> bool:
token无效: token无效:
return False return False
''' """
try: try:
if token.encode('utf-8') in redisObject.smembers(redisPrefix + 'users'): if token.encode('utf-8') in redisObject.smembers(redisPrefix + 'users'):
return True return True
@ -206,19 +206,19 @@ def isUserToken(token: str) -> bool:
pass pass
return False return False
def addUser(priority: str, remain): def addUser(priority: str, remain: int or str) -> tuple[bool, str]:
''' """
添加账号 添加账号
添加异常:
return False, {reason}
添加成功: 添加成功:
return True, userId return True, userId
'''
添加异常:
return False, {reason}
"""
try: try:
userId = genRandomId(length = 24) userId = genRandomId(length = 24)
if not priority in ['a','b','c','d','e']: # 优先级无效 if priority not in ['a', 'b', 'c', 'd', 'e']: # 优先级无效
return False, 'invalid priority' return False, 'invalid priority'
remain = int(remain) remain = int(remain)
if remain < 0: if remain < 0:
@ -239,8 +239,8 @@ def addUser(priority: str, remain):
except: except:
return False, 'server error' return False, 'server error'
def delUser(userId: str): def delUser(userId: str) -> tuple[bool, str]:
''' """
删除账号 删除账号
删除成功: 删除成功:
@ -248,13 +248,13 @@ def delUser(userId: str):
删除失败: 删除失败:
return False, {reason} return False, {reason}
''' """
try: try:
if isUserToken(userId) == False: if not isUserToken(userId):
return False, 'invalid user id' return False, 'invalid user id'
taskList = redisObject.keys(redisPrefix + 'task-' + userId + '*') taskList = redisObject.keys(redisPrefix + 'task-' + userId + '*')
if taskList != []: if taskList:
return False, 'task list not empty' return False, 'task list not empty'
redisObject.srem(redisPrefix + 'users', userId) redisObject.srem(redisPrefix + 'users', userId)
@ -263,8 +263,8 @@ def delUser(userId: str):
except: except:
return False, 'server error' return False, 'server error'
def getUserInfo(userId: str, minus: bool = False): def getUserInfo(userId: str, minus: bool = False) -> dict or None:
''' """
获取账号信息 (minus = True: 剩余次数 - 1) 获取账号信息 (minus = True: 剩余次数 - 1)
获取异常: 获取异常:
@ -276,14 +276,14 @@ def getUserInfo(userId: str, minus: bool = False):
'priority': '...', 'priority': '...',
'remain': ... 'remain': ...
} }
''' """
try: try:
if isUserToken(userId) == False: # userId不存在 if not isUserToken(userId): # userId不存在
return None return None
userInfo = json.loads( userInfo = json.loads(
redisObject.get(redisPrefix + 'user-' + userId) # 账号信息 redisObject.get(redisPrefix + 'user-' + userId) # 账号信息
) )
if minus == True and userInfo['remain'] > 0: if minus and userInfo['remain'] > 0:
userInfo['remain'] -= 1 # 剩余次数 - 1 userInfo['remain'] -= 1 # 剩余次数 - 1
redisObject.set( redisObject.set(
redisPrefix + 'user-' + userId, # 记入数据库 redisPrefix + 'user-' + userId, # 记入数据库
@ -293,8 +293,8 @@ def getUserInfo(userId: str, minus: bool = False):
except: except:
return None # 异常 return None # 异常
def getUserList(): def getUserList() -> dict or None:
''' """
获取所有账号信息 获取所有账号信息
获取异常: 获取异常:
@ -312,7 +312,7 @@ def getUserList():
} }
... ...
} }
''' """
try: try:
userList = {} userList = {}
for userId in redisObject.smembers(redisPrefix + 'users'): # 遍历全部账号 for userId in redisObject.smembers(redisPrefix + 'users'): # 遍历全部账号
@ -324,8 +324,8 @@ def getUserList():
except: except:
return None return None
def modifyUserInfo(userId: str, priority: str = None, remain = None): def modifyUserInfo(userId: str, priority: str = None, remain = None) -> bool:
''' """
修改账号信息 修改账号信息
修改成功: 修改成功:
@ -333,17 +333,17 @@ def modifyUserInfo(userId: str, priority: str = None, remain = None):
修改失败: 修改失败:
return False return False
''' """
try: try:
userInfo = getUserInfo(userId) userInfo = getUserInfo(userId)
if userInfo == None: # 账号不存在 if userInfo is None: # 账号不存在
return False return False
if priority != None: # 优先级变动 if priority is not None: # 优先级变动
if not priority in ['a','b','c','d','e']: # 优先级无效 if priority not in ['a', 'b', 'c', 'd', 'e']: # 优先级无效
return False return False
userInfo['priority'] = priority userInfo['priority'] = priority
if remain != None: # 剩余次数变动 if remain is not None: # 剩余次数变动
remain = int(remain) remain = int(remain)
if remain < 0: if remain < 0:
remain = -1 # 不限次数 remain = -1 # 不限次数
@ -357,91 +357,91 @@ def modifyUserInfo(userId: str, priority: str = None, remain = None):
return False return False
@api.route(apiPath + '/user', methods = ['GET', 'POST']) @api.route(apiPath + '/user', methods = ['GET', 'POST'])
def apiUser(): def apiUser() -> dict:
if request.method == 'GET': # 获取账号列表 if request.method == 'GET': # 获取账号列表
if isAdminToken(request.args.get('token')) == False: # 非管理员token if not isAdminToken(request.args.get('token')): # 非管理员token
return genError('invalid admin token') return genError('invalid admin token')
userList = getUserList() userList = getUserList()
if userList == None: # 获取失败 if userList is None: # 获取失败
return genError('server error') return genError('server error')
return genSuccess({ return genSuccess({
'user': userList 'user': userList
}) })
elif request.method == 'POST': # 添加账号 elif request.method == 'POST': # 添加账号
if isAdminToken(httpPostArg('token')) == False: # 非管理员token if not isAdminToken(httpPostArg('token')): # 非管理员token
return genError('invalid admin token') return genError('invalid admin token')
priority = httpPostArg('priority') priority = httpPostArg('priority')
if priority == None: if priority is None:
priority = 'c' # 默认优先级 priority = 'c' # 默认优先级
remain = httpPostArg('remain') remain = httpPostArg('remain')
if remain == None: if remain is None:
remain = '-1' # 默认剩余次数 remain = '-1' # 默认剩余次数
status, userId = addUser(priority, remain) # 创建新账号 status, userId = addUser(priority, remain) # 创建新账号
if status == False: if not status:
return genError(userId) # 创建错误 return genError(userId) # 创建错误
return genSuccess({ return genSuccess({
'userId': userId # 创建成功 'userId': userId # 创建成功
}) })
@api.route(apiPath + '/user/<userId>', methods = ['GET', 'PUT', 'PATCH', 'DELETE']) @api.route(apiPath + '/user/<userId>', methods = ['GET', 'PUT', 'PATCH', 'DELETE'])
def apiUserId(userId): def apiUserId(userId: str) -> dict:
if request.method == 'GET': # 获取账号信息 if request.method == 'GET': # 获取账号信息
userInfo = getUserInfo(userId) userInfo = getUserInfo(userId)
if userInfo == None: if userInfo is None:
return genError('invalid user id') return genError('invalid user id')
return genSuccess(userInfo) return genSuccess(userInfo)
elif request.method == 'PUT' or request.method == 'PATCH': # 更新账号信息 elif request.method == 'PUT' or request.method == 'PATCH': # 更新账号信息
if isAdminToken(httpPostArg('token')) == False: # 非管理员token if not isAdminToken(httpPostArg('token')): # 非管理员token
return genError('invalid admin token') return genError('invalid admin token')
priority = httpPostArg('priority') priority = httpPostArg('priority')
remain = httpPostArg('remain') remain = httpPostArg('remain')
if request.method == 'PUT': if request.method == 'PUT':
if priority == None or remain == None: # 参数不全 if priority is None or remain is None: # 参数不全
return genError('missing parameter') return genError('missing parameter')
if modifyUserInfo(userId, priority = priority, remain = remain) == False: # 更新账号信息 if not modifyUserInfo(userId, priority = priority, remain = remain): # 更新账号信息
return genError('server error') return genError('server error')
return genSuccess( return genSuccess(
getUserInfo(userId) # 更新成功 getUserInfo(userId) # 更新成功
) )
elif request.method == 'DELETE': # 销毁账号 elif request.method == 'DELETE': # 销毁账号
if isAdminToken(httpPostArg('token')) == False: # 非管理员token if not isAdminToken(httpPostArg('token')): # 非管理员token
return genError('invalid admin token') return genError('invalid admin token')
status, reason = delUser(userId) status, reason = delUser(userId)
if status == False: if not status:
return genError(reason) return genError(reason)
return genSuccess({ return genSuccess({
'userId': userId # 删除成功 'userId': userId # 删除成功
}) })
@api.route(apiPath + '/check', methods = ['GET', 'POST']) @api.route(apiPath + '/check', methods = ['GET', 'POST'])
def apiCheck(): def apiCheck() -> dict:
if request.method == 'GET': # 获取检测任务列表 if request.method == 'GET': # 获取检测任务列表
token = request.args.get('token') token = request.args.get('token')
if isUserToken(token) == False: if not isUserToken(token):
return genError('invalid user token') return genError('invalid user token')
taskList = getCheckList(token) taskList = getCheckList(token)
if taskList == None: if taskList is None:
return genError('server error') return genError('server error')
return genSuccess({ return genSuccess({
'taskList': taskList 'taskList': taskList
}) })
elif request.method == 'POST': # 添加检测任务 elif request.method == 'POST': # 添加检测任务
token = httpPostArg('token') token = httpPostArg('token')
if isUserToken(token) == False: if not isUserToken(token):
return genError('invalid user token') return genError('invalid user token')
checkList = httpPostArg('check') # 检测列表 checkList = httpPostArg('check') # 检测列表
if checkList == None: if checkList is None:
return genError('missing check list') return genError('missing check list')
proxyList = httpPostArg('proxy') # 代理列表 proxyList = httpPostArg('proxy') # 代理列表
if proxyList == None: if proxyList is None:
return genError('missing proxy list') return genError('missing proxy list')
priority = getUserInfo(token, minus = True)['priority'] # 获取账号优先级 priority = getUserInfo(token, minus = True)['priority'] # 获取账号优先级
if priority == None: if priority is None:
return genError('server error') return genError('server error')
return addCheckTask(checkList, proxyList, priority, token) return addCheckTask(checkList, proxyList, priority, token)
@api.route(apiPath + '/check/<checkId>', methods = ['GET', 'DELETE']) @api.route(apiPath + '/check/<checkId>', methods = ['GET', 'DELETE'])
def apiCheckId(checkId): def apiCheckId(checkId: str) -> dict:
if request.method == 'GET': # 获取检测任务状态 if request.method == 'GET': # 获取检测任务状态
return getTaskInfo(checkId) return getTaskInfo(checkId)
elif request.method == 'DELETE': # 删除检测任务 elif request.method == 'DELETE': # 删除检测任务

Loading…
Cancel
Save