Browse Source

style: optimize test code

master
dnomd343 2 years ago
parent
commit
b69616bbb8
  1. 11
      Tester/Brook.py
  2. 10
      Tester/Plugin.py
  3. 6
      Tester/Settings.py
  4. 9
      Tester/Trojan.py
  5. 24
      Tester/V2ray.py
  6. 9
      Tester/VLESS.py
  7. 11
      Tester/Xray.py
  8. 6
      Tester/__init__.py
  9. 6
      test.py

11
Tester/Brook.py

@ -25,17 +25,20 @@ def originStream(isUot: bool) -> dict:
def loadWsCommand(proxyInfo: dict) -> list: # load start command for brook server def loadWsCommand(proxyInfo: dict) -> list: # load start command for brook server
return ([ brookCommand = [
'wsserver', '--listen', '%s:%i' % (hostFormat(proxyInfo['server'], v6Bracket = True), proxyInfo['port']) 'wsserver', '--listen', '%s:%i' % (hostFormat(proxyInfo['server'], v6Bracket = True), proxyInfo['port'])
] if proxyInfo['stream']['secure'] is None else [ ] if proxyInfo['stream']['secure'] is None else [
'wssserver', '--domainaddress', '%s:%i' % (proxyInfo['stream']['host'], proxyInfo['port']) 'wssserver', '--domainaddress', '%s:%i' % (proxyInfo['stream']['host'], proxyInfo['port'])
]) + [ ]
brookCommand += [
'--path', proxyInfo['stream']['path'], '--path', proxyInfo['stream']['path'],
'--password', proxyInfo['passwd'], '--password', proxyInfo['passwd'],
] + ([] if proxyInfo['stream']['secure'] is None else [ ]
brookCommand += ([] if proxyInfo['stream']['secure'] is None else [
'--cert', Settings['cert'], '--cert', Settings['cert'],
'--certkey', Settings['key'], '--certkey', Settings['key'],
]) + (['--withoutBrookProtocol'] if proxyInfo['stream']['raw'] else []) ])
return brookCommand + (['--withoutBrookProtocol'] if proxyInfo['stream']['raw'] else [])
def wsStream(isRaw: bool, isSecure: bool): def wsStream(isRaw: bool, isSecure: bool):

10
Tester/Plugin.py

