You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

391 lines
12 KiB

#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import redis
import random
from flask import Flask, request
apiPath = '/'
api = Flask(__name__)
redisPort = 6379
redisHost = 'localhost'
redisPrefix = 'proxyc-'
accessToken = 'dnomd343'
def httpPostArg(field: str): # 获取HTTP POST参数
try:
if request.values.get(field) != None: # application/x-www-form-urlencoded
return request.values.get(field)
elif request.json.get(field) != None: # application/json
return request.json.get(field)
elif request.form.get(field) != None: # multipart/form-data
return request.form.get(field)
except:
pass
return None
def genRandomId(length = 24): # 生成随机ID
tag = ''
for i in range(0, length):
tmp = random.randint(0, 15)
if tmp >= 10:
tag += chr(tmp + 87) # a ~ f
else:
tag += str(tmp) # 0 ~ 9
return tag
def genError(message: str): # 生成错误回复
return {
'success': False,
'message': message
}
def genSuccess(data: dict): # 生成成功返回
data['success'] = True
return data
def addCheckTask(priority, checkList, proxyList): # 检测任务加入数据库
try:
tagList = []
for proxyInfo in proxyList:
tag = genRandomId(32) # 32位检测ID
tagList.append(tag)
redisObject.set( # 写入数据库
redisPrefix + 'check-' + priority + '-' + tag,
json.dumps({
'tag': tag,
'check': checkList,
'info': proxyInfo
})
)
checkId = genRandomId(24) # 24位任务ID
redisObject.set( # 记录任务
redisPrefix + 'task-' + checkId,
json.dumps({
'checkId': checkId,
'priority': priority,
'check': checkList,
'proxy': tagList,
'complete': False
})
)
return checkId
except: # 异常错误
return None
def getCheckList(): # 获取检测任务列表
try:
if request.args.get('token') != accessToken: # token无效
return genError('invalid token')
taskList = []
rawTaskList = redisObject.keys(redisPrefix + 'task-*')
for task in rawTaskList: # 获取任务ID
taskList.append(str(task[len(redisPrefix) + 5:], encoding = 'utf-8'))
return {
'success': True,
'taskList': taskList
}
except:
return genError('server error')
def newCheckTask(): # 新增检测任务
try:
import ProxyDecoder as Decoder
import ProxyFilter as Filter
if httpPostArg('token') != accessToken: # token无效
return genError('invalid token')
priority = httpPostArg('priority') # 优先级选项
if not priority in ['a','b','c','d','e']:
priority = 'c' # 默认优先级
checkList = httpPostArg('check') # 检测列表
if checkList == None:
return genError('missing check list')
for checkMethod in checkList:
if not checkMethod in ['http']:
return genError('unknown check method `' + checkMethod + '`')
proxyList = httpPostArg('proxy') # 代理列表
if proxyList == None:
return genError('missing proxy list')
if isinstance(proxyList, str): # 单项任务
proxyList = [ proxyList ]
for i in range(0, len(proxyList)):
if isinstance(proxyList[i], str): # 解码分享链接
proxyList[i] = Decoder.decode(proxyList[i])
if proxyList[i] == None:
return genError('could not decode index ' + str(i))
status, proxyList[i] = Filter.filter(proxyList[i]) # 节点信息检查
if status == False: # 节点不合法
return genError('index ' + str(i) + ': ' + proxyList[i])
checkId = addCheckTask(priority, checkList, proxyList) # 任务加入数据库
if checkId == None: # 异常错误
return genError('server error')
return {
'success': True,
'checkId': checkId
}
except:
return genError('server error')
def getTaskInfo(checkId): # 获取任务详情
try:
taskKey = redisObject.keys(redisPrefix + 'task-' + checkId)
if taskKey == []: # 任务ID不存在
return genError('invalid check id')
taskKey = str(taskKey[0], encoding = 'utf-8')
taskInfo = json.loads(
redisObject.get(taskKey)
)
if taskInfo['complete'] == True: # 任务已完成
return {
'success': True,
'complete': True,
'checkId': checkId,
'number': len(taskInfo['proxy']),
'result': taskInfo['result']
}
completeNum = 0 # 测试完成数目
for tag in taskInfo['proxy']:
if redisObject.keys(redisPrefix + 'result-' + tag) != []: # 暂未测试
completeNum += 1
if completeNum < len(taskInfo['proxy']): # 测试未完成
return {
'success': True,
'complete': False,
'checkId': checkId,
'number': len(taskInfo['proxy']),
'schedule': round(completeNum / len(taskInfo['proxy']), 2)
}
checkResult = []
for tag in taskInfo['proxy']: # 遍历检测结果
checkResult.append(
json.loads(
redisObject.get(redisPrefix + 'result-' + tag)
)
)
redisObject.delete(redisPrefix + 'result-' + tag) # 删除测试结果
taskInfo['complete'] = True
taskInfo['result'] = checkResult
redisObject.set(taskKey, json.dumps(taskInfo)) # 记入数据库
return {
'success': True,
'complete': True,
'checkId': checkId,
'number': len(taskInfo['proxy']),
'result': taskInfo['result']
}
except:
return genError('server error')
def deleteTask(checkId): # 删除任务
try:
taskKey = redisObject.keys(redisPrefix + 'task-' + checkId)
if taskKey == []: # 任务ID不存在
return genError('invalid check id')
taskKey = str(taskKey[0], encoding = 'utf-8')
taskInfo = json.loads(
redisObject.get(taskKey)
)
if taskInfo['complete'] != True: # 任务未完成
return genError('task not complete')
redisObject.delete(taskKey)
return {
'success': True,
'checkId': checkId
}
except:
return genError('server error')
def isAdminToken(token: str) -> bool:
'''
是否为管理员token
验证成功:
return True
验证失败:
return False
'''
adminToken = accessToken
return (token == adminToken)
def isUserToken(token: str) -> bool:
'''
是否为有效token
token有效:
return True
token无效:
return False
'''
try:
if token.encode('utf-8') in redisObject.smembers(redisPrefix + 'users'):
return True
except:
return False
def addUser(priority: str, remain: int):
'''
添加账号
添加异常:
return False, {reason}
添加成功:
return True, userId
'''
try:
userId = genRandomId(length = 24)
if not priority in ['a','b','c','d','e']: # 优先级无效
return False, 'invalid priority'
if remain < 0:
remain = -1 # 不限次数
userInfo = {
'token': userId,
'priority': priority,
'remain': remain
}
redisObject.sadd( # 添加账号token
redisPrefix + 'users', userId
)
redisObject.set( # 记录账号信息
redisPrefix + 'user-' + userId,
json.dumps(userInfo)
)
return True, userId # 返回userId
except:
return False, 'server error'
def getUserInfo(userId: str, minus: bool = False):
'''
获取账号信息 (minus = True: 剩余次数 - 1)
获取异常:
return None
获取成功:
return {
'token': '...'
'priority': '...',
'remain': ...
}
'''
try:
if isUserToken(userId) == False: # userId不存在
return None
userInfo = json.loads(
redisObject.get(redisPrefix + 'user-' + userId) # 账号信息
)
if minus == True and userInfo['remain'] > 0:
userInfo['remain'] -= 1 # 剩余次数 - 1
redisObject.set(
redisPrefix + 'user-' + token, # 记入数据库
json.dumps(userInfo)
)
return userInfo
except:
return None # 异常
def getUserList():
'''
获取所有账号信息
获取异常:
return None
获取成功:
return {
'token_1': {
'priority': '...',
'working': ...,
'remain': ...,
},
'token_2': {
...
}
...
}
'''
try:
userList = {}
for userId in redisObject.smembers(redisPrefix + 'users'): # 遍历全部账号
userId = str(userId, encoding = 'utf-8')
userInfo = getUserInfo(userId)
userInfo.pop('token')
userList[userId] = userInfo # 记录账号信息
return userList
except:
return None
@api.route(apiPath + '/user', methods = ['GET','POST'])
def apiUser():
if request.method == 'GET': # 获取账号列表
if isAdminToken(request.args.get('token')) == False: # 非管理员token
return genError('invalid admin token')
userList = getUserList()
if userList == None: # 获取失败
return genError('server error')
return genSuccess({
'user': userList
})
elif request.method == 'POST': # 添加账号
if isAdminToken(httpPostArg('token')) == False: # 非管理员token
return genError('invalid admin token')
priority = httpPostArg('priority')
if priority == None:
priority = 'c' # 默认优先级
remain = httpPostArg('remain')
if remain == None:
remain = -1 # 默认剩余次数
status, userId = addUser(priority, remain) # 创建新账号
if status == False:
return genError(userId) # 创建错误
return genSuccess({
'userId': userId # 创建成功
})
@api.route(apiPath + '/user/<userId>', methods = ['GET','PUT','PATCH','DELETE'])
def apiUserId(userId):
if request.method == 'GET': # 获取账号信息
userInfo = getUserInfo(userId)
if userInfo == None:
return genError('invalid user id')
return genSuccess(userInfo)
elif request.method == 'PUT': # 更新账号信息
pass
elif request.method == 'PATCH': # 修改账号信息
pass
elif request.method == 'DELETE': # 销毁账号
pass
return genError('wait for develop')
@api.route(apiPath + '/check', methods = ['GET','POST'])
def apiCheck():
if request.method == 'GET': # 获取检测任务列表
return getCheckList()
elif request.method == 'POST': # 添加检测任务
return newCheckTask()
@api.route(apiPath + '/check/<checkId>', methods = ['GET','DELETE'])
def apiCheckId(checkId):
if request.method == 'GET': # 获取检测任务状态
return getTaskInfo(checkId)
elif request.method == 'DELETE': # 删除检测任务
return deleteTask(checkId)
redisObject = redis.StrictRedis(
db = 0,
host = redisHost,
port = redisPort
)
api.run(host = '0.0.0.0', port = 43581, debug = True)