You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

99 lines
3.2 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import requests
from threading import Thread
from Tester import Shadowsocks
from Tester import ShadowsocksR
from Basis.Logger import logging
from Basis.Functions import ipFormat
from Basis.Functions import checkPortStatus
def waitForStart(port: int, times: int = 100, delay: int = 100) -> bool:
    """Poll a port until checkPortStatus() reports it as no longer free.

    :param port: port number handed to checkPortStatus().
    :param times: maximum number of polls.
    :param delay: pause between polls, in milliseconds.
    :return: True as soon as checkPortStatus() returns a falsy value (treated
        here as "port occupied", i.e. something is listening); False when
        every poll found the port still free.
    """
    interval = delay / 1000  # milliseconds -> seconds for time.sleep
    for _ in range(times):
        stillFree = checkPortStatus(port)
        if stillFree:
            time.sleep(interval)  # nothing bound yet, wait before next poll
            continue
        return True  # port occupied
    return False  # timeout
def _logProcessInfo(role: str, process) -> None:
    """Dump launch details and captured output of one test process.

    :param role: label used in the log headings ('client' or 'server').
    :param process: object exposing cmd, env, file and output attributes.
    """
    logging.warning('%s info' % role)
    logging.error('command -> %s' % process.cmd)
    logging.error('envVar -> %s' % process.env)
    logging.error('file -> %s' % process.file)
    logging.warning('%s capture output' % role)
    logging.error('\n' + str(process.output))


def test(testObj: dict) -> None:
    """Run one proxy test case: start both ends, fetch a page via socks5.

    The client and server are started, their ports are awaited, and an HTTP
    request is sent through the client's socks5 port. Both processes are
    always stopped afterwards; on a failed request their command, environment,
    file and captured output are logged.

    :param testObj: test description with the keys used below:
        'title' (logged as heading), 'client' and 'server' (objects with
        start() / quit() and cmd / env / file / output attributes),
        'socks' ({'addr', 'port'} of the local socks5 endpoint) and
        'interface' ({'port'} awaited as the server port).
    """
    logging.warning(testObj['title'])
    client = testObj['client']
    server = testObj['server']
    client.start()
    server.start()
    errFlag = False
    try:
        # A start timeout is reported but does not abort the case: the request
        # below still runs and decides the outcome.
        if waitForStart(testObj['socks']['port']):
            logging.debug('client start complete')
        else:
            logging.warning('client start timeout')
        if waitForStart(testObj['interface']['port']):
            logging.debug('server start complete')
        else:
            logging.warning('server start timeout')
        logging.debug('start test process')
        time.sleep(1)  # extra settle time before sending traffic

        socks5 = '%s:%i' % (
            ipFormat(testObj['socks']['addr'], v6Bracket = True),
            testObj['socks']['port']
        )
        try:
            response = requests.get(
                'http://iserv.scutbot.cn',
                proxies = {
                    'http': 'socks5://' + socks5,
                    'https': 'socks5://' + socks5,
                },
                timeout = 10
            )
            response.raise_for_status()
            logging.info('socks5 %s -> ok' % socks5)
        except Exception as exp:  # any failure of the request marks the case as failed
            logging.error('socks5 %s -> error' % socks5)
            logging.error('requests exception\n' + str(exp))
            errFlag = True
    finally:
        # Always stop both processes, even when an unexpected error escapes
        # above or when stopping the client itself fails.
        try:
            client.quit()
        finally:
            server.quit()

    if errFlag:
        _logProcessInfo('client', client)
        _logProcessInfo('server', server)
def runTest(testIter: iter, threadNum: int):
    """Run test() on every item of testIter with a bounded number of threads.

    Items are pulled lazily: a new item is only fetched when a thread slot is
    free. The function returns after all started threads have finished.

    :param testIter: iterator (or any iterable) yielding the objects passed
        to test().
    :param threadNum: maximum number of concurrently running threads (>= 1).
    :raises ValueError: if threadNum is smaller than 1, which would otherwise
        spin forever without ever consuming an item.
    """
    if threadNum < 1:
        raise ValueError('threadNum must be at least 1, got %s' % threadNum)
    pending = iter(testIter)  # no-op for iterators, lets plain iterables work
    threads = []
    while True:
        # Rebuild the list instead of removing while iterating, which would
        # skip the entry following each removed thread.
        threads = [thread for thread in threads if thread.is_alive()]
        try:
            while len(threads) < threadNum:  # start threads within limit
                testObj = next(pending)  # StopIteration -> traverse completed
                thread = Thread(target=test, args=(testObj,))
                thread.start()
                threads.append(thread)  # record thread info
        except StopIteration:
            break
        time.sleep(0.1)  # poll interval while all slots are busy
    for thread in threads:  # wait until all threads exit
        thread.join()
# Load both test sets before the run is announced and any test is started.
ss = Shadowsocks.load(isExtra = True)
# alternative: Shadowsocks.load(isExtra = False)
ssr = ShadowsocksR.load()

logging.critical('test start')
for testSet in (ss, ssr):  # Shadowsocks first, then ShadowsocksR
    runTest(testSet, 64)  # thread limit of 64 per set
logging.critical('test complete')