@ -10,10 +10,7 @@ from Basis.Constant import Plugins
from Tester.Settings import Settings from Tester.Settings import Settings
from Basis.Functions import genFlag, hostFormat, getAvailablePort from Basis.Functions import genFlag, hostFormat, getAvailablePort
pluginParams = {}
pluginParams = {
'SITE': Settings['site']
}
pluginConfig = { pluginConfig = {
'simple-obfs': { 'simple-obfs': {
@ -316,15 +313,16 @@ def paramFill(param: str) -> str:
def load(proxyType: str): def load(proxyType: str):
if proxyType not in ['ss', 'trojan-go']: if proxyType not in ['ss', 'trojan-go']:
raise RuntimeError('Unknown proxy type for sip003 plugin') raise RuntimeError('Unknown proxy type for sip003 plugin')
cloakLoad() # init cloak config
kcptunLoad() # init kcptun config
pluginParams.update({ pluginParams.update({
'SITE': Settings['site'],
'HOST': Settings['host'], 'HOST': Settings['host'],
'CERT': Settings['cert'], 'CERT': Settings['cert'],
'KEY': Settings['key'], 'KEY': Settings['key'],
'PASSWD': genFlag(length = 8), # random password for test 'PASSWD': genFlag(length = 8), # random password for test
'PATH': '/' + genFlag(length = 6), # random uri path for test 'PATH': '/' + genFlag(length = 6), # random uri path for test
}) })
cloakLoad() # init cloak config
kcptunLoad() # init kcptun config
for pluginType in pluginConfig: for pluginType in pluginConfig:
for pluginTest, pluginTestInfo in pluginConfig[pluginType].items(): # traverse all plugin test item for pluginTest, pluginTestInfo in pluginConfig[pluginType].items(): # traverse all plugin test item
pluginParams['RANDOM'] = genFlag(length = 8) # refresh RANDOM field pluginParams['RANDOM'] = genFlag(length = 8) # refresh RANDOM field

6
Tester/Settings.py

@ -1,11 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from Basis.Constant import WorkDir
Settings = { Settings = {
'workDir': '/tmp/ProxyC', 'workDir': WorkDir,
'site': 'www.bing.com',
'serverBind': '', 'serverBind': '',
'clientBind': '', 'clientBind': '',
'site': 'www.bing.com',
'host': '', 'host': '',
'cert': '', 'cert': '',
'key': '', 'key': '',

9
Tester/Trojan.py

@ -18,11 +18,12 @@ def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: s
'listen': proxyInfo['server'], 'listen': proxyInfo['server'],
'port': proxyInfo['port'], 'port': proxyInfo['port'],
'settings': { 'settings': {
'clients': [{**{ 'clients': [{
'password': proxyInfo['passwd'], 'password': proxyInfo['passwd'],
}, **({} if xtlsFlow is None else { **({} if xtlsFlow is None else {
'flow': xtlsFlow 'flow': xtlsFlow
})}], })
}],
}, },
'streamSettings': streamConfig 'streamSettings': streamConfig
}) })

24
Tester/V2ray.py

@ -108,15 +108,16 @@ def wsStream(isEd: bool) -> dict:
}, },
'server': { 'server': {
'network': 'ws', 'network': 'ws',
'wsSettings': {**{ 'wsSettings': {
'path': path, 'path': path,
'headers': { 'headers': {
'Host': Settings['host'] 'Host': Settings['host']
} },
}, **({} if not isEd else { **({} if not isEd else {
'maxEarlyData': 2048, 'maxEarlyData': 2048,
'earlyDataHeaderName': 'Sec-WebSocket-Protocol' 'earlyDataHeaderName': 'Sec-WebSocket-Protocol'
})} })
}
} }
} }
@ -177,11 +178,12 @@ def grpcStream(isMulti: bool) -> dict:
}, },
'server': { 'server': {
'network': 'grpc', 'network': 'grpc',
'grpcSettings': {**{ 'grpcSettings': {
'serviceName': service 'serviceName': service,
}, **({} if not isMulti else { **({} if not isMulti else {
'multiMode': True 'multiMode': True
})} })
}
} }
} }

9
Tester/VLESS.py

@ -18,11 +18,12 @@ def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: s
'listen': proxyInfo['server'], 'listen': proxyInfo['server'],
'port': proxyInfo['port'], 'port': proxyInfo['port'],
'settings': { 'settings': {
'clients': [{**{ 'clients': [{
'id': proxyInfo['id'], 'id': proxyInfo['id'],
}, **({} if xtlsFlow is None else { **({} if xtlsFlow is None else {
'flow': xtlsFlow 'flow': xtlsFlow
})}], })
}],
'decryption': 'none' 'decryption': 'none'
}, },
'streamSettings': streamConfig 'streamSettings': streamConfig

11
Tester/Xray.py

