Browse Source

feat: add ProxyChecker module

master
Dnomd343 3 years ago
parent
commit
81cfd13250
  1. 43
      ProxyChecker/Http.py
  2. 6
      ProxyChecker/__init__.py
  3. 28
      demo.py

43
ProxyChecker/Http.py

@ -0,0 +1,43 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import time
import requests
def httpPing(port, url = 'http://gstatic.com/generate_204', timeout = 30):
try:
startTime = time.time_ns()
socks5 = 'socks5://127.0.0.1:' + str(port)
httpRequest = requests.get(url, proxies = {
'http': socks5,
'https': socks5,
}, timeout = timeout)
if httpRequest.status_code == 204:
delay = (time.time_ns() - startTime) / (10 ** 6)
return round(delay, 2) # 保留小数点后两位
except: pass
return -1
def httpCheck(port, url = 'http://gstatic.com/generate_204', timeout = 30):
result = []
result.append(httpPing(port, url, timeout / 4))
result.append(httpPing(port, url, timeout / 2))
result.append(httpPing(port, url, timeout / 1))
failNum = 0
for ret in result:
if ret < 0:
failNum += 1
if failNum == 3: # 全部失败
return False, -1
elif failNum == 2: # 仅成功一次
for ret in result:
if ret > 0: # 返回成功单次延迟
return False, ret
elif failNum == 1: # 存在一次失败
sum = 0
for ret in result:
if ret > 0:
sum += ret
return False, sum / 2 # 返回成功延迟均值
else: # 全部成功
return True, min(min(result[0], result[1]), result[2]) # 返回最低延迟

6
ProxyChecker/__init__.py

@ -0,0 +1,6 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyChecker.Http import *
__all__ = [ 'httpPing', 'httpCheck' ]

28
demo.py

@ -1,21 +1,9 @@
import time import time
import socket import socket
import requests import requests
import ProxyBuilder as Builder
def checkSocksPort(port): import ProxyBuilder as Builder
try: import ProxyChecker as Checker
startTime = time.time_ns()
r = requests.get('http://gstatic.com/generate_204', proxies = {
'http': 'socks5://127.0.0.1:' + str(port),
'https': 'socks5://127.0.0.1:' + str(port),
})
if r.status_code == 204:
delay = (time.time_ns() - startTime) / (10 ** 6)
print(format(delay, '.2f') + 'ms')
return True
except: pass
return False
# testInfo = { # testInfo = {
# 'type': 'ss', # 'type': 'ss',
@ -42,6 +30,7 @@ testInfo = {
print("start") print("start")
print(dir(Builder)) print(dir(Builder))
print(dir(Checker))
print(testInfo) print(testInfo)
task = Builder.build(testInfo, '/tmp/ProxyC') task = Builder.build(testInfo, '/tmp/ProxyC')
@ -52,12 +41,11 @@ if Builder.check(task) == False:
Builder.destroy(task) Builder.destroy(task)
else: else:
print("test with gstatic") print("test with gstatic")
checkSocksPort(task['port']) health, delay = Checker.httpCheck(task['port'])
checkSocksPort(task['port']) print("health = " + str(health))
checkSocksPort(task['port']) if delay < 0:
if checkSocksPort(task['port']):
print("ok")
else:
print("error") print("error")
else:
print("delay = " + format(delay, '.2f') + 'ms')
Builder.destroy(task) Builder.destroy(task)
print("stop") print("stop")

Loading…
Cancel
Save