Browse Source

update: new check workflow

master
Dnomd343 3 years ago
parent
commit
357db78dd0
  1. 73
      Checker.py
  2. 2
      ProxyChecker/Http.py
  3. 80
      Run.py
  4. 12
      decode.py
  5. 28
      demo.py

73
check.py → Checker.py

@ -3,18 +3,11 @@
import os import os
import time import time
import json
import redis
import ProxyBuilder as Builder import ProxyBuilder as Builder
import ProxyChecker as Checker import ProxyChecker as Checker
workDir = '/tmp/ProxyC' def __loadDir(folderPath): # 创建文件夹
redisPrefix = 'proxyc-'
redisHost = 'localhost'
redisPort = 6379
def loadDir(folderPath): # 创建文件夹
try: try:
if os.path.exists(folderPath): # 文件 / 文件夹 存在 if os.path.exists(folderPath): # 文件 / 文件夹 存在
if not os.path.isdir(folderPath): # 文件 if not os.path.isdir(folderPath): # 文件
@ -25,9 +18,13 @@ def loadDir(folderPath): # 创建文件夹
except: except:
return False return False
def proxyHttpCheck(socksPort): # Http检测 def __proxyHttpCheck(socksPort, httpCheckUrl, httpCheckTimeout): # Http检测
try: try:
health, httpDelay = Checker.httpCheck(socksPort, timeout = 12) health, httpDelay = Checker.httpCheck(
socksPort,
url = httpCheckUrl,
timeout = httpCheckTimeout
)
if health == None: # 连接失败 if health == None: # 连接失败
return None return None
return { return {
@ -37,11 +34,17 @@ def proxyHttpCheck(socksPort): # Http检测
except: # 未知错误 except: # 未知错误
return None return None
def proxyTest(rawInfo, startDelay = 1): def proxyTest(
rawInfo,
startDelay = 1,
workDir = '/tmp/ProxyC',
httpCheckUrl = 'http://gstatic.com/generate_204',
httpCheckTimeout = 12,
):
''' '''
代理检测入口 代理检测入口
异常错误: 程序异常:
return None return None
启动失败: 启动失败:
@ -56,7 +59,7 @@ def proxyTest(rawInfo, startDelay = 1):
} }
''' '''
if loadDir(workDir) == False: # 工作文件夹无效 if __loadDir(workDir) == False: # 工作文件夹无效
return None return None
if not 'info' in rawInfo: # 缺少代理服务器信息 if not 'info' in rawInfo: # 缺少代理服务器信息
return None return None
@ -91,7 +94,7 @@ def proxyTest(rawInfo, startDelay = 1):
checkResult = {} checkResult = {}
for item in checkItem: for item in checkItem:
if item == 'http': # http检测 if item == 'http': # http检测
result = proxyHttpCheck(client['port']) result = __proxyHttpCheck(client['port'], httpCheckUrl, httpCheckTimeout)
else: # 未知检测项目 else: # 未知检测项目
result = None result = None
if result == None: # 检测出错 if result == None: # 检测出错
@ -104,45 +107,3 @@ def proxyTest(rawInfo, startDelay = 1):
'success': True, 'success': True,
'result': checkResult 'result': checkResult
} }
def getTask(): # 获取检测任务
redisObject = redis.StrictRedis(host = redisHost, port = redisPort, db = 0)
checkList = redisObject.keys(redisPrefix + 'check-a-*') # 优先级排序
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-b-*')
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-c-*')
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-d-*')
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-e-*')
if len(checkList) == 0: # 无任务
return None
key = checkList[0] # 选取首个任务
taskContent = redisObject.get(key)
redisObject.delete(key)
tag = str(key[len(redisPrefix) + 8:], encoding = "utf-8")
try:
return tag, json.loads(taskContent) # JSON解码
except: # JSON解码失败
return tag, None
def setResult(tag, result): # 写入检测结果
redisObject = redis.StrictRedis(host = redisHost, port = redisPort, db = 0)
key = redisPrefix + 'result-' + tag
redisObject.set(key, json.dumps(result))
def main():
try:
taskTag, task = getTask() # 获取检测任务
except:
return
if task == None:
return
try:
setResult(taskTag, proxyTest(task)) # 检测并写入数据库
except:
pass
main()

2
ProxyChecker/Http.py

