#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Multi-threaded smoke test for proxy client/server pairs.

For every test object produced by a ``Tester`` loader, the client and server
are started, one HTTP request is sent through the client's SOCKS5 port, and
the result is logged. Diagnostics of both sides are dumped on failure.
"""

import time
from threading import Thread
from typing import Iterator

import requests

from Tester import VMess
from Tester import Shadowsocks
from Tester import ShadowsocksR
from Basis.Logger import logging
from Basis.Functions import ipFormat
from Basis.Functions import checkPortStatus


def waitForStart(port: int, times: int = 100, delay: int = 100) -> bool:
    """Poll ``port`` until something occupies it or the attempts run out.

    Args:
        port: Port number handed to ``checkPortStatus``.
        times: Maximum number of polls.
        delay: Pause between polls, in milliseconds.

    Returns:
        True as soon as ``checkPortStatus(port)`` is falsy (this file treats
        that as "port occupied", i.e. the process is up); False on timeout.
    """
    for _ in range(times):
        if not checkPortStatus(port):  # port occupied
            return True
        time.sleep(delay / 1000)  # delay is in ms, default wait 100ms
    return False  # timeout


def _checkSocks5(socks5: str) -> bool:
    """Send one HTTP request through the SOCKS5 proxy at ``socks5``.

    Args:
        socks5: Proxy endpoint formatted as ``host:port``.

    Returns:
        True if the request completed with a non-error HTTP status,
        False otherwise (the failure is logged).
    """
    proxy = 'socks5://' + socks5
    try:
        response = requests.get(
            'http://iserv.scutbot.cn',
            proxies={
                'http': proxy,
                'https': proxy,
            },
            timeout=10,
        )
        response.raise_for_status()
    except Exception as exp:
        # Deliberately broad: any failure on the way through the proxy
        # counts as a failed test and must not kill the worker thread.
        logging.error('socks5 %s -> error' % socks5)
        logging.error('requests exception\n' + str(exp))
        return False
    logging.info('socks5 %s -> ok' % socks5)
    return True


def _logProcessInfo(name: str, process) -> None:
    """Log command, environment, file and captured output of one side.

    Args:
        name: Label used in the log headers ('client' or 'server').
        process: Object exposing ``cmd``, ``env``, ``file`` and ``output``.
    """
    logging.warning('%s info' % name)
    logging.error('command -> %s' % process.cmd)
    logging.error('envVar -> %s' % process.env)
    logging.error('file -> %s' % process.file)
    logging.warning('%s capture output' % name)
    logging.error('\n' + str(process.output))


def test(testObj: dict) -> None:
    """Run one proxy test case and log its outcome.

    Args:
        testObj: Test description. Keys read here: ``title``, ``client`` and
            ``server`` (objects with ``start()`` / ``quit()`` plus the
            attributes dumped on failure), ``socks`` (``addr`` and ``port``
            of the client's SOCKS5 endpoint) and ``interface`` (``port``
            that the server is expected to occupy).
    """
    logging.warning(testObj['title'])
    client = testObj['client']
    server = testObj['server']
    client.start()
    server.start()
    try:
        if waitForStart(testObj['socks']['port']):
            logging.debug('client start complete')
        else:
            logging.warning('client start timeout')
        if waitForStart(testObj['interface']['port']):
            logging.debug('server start complete')
        else:
            logging.warning('server start timeout')
        logging.debug('start test process')

        time.sleep(1)  # grace period before the first request
        socks5 = '%s:%i' % (
            ipFormat(testObj['socks']['addr'], v6Bracket=True),
            testObj['socks']['port'],
        )
        errFlag = not _checkSocks5(socks5)
    finally:
        # Always stop both sides, even if polling or formatting raised,
        # so a broken test case does not leave them running.
        client.quit()
        server.quit()

    if errFlag:
        # Dumped only after quit(), the same order the attributes were
        # read in before this helper was factored out.
        _logProcessInfo('client', client)
        _logProcessInfo('server', server)


def runTest(testIter: Iterator[dict], threadNum: int) -> None:
    """Run ``test`` on every item of ``testIter`` with bounded concurrency.

    Args:
        testIter: Iterator yielding test objects; it is advanced with
            ``next()`` from the calling thread only.
        threadNum: Maximum number of tests running at the same time.

    Raises:
        ValueError: If ``threadNum`` is smaller than 1 (no test could ever
            be started and the loop would never terminate).
    """
    if threadNum < 1:
        raise ValueError('threadNum must be at least 1')

    threads = []
    while True:
        # Rebuild the list instead of calling remove() while iterating it;
        # removing during iteration skips the element after each removal.
        threads = [worker for worker in threads if worker.is_alive()]
        try:
            for _ in range(threadNum - len(threads)):  # refill up to limit
                worker = Thread(target=test, args=(next(testIter),))
                worker.start()
                threads.append(worker)
        except StopIteration:  # traverse completed
            break
        time.sleep(0.1)
    for worker in threads:  # wait until all threads exit
        worker.join()


ss = Shadowsocks.load(isExtra = True)
# ss = Shadowsocks.load(isExtra = False)
ssr = ShadowsocksR.load()
vmess = VMess.load()

logging.critical('test start')
# runTest(ss, 64)
# runTest(ssr, 64)
runTest(vmess, 64)
logging.critical('test complete')