@ -20,15 +20,16 @@ def addSecure(streamConfig: dict, xtlsFlow: str or None = None, isUdp443: bool =
streamConfig['caption'] += ' (with %s)' % ( streamConfig['caption'] += ' (with %s)' % (
'tls' if xtlsFlow is None else (xtlsFlow + ('-udp443' if isUdp443 else '')) 'tls' if xtlsFlow is None else (xtlsFlow + ('-udp443' if isUdp443 else ''))
) )
streamConfig['info']['secure'] = {**{ # secure options for client streamConfig['info']['secure'] = { # secure options for client
'type': 'tls' if xtlsFlow is None else 'xtls', 'type': 'tls' if xtlsFlow is None else 'xtls',
'sni': Settings['host'], 'sni': Settings['host'],
'alpn': None, 'alpn': None,
'verify': True, 'verify': True,
}, **({} if xtlsFlow is None else { **({} if xtlsFlow is None else {
'flow': xtlsFlow, 'flow': xtlsFlow,
'udp443': isUdp443, 'udp443': isUdp443,
})} })
}
streamConfig['server']['security'] = 'tls' if xtlsFlow is None else 'xtls' streamConfig['server']['security'] = 'tls' if xtlsFlow is None else 'xtls'
streamConfig['server']['%sSettings' % streamConfig['server']['security']] = { # cert and key for server streamConfig['server']['%sSettings' % streamConfig['server']['security']] = { # cert and key for server
'alpn': ['h2', 'http/1.1'], 'alpn': ['h2', 'http/1.1'],

6
Tester/__init__.py

@ -127,14 +127,14 @@ def loadCert(host: str = 'proxyc.net', remark: str = 'ProxyC') -> None:
logging.critical('Load self-signed certificate') logging.critical('Load self-signed certificate')
os.system('mkdir -p %s' % Settings['workDir']) # create work directory os.system('mkdir -p %s' % Settings['workDir']) # create work directory
logging.critical('Creating CA certificate and key...') logging.critical('Creating CA certificate and key...')
os.system(' '.join(['mad', 'ca'] + [ os.system(' '.join(['mad', 'ca'] + [ # generate CA certificate and privkey
'--ca', caCert, '--key', caKey, '--ca', caCert, '--key', caKey,
'--commonName', remark, '--commonName', remark,
'--organization', remark, '--organization', remark,
'--organizationUnit', remark, '--organizationUnit', remark,
])) ]))
logging.critical('Signing certificate...') logging.critical('Signing certificate...')
os.system(' '.join(['mad', 'cert'] + [ os.system(' '.join(['mad', 'cert'] + [ # generate certificate and privkey, then signed by CA
'--ca', caCert, '--ca_key', caKey, '--ca', caCert, '--ca_key', caKey,
'--cert', cert, '--key', key, '--cert', cert, '--key', key,
'--domain', host, '--domain', host,
@ -142,7 +142,7 @@ def loadCert(host: str = 'proxyc.net', remark: str = 'ProxyC') -> None:
'--organizationUnit', remark, '--organizationUnit', remark,
])) ]))
logging.critical('Installing CA certificate...') logging.critical('Installing CA certificate...')
os.system('cat %s >> /etc/ssl/certs/ca-certificates.crt' % caCert) os.system('cat %s >> /etc/ssl/certs/ca-certificates.crt' % caCert) # add into system's trust list
Settings['host'] = host Settings['host'] = host
Settings['cert'] = cert Settings['cert'] = cert
Settings['key'] = key Settings['key'] = key

6
test.py

@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os
import sys import sys
import Tester import Tester
from Basis.Logger import logging from Basis.Logger import logging
@ -46,12 +45,13 @@ if getArg('--filter') is not None:
testFilter = set(getArg('--filter').split(',')) testFilter = set(getArg('--filter').split(','))
isV6 = '--ipv6' in sys.argv isV6 = '--ipv6' in sys.argv
Tester.loadBind(serverV6 = isV6, clientV6 = isV6) Tester.loadBind(serverV6 = isV6, clientV6 = isV6) # ipv4 / ipv6 (127.0.0.1 / ::1)
Tester.loadCert('proxyc.net', 'ProxyC') Tester.loadCert('proxyc.net', 'ProxyC') # default cert config
logging.critical('TEST ITEM: ' + ('all' if testItem is None else testItem)) logging.critical('TEST ITEM: ' + ('all' if testItem is None else testItem))
logging.critical('FILTER: %s' % testFilter) logging.critical('FILTER: %s' % testFilter)
logging.critical('URL: ' + testUrl) logging.critical('URL: ' + testUrl)
logging.critical('THREAD NUMBER: %i' % threadNum) logging.critical('THREAD NUMBER: %i' % threadNum)
logging.critical('-------------------------------- TEST START --------------------------------') logging.critical('-------------------------------- TEST START --------------------------------')
if testItem is not None: if testItem is not None:
if testItem == 'ss' and '--all' in sys.argv: if testItem == 'ss' and '--all' in sys.argv:

Loading…
Cancel
Save