@ -38,7 +38,7 @@ def httpPing(port, url = 'http://gstatic.com/generate_204', timeout = 30):
statusCode = httpRequest.status_code # 获取http状态码 statusCode = httpRequest.status_code # 获取http状态码
except: except:
return None, 'Http request error' return None, 'Http request error'
if 'statusCode' in vars() and statusCode == 204: # http测试成功 if 'statusCode' in vars() and statusCode >= 200 and statusCode < 300: # http测试成功
delay = (time.time_ns() - startTime) / (10 ** 6) delay = (time.time_ns() - startTime) / (10 ** 6)
return True, round(delay, 2) # 保留小数点后两位 return True, round(delay, 2) # 保留小数点后两位
else: else:

80
Run.py

@ -0,0 +1,80 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import redis
import Checker
def __loadRedis(redisHost = 'localhost', redisPort = 6379): # 连接Redis数据库
return redis.StrictRedis(host = redisHost, port = redisPort, db = 0)
def __getCheckInfo(redisObject, redisPrefix):
'''
获取检测任务
无任务或发生异常:
return None, None
任务格式有误:
return tag, None
任务获取成功:
return tag, {...}
'''
try:
checkList = redisObject.keys(redisPrefix + 'check-a-*') # 优先级排序
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-b-*')
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-c-*')
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-d-*')
if len(checkList) == 0:
checkList = redisObject.keys(redisPrefix + 'check-e-*')
if len(checkList) == 0: # 无任务
return None, None
key = checkList[0] # 选取首个任务
taskContent = redisObject.get(key) # 获取任务信息
redisObject.delete(key) # 删除任务记录
tag = str(key[len(redisPrefix) + 8:], encoding = "utf-8") # {prefix}check-x-{tag}
except:
return None, None
try:
return tag, json.loads(taskContent) # JSON解码
except: # JSON解码失败
return tag, None
def __setCheckResult(checkTag, checkResult, redisObject, redisPrefix): # 写入检测结果
try:
key = redisPrefix + 'result-' + checkTag
redisObject.set(key, json.dumps(checkResult))
return True
except:
return False
def main(startDelay, httpCheckUrl, httpCheckTimeout):
redisPrefix = 'proxyc-'
redisObject = __loadRedis()
checkTag, checkInfo = __getCheckInfo(redisObject, redisPrefix) # 获取检测任务
if checkTag == None:
print("no task found")
return
print(checkInfo)
checkResult = Checker.proxyTest(checkInfo)
if checkResult == None:
print("some bad things happen")
return
elif checkResult['success'] == False:
print("error proxy info")
return
print(checkResult)
if __setCheckResult(checkTag, checkResult, redisObject, redisPrefix) == False:
print("redis write error")
return
print("ok")
defaultStartDelay = 1
defaultHttpCheckTimeout = 30
defaultHttpCheckUrl = 'https://api.v2fly.org/checkConnection.svgz'
main(defaultStartDelay, defaultHttpCheckUrl, defaultHttpCheckTimeout)

12
decode.py

@ -1,12 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import ProxyDecoder as Decoder
ssPlainUrl = 'ss://bf-cfb:test@192.168.100.1:8888'
ssCommonUrl = 'ss://YmYtY2ZiOnRlc3RAMTkyLjE2OC4xMDAuMTo4ODg4#example-server'
ssSip002Url = 'ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example'
ssrCommonUrl = 'ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP29iZnNwYXJhbT1ZMlUzTUdVeE5EY3dOekF1ZFhCa1lYUmxMbTFwWTNKdmMyOW1kQzVqYjIwJnByb3RvcGFyYW09TVRRM01EY3dPa0pGTTIxck9RJnJlbWFya3M9UlZoQlRWQk1SUSZncm91cD1kR1Z6ZEE'
print(Decoder.decode(ssrCommonUrl))

28
demo.py

@ -1,28 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import ProxyFilter as Filter
ssInfo = {
"type": "ss",
"server": "127.0.0.1 ",
"port": 12345,
"password": "dnomd343",
"method": "aes_256-ctr",
"plugin": "obfs",
"pluginParam": "obfs=http;host=www.bing.com"
}
ssrInfo = {
"type": "ssr",
"server": " 127.0.0.1",
"port": 23456,
"password": "dnomd343",
"method": "table",
"protocol": "auth-aes128_md5",
"protocolParam": "",
"obfs": "",
"obfsParam": "fafadfaf"
}
print(Filter.filter(ssrInfo))
Loading…
Cancel
Save