Browse Source

perf: Tester module

master^2
dnomd343 2 years ago
parent
commit
62f38df8fb
  1. 9
      Basis/Constant.py
  2. 5
      Basis/Logger.py
  3. 159
      Basis/Test.py
  4. 10
      Builder/Shadowsocks.py
  5. 5
      Tester/Brook.py
  6. 5
      Tester/Hysteria.py
  7. 10
      Tester/Plugin.py
  8. 14
      Tester/Settings.py
  9. 38
      Tester/Shadowsocks.py
  10. 5
      Tester/ShadowsocksR.py
  11. 5
      Tester/Trojan.py
  12. 14
      Tester/TrojanGo.py
  13. 2
      Tester/V2ray.py
  14. 5
      Tester/VLESS.py
  15. 5
      Tester/VMess.py
  16. 2
      Tester/Xray.py
  17. 128
      Tester/__init__.py
  18. 13
      test.py

9
Basis/Constant.py

@ -6,6 +6,15 @@ WorkDir = '/tmp/ProxyC'
PathEnv = '/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin'
# Shadowsocks Info
mbedtlsMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
]
ssMethods = { # methods support of different Shadowsocks project
'ss-rust': [ # table method removed refer to https://github.com/shadowsocks/shadowsocks-rust/issues/887
'none', 'plain', 'rc4', 'rc4-md5',

5
Basis/Logger.py

@ -6,8 +6,9 @@ import logging
from colorlog import ColoredFormatter
logFile = 'runtime.log'
logLevel = logging.DEBUG
# logLevel = logging.WARNING
# logLevel = logging.INFO
# logLevel = logging.DEBUG
logLevel = logging.WARNING
dateFormat = '%Y-%m-%d %H:%M:%S'
logFormat = '[%(asctime)s] [%(levelname)s] %(message)s (%(module)s.%(funcName)s)'
logging.basicConfig(

159
Basis/Test.py

@ -0,0 +1,159 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import requests
from threading import Thread
from Basis.Logger import logging
from Basis.Constant import WorkDir
from Basis.Functions import md5Sum, genFlag, hostFormat, checkPortStatus
Settings = {
'workDir': WorkDir,
'site': 'www.bing.com',
'serverBind': '',
'clientBind': '',
'host': '',
'cert': '',
'key': '',
}
def loadBind(serverV6: bool = False, clientV6: bool = False) -> None:
Settings['serverBind'] = '::1' if serverV6 else '127.0.0.1'
Settings['clientBind'] = '::1' if clientV6 else '127.0.0.1'
def waitPort(port: int, times: int = 50, delay: int = 100) -> bool: # wait until port occupied
for i in range(times):
if not checkPortStatus(port): # port occupied
return True
time.sleep(delay / 1000) # default wait 100ms * 50 => 5s
return False # timeout
def genCert(host: str, certInfo: dict, remark: str = 'ProxyC') -> None: # generate self-signed certificate
certOrgInfo = ['--organization', remark, '--organizationUnit', remark] # organization info
logging.critical('Load self-signed certificate')
os.system('mkdir -p %s' % Settings['workDir']) # make sure that work directory exist
# create CA data at first (by mad)
logging.critical('Creating CA certificate and key...')
os.system(' '.join( # generate CA certificate and privkey
['mad', 'ca', '--ca', certInfo['caCert'], '--key', certInfo['caKey'], '--commonName', remark] + certOrgInfo
))
# generate private key and sign certificate
logging.critical('Signing certificate...')
os.system(' '.join(['mad', 'cert', '--domain', host] + [ # generate certificate and privkey, then signed by CA
'--ca', certInfo['caCert'], '--ca_key', certInfo['caKey'],
'--cert', certInfo['cert'], '--key', certInfo['key'],
] + certOrgInfo))
# install CA certificate and record self-signed cert info
logging.critical('Installing CA certificate...')
os.system('cat %s >> /etc/ssl/certs/ca-certificates.crt' % certInfo['caCert']) # add into system's trust list
def loadCert(host: str = 'proxyc.net', certId: str = '') -> None: # load certificate
newCert = (certId == '')
certId = genFlag(length = 8) if certId == '' else certId
certInfo = {
'caCert': os.path.join(Settings['workDir'], 'proxyc_%s_ca.pem' % certId),
'caKey': os.path.join(Settings['workDir'], 'proxyc_%s_ca_key.pem' % certId),
'cert': os.path.join(Settings['workDir'], 'proxyc_%s_cert.pem' % certId),
'key': os.path.join(Settings['workDir'], 'proxyc_%s_cert_key.pem' % certId),
}
if newCert:
genCert(host, certInfo) # generate new certificate
Settings['host'] = host
Settings['cert'] = certInfo['cert']
Settings['key'] = certInfo['key']
logging.warning('Certificate load complete -> ID = %s' % certId)
def httpCheck(socksInfo: dict, url: str, testId: str, timeout: int = 10) -> None:
socksProxy = 'socks5://%s:%i' % (hostFormat(socksInfo['addr'], v6Bracket = True), socksInfo['port'])
try:
logging.debug('[%s] Http request via %s' % (testId, socksProxy))
request = requests.get(url, timeout = timeout, proxies = { # http request via socks5
'http': socksProxy,
'https': socksProxy,
})
request.raise_for_status() # throw error when server return 4xx or 5xx (don't actually need)
logging.info('[%s] %s -> ok' % (testId, socksProxy))
except Exception as exp:
logging.error('[%s] %s -> error\n%s' % (testId, socksProxy, exp)) # show detail of error reason
raise RuntimeError('Http request via socks5 failed')
def runTest(testInfo: dict, testUrl: str, testSelect: set or None, delay: int = 1) -> None:
testId = md5Sum(testInfo['caption'])[:12] # generate test ID
if testSelect is not None: # testSelect is None -> run all test
if testId not in testSelect: # skip unselected task
return
logging.warning('[%s] %s' % (testId, testInfo['caption'])) # show caption
logging.debug('[%s] Server ID -> %s | Client ID -> %s' % (
testId, testInfo['server'].id, testInfo['client'].id
))
testInfo['server'].id = testId + '-server'
testInfo['client'].id = testId + '-client'
# build server and client and wait them start
testInfo['server'].start() # start test server
if waitPort(testInfo['interface']['port']): # wait for server
logging.debug('[%s] Test server start complete' % testId)
testInfo['client'].start() # start test client
if waitPort(testInfo['socks']['port']): # wait for client
logging.debug('[%s] Test client start complete' % testId)
# start test process
try:
logging.debug('[%s] Test process start' % testId)
time.sleep(delay) # delay a short time before check
httpCheck(testInfo['socks'], testUrl, testId) # run http request test
testInfo['client'].quit() # clean up client
testInfo['server'].quit() # clean up server
except:
testInfo['client'].quit()
testInfo['server'].quit()
logging.warning('[%s] Client info' % testId)
logging.error('[%(id)s-server]\n▲ CMD => %(cmd)s\n▲ ENV => %(env)s\n▲ FILE => %(file)s\n%(output)s' % {
'id': testId,
'cmd': testInfo['client'].cmd,
'env': testInfo['client'].env,
'file': testInfo['client'].file,
'output': '-' * 96 + '\n' + testInfo['client'].output + '-' * 96,
})
logging.warning('[%s] Server info' % testId)
logging.error('[%(id)s-client]\n▲ CMD => %(cmd)s\n▲ ENV => %(env)s\n▲ FILE => %(file)s\n%(output)s' % {
'id': testId,
'cmd': testInfo['server'].cmd,
'env': testInfo['server'].env,
'file': testInfo['server'].file,
'output': '-' * 96 + '\n' + testInfo['server'].output + '-' * 96,
})
def Test(testIter: iter, threadNum: int, testUrl: str, testFilter: set or None = None) -> None:
threads = []
while True: # infinite loop
try:
for thread in threads:
if thread.is_alive(): # skip running thread
continue
threads.remove(thread) # remove dead thread
if len(threads) < threadNum:
for i in range(threadNum - len(threads)): # start threads within limit
thread = Thread( # create new thread
target = runTest,
args = (next(testIter), testUrl, testFilter)
)
thread.start()
threads.append(thread) # record thread info
time.sleep(0.1)
except StopIteration: # traverse completed
break
for thread in threads: # wait until all threads exit
thread.join()

10
Builder/Shadowsocks.py

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
import json
from Basis.Constant import ssMethods, ssAllMethods
from Basis.Constant import ssMethods, ssAllMethods, mbedtlsMethods
def loadConfig(proxyInfo: dict, socksInfo: dict) -> dict: # load basic config option
@ -45,14 +45,6 @@ def ssLibev(proxyInfo: dict, socksInfo: dict, isUdp: bool) -> tuple[dict, list]:
def ssPython(proxyInfo: dict, socksInfo: dict, isUdp: bool) -> tuple[dict, list]:
config = loadConfig(proxyInfo, socksInfo)
mbedtlsMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
]
if config['method'] in mbedtlsMethods: # mbedtls methods should use prefix `mbedtls:`
config['method'] = 'mbedtls:' + config['method']
if config['method'] in ['idea-cfb', 'seed-cfb']: # only older versions of openssl are supported

5
Tester/Brook.py

@ -4,9 +4,9 @@
import copy
import itertools
from Builder import Brook
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Tester.Settings import Settings
from Basis.Functions import hostFormat, genFlag, getAvailablePort
@ -78,7 +78,7 @@ def loadTest(stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New brook test -> %s' % testInfo)
logging.debug('New Brook test -> %s' % testInfo)
return testInfo
@ -91,3 +91,4 @@ def load():
addStream(wsStream(isRaw, isSecure)) # websocket stream test
for stream in streams:
yield loadTest(stream)
logging.info('Brook test yield complete')

5
Tester/Hysteria.py

@ -5,9 +5,9 @@ import os
import json
import itertools
from Builder import Hysteria
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Tester.Settings import Settings
from Basis.Constant import hysteriaProtocols
from Basis.Functions import hostFormat, genFlag, getAvailablePort
@ -78,10 +78,11 @@ def loadTest(protocol: str, isObfs: bool, isAuth: bool) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New hysteria test -> %s' % testInfo)
logging.debug('New Hysteria test -> %s' % testInfo)
return testInfo
def load():
for protocol, isObfs, isAuth in itertools.product(hysteriaProtocols, [False, True], [False, True]):
yield loadTest(protocol, isObfs, isAuth)
logging.info('Hysteria test yield complete')

10
Tester/Plugin.py

@ -4,10 +4,10 @@
import os
import re
import json
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import Plugins
from Tester.Settings import Settings
from Basis.Functions import genFlag, hostFormat, getAvailablePort
pluginParams = {}
@ -310,7 +310,7 @@ def paramFill(param: str) -> str:
return param
def load(proxyType: str):
def load(proxyType: str) -> list:
if proxyType not in ['ss', 'trojan-go']:
raise RuntimeError('Unknown proxy type for sip003 plugin')
pluginParams.update({
@ -321,13 +321,14 @@ def load(proxyType: str):
'PASSWD': genFlag(length = 8), # random password for test
'PATH': '/' + genFlag(length = 6), # random uri path for test
})
result = []
cloakLoad() # init cloak config
kcptunLoad() # init kcptun config
for pluginType in pluginConfig:
for pluginTest, pluginTestInfo in pluginConfig[pluginType].items(): # traverse all plugin test item
pluginParams['RANDOM'] = genFlag(length = 8) # refresh RANDOM field
pluginParams['RABBIT_PORT'] = str(getAvailablePort()) # allocate port before rabbit plugin start
yield {
result.append({
'type': pluginType,
'caption': pluginTest,
'server': { # plugin info for server
@ -339,4 +340,5 @@ def load(proxyType: str):
'param': paramFill(pluginTestInfo[1]),
},
'inject': ssInject if proxyType == 'ss' else trojanInject # for some special plugins
}
})
return result

14
Tester/Settings.py

@ -1,14 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from Basis.Constant import WorkDir
Settings = {
'workDir': WorkDir,
'site': 'www.bing.com',
'serverBind': '',
'clientBind': '',
'host': '',
'cert': '',
'key': '',
}

38
Tester/Shadowsocks.py

@ -7,11 +7,11 @@ import base64
import itertools
from Tester import Plugin
from Builder import Shadowsocks
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Tester.Settings import Settings
from Basis.Constant import ssMethods, ssAllMethods
from Basis.Functions import md5Sum, genFlag, getAvailablePort
from Basis.Constant import ssMethods, ssAllMethods, mbedtlsMethods
def loadConfig(proxyInfo: dict) -> dict: # load basic config option
@ -43,14 +43,6 @@ def ssLibev(proxyInfo: dict, isUdp: bool) -> tuple[dict, list]:
def ssPython(proxyInfo: dict, isUdp: bool) -> tuple[dict, list]:
config = loadConfig(proxyInfo)
mbedtlsMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
]
if config['method'] in mbedtlsMethods: # mbedtls methods should use prefix `mbedtls:`
config['method'] = 'mbedtls:' + config['method']
if config['method'] in ['idea-cfb', 'seed-cfb']: # only older versions of openssl are supported
@ -135,19 +127,11 @@ def loadTest(serverType: str, clientType: str, method: str, plugin: dict or None
}
if plugin is not None:
testInfo['server'] = plugin['inject'](testInfo['server'], plugin)
logging.debug('New shadowsocks test -> %s' % testInfo)
logging.debug('New Shadowsocks test -> %s' % testInfo)
return testInfo
def load(isExtra: bool = False):
pluginTest = []
pluginIter = Plugin.load('ss')
while True:
try:
pluginTest.append(next(pluginIter)) # export data of plugin generator
except StopIteration:
break
if not isExtra: # just test basic connection
def loadCommon(pluginTest: list): # shadowsocks basic test
for method in ssAllMethods: # test every method for once
for ssType in ssMethods: # found the client which support this method
if method not in ssMethods[ssType]: continue
@ -158,10 +142,22 @@ def load(isExtra: bool = False):
ssType = list(ssMethods.keys())[0] # choose the first one
for plugin in pluginTest[1:]: # test every plugin (except the first one that has been checked)
yield loadTest(ssType, ssType, ssMethods[ssType][0], plugin)
return
def loadExtra(pluginTest: list):
for ssServer in ssMethods: # traverse all shadowsocks type as server
for method, ssClient in itertools.product(ssMethods[ssServer], ssMethods): # supported methods and clients
if method not in ssMethods[ssClient]: continue
yield loadTest(ssServer, ssClient, method) # ssServer <-- method --> ssClient
for ssType, plugin in itertools.product(ssMethods, pluginTest): # test every plugin with different ss project
yield loadTest(ssType, ssType, ssMethods[ssType][0], plugin)
def load(isExtra: bool = False):
ssIter = (loadExtra if isExtra else loadCommon)(Plugin.load('ss'))
while True:
try:
yield next(ssIter)
except StopIteration:
break
logging.info('Shadowsocks test yield complete')

5
Tester/ShadowsocksR.py

@ -3,10 +3,10 @@
import os
import json
from Basis.Test import Settings
from Builder import ShadowsocksR
from Basis.Logger import logging
from Basis.Process import Process
from Tester.Settings import Settings
from Basis.Functions import genFlag, getAvailablePort
from Basis.Constant import ssrMethods, ssrProtocols, ssrObfuscations
@ -64,7 +64,7 @@ def loadTest(method: str, protocol: str, obfs: str) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New shadowsocksr test -> %s' % testInfo)
logging.debug('New ShadowsocksR test -> %s' % testInfo)
return testInfo
@ -75,3 +75,4 @@ def load():
yield loadTest('aes-128-ctr', protocol, 'plain')
for obfs in ssrObfuscations:
yield loadTest('aes-128-ctr', 'origin', obfs)
logging.info('ShadowsocksR test yield complete')

5
Tester/Trojan.py

@ -5,10 +5,10 @@ import os
import json
from Tester import Xray
from Builder import Trojan
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import xtlsFlows
from Tester.Settings import Settings
from Basis.Functions import md5Sum, genFlag, getAvailablePort
@ -110,7 +110,7 @@ def loadTest(stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New trojan test -> %s' % testInfo)
logging.debug('New Trojan test -> %s' % testInfo)
return testInfo
@ -119,3 +119,4 @@ def load():
yield loadBasicTest(streams[1]) # Trojan basic test -> TCP stream with TLS
for stream in streams: # test all stream cases
yield loadTest(stream)
logging.info('Trojan test yield complete')

14
Tester/TrojanGo.py

@ -5,9 +5,9 @@ import os
import json
from Tester import Plugin
from Builder import TrojanGo
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Tester.Settings import Settings
from Basis.Constant import trojanGoMethods
from Basis.Functions import md5Sum, genFlag, getAvailablePort
@ -85,18 +85,11 @@ def loadTest(wsObject: dict or None, ssObject: dict or None, plugin: dict or Non
}
if plugin is not None:
testInfo['server'] = plugin['inject'](testInfo['server'], plugin)
logging.debug('New trojan-go test -> %s' % testInfo)
logging.debug('New Trojan-Go test -> %s' % testInfo)
return testInfo
def load():
pluginTest = []
pluginIter = Plugin.load('trojan-go')
while True:
try:
pluginTest.append(next(pluginIter)) # export data of plugin generator
except StopIteration:
break
wsObject = {
'host': Settings['host'],
'path': '/' + genFlag(length = 6),
@ -108,5 +101,6 @@ def load():
'passwd': genFlag(length = 8)
}
yield loadTest(wsObject, None if ssObject['method'] == '' else ssObject, None)
for plugin in pluginTest: # different plugin for trojan-go
for plugin in Plugin.load('trojan-go'): # different plugin for trojan-go
yield loadTest(None, None, plugin)
logging.info('Trojan-Go test yield complete')

2
Tester/V2ray.py

@ -3,8 +3,8 @@
import copy
import itertools
from Basis.Test import Settings
from Basis.Functions import genFlag
from Tester.Settings import Settings
from Basis.Constant import quicMethods, udpObfuscations
httpConfig = {

5
Tester/VLESS.py

@ -5,10 +5,10 @@ import os
import json
from Tester import Xray
from Builder import VLESS
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import xtlsFlows
from Tester.Settings import Settings
from Basis.Functions import md5Sum, genUUID, getAvailablePort
@ -71,7 +71,7 @@ def loadTest(stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New vless test -> %s' % testInfo)
logging.debug('New VLESS test -> %s' % testInfo)
return testInfo
@ -79,3 +79,4 @@ def load():
streams = Xray.loadStream() # load xray-core stream list
for stream in streams: # test all stream cases
yield loadTest(stream)
logging.info('VLESS test yield complete')

5
Tester/VMess.py

@ -6,9 +6,9 @@ import json
import itertools
from Tester import V2ray
from Builder import VMess
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Tester.Settings import Settings
from Basis.Constant import PathEnv, vmessMethods
from Basis.Functions import md5Sum, genUUID, getAvailablePort
@ -69,7 +69,7 @@ def loadTest(method: str, aid: int, stream: dict) -> dict:
'port': proxyInfo['port'],
}
}
logging.debug('New vmess test -> %s' % testInfo)
logging.debug('New VMess test -> %s' % testInfo)
return testInfo
@ -79,3 +79,4 @@ def load():
yield loadTest(method, aid, streams[0])
for stream in streams[1:]: # skip first stream that has benn checked
yield loadTest('auto', 0, stream) # aead with auto security
logging.info('VMess test yield complete')

2
Tester/Xray.py

@ -4,8 +4,8 @@
import copy
import itertools
from Tester import V2ray
from Basis.Test import Settings
from Basis.Functions import genFlag
from Tester.Settings import Settings
from Basis.Constant import xtlsFlows, quicMethods, udpObfuscations
loadConfig = V2ray.loadConfig

128
Tester/__init__.py

@ -1,14 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import requests
from threading import Thread
from Basis.Logger import logging
from Tester.Settings import Settings
from Basis.Functions import md5Sum, genFlag, hostFormat, checkPortStatus
from Tester import Brook
from Tester import VMess
from Tester import VLESS
@ -18,7 +10,7 @@ from Tester import Hysteria
from Tester import Shadowsocks
from Tester import ShadowsocksR
entry = {
testEntry = {
'ss': Shadowsocks.load(),
'ss-all': Shadowsocks.load(isExtra = True),
'ssr': ShadowsocksR.load(),
@ -29,121 +21,3 @@ entry = {
'brook': Brook.load(),
'hysteria': Hysteria.load(),
}
def waitPort(port: int, times: int = 100, delay: int = 100) -> bool: # wait until port occupied
for i in range(times):
if not checkPortStatus(port): # port occupied
return True
time.sleep(delay / 1000) # default wait 100ms
return False # timeout
def httpCheck(socksInfo: dict, url: str, testId: str, timeout: int = 10) -> None:
socksProxy = 'socks5://%s:%i' % (hostFormat(socksInfo['addr'], v6Bracket = True), socksInfo['port'])
try:
proxy = {
'http': socksProxy,
'https': socksProxy,
}
request = requests.get(url, timeout = timeout, proxies = proxy)
request.raise_for_status()
logging.info('[%s] %s -> ok' % (testId, socksProxy))
except Exception as exp:
logging.error('[%s] %s -> error' % (testId, socksProxy))
logging.error('requests exception\n' + str(exp))
raise RuntimeError('socks5 test failed')
def runTest(testInfo: dict, testUrl: str, testFilter: set or None, delay: int = 1) -> None:
testInfo['hash'] = md5Sum(testInfo['caption'])[:12]
if testFilter is not None and testInfo['hash'] not in testFilter: return
logging.warning('[%s] %s' % (testInfo['hash'], testInfo['caption']))
testInfo['server'].start() # start test server
if waitPort(testInfo['interface']['port']): # wait for server
logging.debug('server start complete')
testInfo['client'].start() # start test client
if waitPort(testInfo['socks']['port']): # wait for client
logging.debug('client start complete')
try:
logging.debug('start test process')
time.sleep(delay)
httpCheck(testInfo['socks'], testUrl, testInfo['hash'])
testInfo['client'].quit()
testInfo['server'].quit()
except:
# client debug info
testInfo['client'].quit()
logging.warning('[%s] client info' % testInfo['hash'])
logging.error('command -> %s' % testInfo['client'].cmd)
logging.error('envVar -> %s' % testInfo['client'].env)
logging.error('file -> %s' % testInfo['client'].file)
logging.warning('[%s] client capture output' % testInfo['hash'])
logging.error('\n%s' % testInfo['client'].output)
# server debug info
testInfo['server'].quit()
logging.warning('[%s] server info' % testInfo['hash'])
logging.error('command -> %s' % testInfo['server'].cmd)
logging.error('envVar -> %s' % testInfo['server'].env)
logging.error('file -> %s' % testInfo['server'].file)
logging.warning('[%s] server capture output' % testInfo['hash'])
logging.error('\n%s' % testInfo['server'].output)
def test(testIter: iter, threadNum: int, testUrl: str, testFilter: set or None = None) -> None:
threads = []
while True: # infinite loop
try:
for thread in threads:
if thread.is_alive(): continue
threads.remove(thread) # remove dead thread
if len(threads) < threadNum:
for i in range(threadNum - len(threads)): # start threads within limit
thread = Thread( # create new thread
target = runTest,
args = (next(testIter), testUrl, testFilter)
)
thread.start()
threads.append(thread) # record thread info
time.sleep(0.1)
except StopIteration: # traverse completed
break
for thread in threads: # wait until all threads exit
thread.join()
def loadBind(serverV6: bool = False, clientV6: bool = False) -> None:
Settings['serverBind'] = '::1' if serverV6 else '127.0.0.1'
Settings['clientBind'] = '::1' if clientV6 else '127.0.0.1'
def loadCert(host: str = 'proxyc.net', remark: str = 'ProxyC') -> None:
loadPath = lambda x: os.path.join(Settings['workDir'], x)
certFlag = genFlag(length = 8)
caCert = loadPath('proxyc_%s_ca.pem' % certFlag)
caKey = loadPath('proxyc_%s_ca_key.pem' % certFlag)
cert = loadPath('proxyc_%s_cert.pem' % certFlag)
key = loadPath('proxyc_%s_cert_key.pem' % certFlag)
logging.critical('Load self-signed certificate')
os.system('mkdir -p %s' % Settings['workDir']) # create work directory
logging.critical('Creating CA certificate and key...')
os.system(' '.join(['mad', 'ca'] + [ # generate CA certificate and privkey
'--ca', caCert, '--key', caKey,
'--commonName', remark,
'--organization', remark,
'--organizationUnit', remark,
]))
logging.critical('Signing certificate...')
os.system(' '.join(['mad', 'cert'] + [ # generate certificate and privkey, then signed by CA
'--ca', caCert, '--ca_key', caKey,
'--cert', cert, '--key', key,
'--domain', host,
'--organization', remark,
'--organizationUnit', remark,
]))
logging.critical('Installing CA certificate...')
os.system('cat %s >> /etc/ssl/certs/ca-certificates.crt' % caCert) # add into system's trust list
Settings['host'] = host
Settings['cert'] = cert
Settings['key'] = key
logging.warning('Certificate loading complete')

13
test.py

@ -2,8 +2,9 @@
# -*- coding: utf-8 -*-
import sys
import Tester
from Tester import testEntry
from Basis.Logger import logging
from Basis.Test import Test, loadBind, loadCert
threadNum = 16
testItem = None
@ -45,8 +46,8 @@ if getArg('--filter') is not None:
testFilter = set(getArg('--filter').split(','))
isV6 = '--ipv6' in sys.argv
Tester.loadBind(serverV6 = isV6, clientV6 = isV6) # ipv4 / ipv6 (127.0.0.1 / ::1)
Tester.loadCert('proxyc.net', 'ProxyC') # default cert config
loadBind(serverV6 = isV6, clientV6 = isV6) # ipv4 / ipv6 (127.0.0.1 / ::1)
loadCert('proxyc.net') # default cert config
logging.critical('TEST ITEM: ' + ('all' if testItem is None else testItem))
logging.critical('FILTER: %s' % testFilter)
logging.critical('URL: ' + testUrl)
@ -56,11 +57,11 @@ logging.critical('-------------------------------- TEST START ------------------
if testItem is not None:
if testItem == 'ss' and '--all' in sys.argv:
testItem = 'ss-all'
Tester.test(Tester.entry[testItem], threadNum, testUrl, testFilter)
Test(testEntry[testItem], threadNum, testUrl, testFilter)
else:
for item in Tester.entry:
for item in testEntry:
if item == ('ss' if '--all' in sys.argv else 'ss-all'): # skip ss / ss-all
continue
logging.critical('TEST ITEM -> ' + item)
Tester.test(Tester.entry[item], threadNum, testUrl, testFilter)
Test(testEntry[item], threadNum, testUrl, testFilter)
logging.critical('-------------------------------- TEST COMPLETE --------------------------------')

Loading…
Cancel
Save