Browse Source

update: optimize the tester

master
Dnomd343 2 years ago
parent
commit
38fe04816c
  1. 7
      ProxyTester/Brook.py
  2. 1
      ProxyTester/Plugin.py
  3. 47
      ProxyTester/Shadowsocks.py
  4. 30
      ProxyTester/ShadowsocksR.py
  5. 91
      ProxyTester/Trojan.py
  6. 67
      ProxyTester/TrojanGo.py
  7. 8
      ProxyTester/V2ray.py
  8. 66
      ProxyTester/VLESS.py
  9. 76
      ProxyTester/VMess.py
  10. 3
      ProxyTester/Xray.py
  11. 15
      ProxyTester/tester.py
  12. 6
      Test.py

7
ProxyTester/Brook.py

@ -5,12 +5,13 @@ import copy
testConfig = {} testConfig = {}
def __originConfig() -> dict: def __originConfig() -> dict:
return { return {
'caption': 'Brook original', 'caption': 'Brook original',
'client': { 'client': {
'type': 'brook', 'type': 'brook',
'server': testConfig['bind'], 'server': testConfig['addr'],
'port': testConfig['port'], 'port': testConfig['port'],
'passwd': testConfig['passwd'] 'passwd': testConfig['passwd']
}, },
@ -27,7 +28,7 @@ def __wsConfig() -> dict:
'caption': 'Brook websocket', 'caption': 'Brook websocket',
'client': { 'client': {
'type': 'brook', 'type': 'brook',
'server': testConfig['bind'], 'server': testConfig['addr'],
'port': testConfig['port'], 'port': testConfig['port'],
'passwd': testConfig['passwd'], 'passwd': testConfig['passwd'],
'ws': { 'ws': {
@ -49,7 +50,7 @@ def __wssConfig() -> dict:
'caption': 'Brook websocket with TLS', 'caption': 'Brook websocket with TLS',
'client': { 'client': {
'type': 'brook', 'type': 'brook',
'server': testConfig['bind'], 'server': testConfig['addr'],
'port': testConfig['port'], 'port': testConfig['port'],
'passwd': testConfig['passwd'], 'passwd': testConfig['passwd'],
'ws': { 'ws': {

1
ProxyTester/Plugin.py

@ -235,6 +235,7 @@ pluginConfig = {
] ]
} }
def loadPluginConfig(plugin: str, host: str, cert: str, key: str) -> list: # without rabbit-tcp def loadPluginConfig(plugin: str, host: str, cert: str, key: str) -> list: # without rabbit-tcp
result = [] result = []
filePath = None filePath = None

47
ProxyTester/Shadowsocks.py

@ -4,7 +4,7 @@
import copy import copy
import ProxyTester.Plugin as sip003 import ProxyTester.Plugin as sip003
config = {} testConfig = {}
ssMethodList = [ ssMethodList = [
'aes-128-gcm', 'aes-128-gcm',
@ -73,29 +73,31 @@ sip003PluginList = [ # SIP003插件列表
'gun-plugin' 'gun-plugin'
] ]
def __ssServerConfig(method: str, plugin: str or None) -> list: def __ssServerConfig(method: str, plugin: str or None) -> list:
rabbitPort = 20191 rabbitPort = 20191
proxyInfo = { proxyInfo = {
'type': 'ss', 'type': 'ss',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'passwd': config['passwd'], 'passwd': testConfig['passwd'],
'method': method 'method': method
} }
caption = 'Shadowsocks method ' + method caption = 'Shadowsocks method ' + method
if method in ['plain', 'none']: # plain / none -> ss-rust if method in ['plain', 'none']: # plain / none -> ss-rust
serverCommand = [ serverCommand = [
'ss-rust-server', 'ss-rust-server',
'-s', '0.0.0.0:' + str(config['port']), '-s', testConfig['bind'] + ':' + str(testConfig['port']),
'-k', config['passwd'], '-k', testConfig['passwd'],
'-m', method '-m', method
] ]
elif method == 'salsa20-ctr': # salsa20-ctr -> ss-python-legacy elif method == 'salsa20-ctr': # salsa20-ctr -> ss-python-legacy
serverCommand = [ serverCommand = [
'ss-bootstrap-server', '--no-udp', 'ss-bootstrap-server', '--no-udp',
'--shadowsocks', 'ss-python-legacy-server', '--shadowsocks', 'ss-python-legacy-server',
'-p', str(config['port']), '-s', testConfig['bind'],
'-k', config['passwd'], '-p', str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method '-m', method
] ]
else: # others -> ss-python else: # others -> ss-python
@ -112,8 +114,9 @@ def __ssServerConfig(method: str, plugin: str or None) -> list:
serverCommand = [ serverCommand = [
'ss-bootstrap-server', '--no-udp', 'ss-bootstrap-server', '--no-udp',
'--shadowsocks', 'ss-python-server', '--shadowsocks', 'ss-python-server',
'-p', str(config['port']), '-s', testConfig['bind'],
'-k', config['passwd'], '-p', str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method '-m', method
] ]
if method == 'idea-cfb' or method == 'seed-cfb': if method == 'idea-cfb' or method == 'seed-cfb':
@ -136,7 +139,7 @@ def __ssServerConfig(method: str, plugin: str or None) -> list:
proxyInfo['port'] = rabbitPort proxyInfo['port'] = rabbitPort
proxyInfo['plugin'] = { proxyInfo['plugin'] = {
'type': 'rabbit-plugin', 'type': 'rabbit-plugin',
'param': 'serviceAddr=127.0.0.1:' + str(config['port']) + ';password=' + config['passwd'] 'param': 'serviceAddr=127.0.0.1:' + str(testConfig['port']) + ';password=' + testConfig['passwd']
} }
return [{ return [{
'proxy': proxyInfo, 'proxy': proxyInfo,
@ -151,7 +154,7 @@ def __ssServerConfig(method: str, plugin: str or None) -> list:
'startCommand': [ 'startCommand': [
'rabbit', 'rabbit',
'-mode', 's', '-mode', 's',
'-password', config['passwd'], '-password', testConfig['passwd'],
'-rabbit-addr', ':' + str(rabbitPort) '-rabbit-addr', ':' + str(rabbitPort)
], ],
'fileContent': None, 'fileContent': None,
@ -162,7 +165,9 @@ def __ssServerConfig(method: str, plugin: str or None) -> list:
# others plugin # others plugin
result = [] result = []
pluginConfig = sip003.loadPluginConfig(plugin, config['host'], config['cert'], config['key']) # 载入插件配置 pluginConfig = sip003.loadPluginConfig( # 载入插件配置
plugin, testConfig['host'], testConfig['cert'], testConfig['key']
)
serverBaseCommand = copy.deepcopy(serverCommand) serverBaseCommand = copy.deepcopy(serverCommand)
for pluginOption in pluginConfig: for pluginOption in pluginConfig:
serverCommand = copy.deepcopy(serverBaseCommand) serverCommand = copy.deepcopy(serverBaseCommand)
@ -184,12 +189,14 @@ def __ssServerConfig(method: str, plugin: str or None) -> list:
})) }))
return result return result
def ssTest(ssConfig: dict) -> list:
result = [] def test(config: dict) -> list:
for key, value in ssConfig.items(): # ssConfig -> config global testConfig
config[key] = value testConfig = config
testList = []
for method in ssMethodList: # all Shadowsocks methods for method in ssMethodList: # all Shadowsocks methods
result += __ssServerConfig(method, None) testList += __ssServerConfig(method, None)
for plugin in sip003PluginList: # all SIP003 plugin for plugin in sip003PluginList: # all SIP003 plugin
result += __ssServerConfig('aes-256-ctr', plugin) testList += __ssServerConfig('aes-256-ctr', plugin)
return result return testList

30
ProxyTester/ShadowsocksR.py

@ -1,7 +1,7 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
config = {} testConfig = {}
ssrMethodList = [ ssrMethodList = [
"aes-128-cfb", "aes-128-cfb",
@ -76,17 +76,18 @@ ssrObfsList = [
def __ssrServerConfig(method: str, protocol: str, obfs: str, caption: str) -> list: def __ssrServerConfig(method: str, protocol: str, obfs: str, caption: str) -> list:
proxyInfo = { proxyInfo = {
'type': 'ssr', 'type': 'ssr',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'passwd': config['passwd'], 'passwd': testConfig['passwd'],
'method': method, 'method': method,
'protocol': protocol, 'protocol': protocol,
'obfs': obfs 'obfs': obfs
} }
serverCommand = [ serverCommand = [
'ssr-server', 'ssr-server',
'-p', str(config['port']), '-s', testConfig['bind'],
'-k', config['passwd'], '-p', str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method, '-m', method,
'-O', protocol, '-O', protocol,
'-o', obfs '-o', obfs
@ -103,13 +104,16 @@ def __ssrServerConfig(method: str, protocol: str, obfs: str, caption: str) -> li
'aider': None 'aider': None
}] }]
def ssrTest(ssrConfig: dict) -> list: def test(config: dict) -> list:
result = [] global testConfig
for key, value in ssrConfig.items(): # ssrConfig -> config testConfig = config
config[key] = value
testList = []
for method in ssrMethodList: # all ShadowsocksR methods for method in ssrMethodList: # all ShadowsocksR methods
result += __ssrServerConfig(method, 'origin', 'plain', 'ShadowsocksR method ' + method) testList += __ssrServerConfig(method, 'origin', 'plain', 'ShadowsocksR method ' + method)
for protocol in ssrProtocolList: # all ShadowsocksR protocol and obfs for protocol in ssrProtocolList: # all ShadowsocksR protocol and obfs
for obfs in ssrObfsList: for obfs in ssrObfsList:
result += __ssrServerConfig('table', protocol, obfs, 'ShadowsocksR protocol ' + protocol + ' obfs ' + obfs) testList += __ssrServerConfig('table', protocol, obfs, 'ShadowsocksR protocol ' + protocol + ' obfs ' + obfs)
return result
return testList

91
ProxyTester/Trojan.py

@ -1,64 +1,66 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
import json
import json
from ProxyTester import Xray from ProxyTester import Xray
config = {} testConfig = {}
def trojanBasicTest() -> dict: def trojanBasicTest() -> dict:
serverConfig = { serverConfig = {
'run_type': 'server', 'run_type': 'server',
'local_addr': '127.0.0.1', 'local_addr': testConfig['bind'],
'local_port': config['port'], 'local_port': testConfig['port'],
'password': [ 'password': [
config['passwd'] testConfig['passwd']
], ],
'ssl': { 'ssl': {
'cert': config['cert'], 'cert': testConfig['cert'],
'key': config['key'] 'key': testConfig['key']
} }
} }
return { return {
'caption': 'Trojan basic', 'caption': 'Trojan basic',
'proxy': { 'proxy': {
'type': 'trojan', 'type': 'trojan',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'passwd': config['passwd'], 'passwd': testConfig['passwd'],
'stream': { 'stream': {
'type': 'tcp', 'type': 'tcp',
'secure': { 'secure': {
'type': 'tls', 'type': 'tls',
'sni': config['host'] 'sni': testConfig['host']
} }
} }
}, },
'server': { 'server': {
'startCommand': ['trojan', '-c', config['file']], 'startCommand': ['trojan', '-c', testConfig['file']],
'fileContent': json.dumps(serverConfig), 'fileContent': json.dumps(serverConfig),
'filePath': config['file'], 'filePath': testConfig['file'],
'envVar': {} 'envVar': {}
}, },
'aider': None 'aider': None
} }
def loadTrojanStream(streamInfo: dict, xtlsFlow: str or None) -> dict: def loadTrojanStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
proxyInfo = { proxyInfo = {
'type': 'trojan', 'type': 'trojan',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'passwd': config['passwd'], 'passwd': testConfig['passwd'],
'stream': streamInfo['client'] 'stream': streamInfo['client']
} }
inboundConfig = { inboundConfig = {
'protocol': 'trojan', 'protocol': 'trojan',
'listen': '127.0.0.1', 'listen': testConfig['bind'],
'port': config['port'], 'port': testConfig['port'],
'settings': { 'settings': {
'clients': [ 'clients': [
{ {
'password': config['passwd'] 'password': testConfig['passwd']
} }
] ]
}, },
@ -70,79 +72,78 @@ def loadTrojanStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
'caption': 'Trojan network ' + streamInfo['caption'], 'caption': 'Trojan network ' + streamInfo['caption'],
'proxy': proxyInfo, 'proxy': proxyInfo,
'server': { 'server': {
'startCommand': ['xray', '-c', config['file']], 'startCommand': ['xray', '-c', testConfig['file']],
'fileContent': Xray.xrayConfig(inboundConfig), 'fileContent': Xray.xrayConfig(inboundConfig),
'filePath': config['file'], 'filePath': testConfig['file'],
'envVar': {} 'envVar': {}
}, },
'aider': None 'aider': None
} }
def trojanTest(trojanConfig: dict) -> list:
result = []
for key, value in trojanConfig.items(): # trojanConfig -> config
config[key] = value
result.append(trojanBasicTest()) # basic test def test(config: dict) -> list:
global testConfig
testConfig = config
testList = [trojanBasicTest()] # basic test
# TCP stream # TCP stream
streamInfo = Xray.loadTcpStream(False, '', '') streamInfo = Xray.loadTcpStream(False, '', '')
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
for flow in Xray.xtlsFlowList: for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadTcpStream(False, '', '') streamInfo = Xray.loadTcpStream(False, '', '')
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow) xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
result.append(loadTrojanStream(streamInfo, xtlsFlow)) testList.append(loadTrojanStream(streamInfo, xtlsFlow))
streamInfo = Xray.loadTcpStream(True, config['host'], '/') streamInfo = Xray.loadTcpStream(True, config['host'], '/')
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
# mKCP stream # mKCP stream
for obfs in Xray.udpObfsList: for obfs in Xray.udpObfsList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs) streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
for flow in Xray.xtlsFlowList: for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs) streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow) xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
result.append(loadTrojanStream(streamInfo, xtlsFlow)) testList.append(loadTrojanStream(streamInfo, xtlsFlow))
# WebSocket stream # WebSocket stream
streamInfo = Xray.loadWsStream(config['host'], config['path'], False) streamInfo = Xray.loadWsStream(config['host'], config['path'], False)
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.loadWsStream(config['host'], config['path'], True) streamInfo = Xray.loadWsStream(config['host'], config['path'], True)
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
# HTTP/2 stream # HTTP/2 stream
streamInfo = Xray.loadH2Stream(config['host'], config['path']) streamInfo = Xray.loadH2Stream(config['host'], config['path'])
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
# QUIC stream # QUIC stream
for method in Xray.quicMethodList: for method in Xray.quicMethodList:
for obfs in Xray.udpObfsList: for obfs in Xray.udpObfsList:
streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs) streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
# GRPC stream # GRPC stream
streamInfo = Xray.loadGrpcStream(config['service']) streamInfo = Xray.loadGrpcStream(config['service'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.loadGrpcStream(config['service'], multiMode = True) streamInfo = Xray.loadGrpcStream(config['service'], multiMode = True)
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadTrojanStream(streamInfo, None)) testList.append(loadTrojanStream(streamInfo, None))
return result return testList

67
ProxyTester/TrojanGo.py

@ -3,10 +3,9 @@
import copy import copy
import json import json
from ProxyTester import Plugin from ProxyTester import Plugin
config = {} testConfig = {}
trojanGoMethod = [ trojanGoMethod = [
'AES-128-GCM', 'AES-128-GCM',
@ -29,51 +28,52 @@ sip003PluginList = [ # SIP003插件列表
'gun-plugin' 'gun-plugin'
] ]
def loadTrojanGo(isWs: bool, ssMethod: str or None) -> dict: def loadTrojanGo(isWs: bool, ssMethod: str or None) -> dict:
caption = 'Trojan-Go original' caption = 'Trojan-Go original'
serverConfig = { serverConfig = {
'run_type': 'server', 'run_type': 'server',
'local_addr': '127.0.0.1', 'local_addr': testConfig['bind'],
'local_port': config['port'], 'local_port': testConfig['port'],
'remote_addr': '127.0.0.1', # only for shadowsocks fallback 'remote_addr': '127.0.0.1', # only for shadowsocks fallback
'remote_port': 343, 'remote_port': 343,
'password': [ 'password': [
config['passwd'] testConfig['passwd']
], ],
'disable_http_check': True, 'disable_http_check': True,
'ssl': { 'ssl': {
'cert': config['cert'], 'cert': testConfig['cert'],
'key': config['key'] 'key': testConfig['key']
} }
} }
proxyInfo = { proxyInfo = {
'type': 'trojan-go', 'type': 'trojan-go',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'passwd': config['passwd'], 'passwd': testConfig['passwd'],
'sni': config['host'], 'sni': testConfig['host'],
} }
if ssMethod is not None: # add Shadowsocks encrypt if ssMethod is not None: # add Shadowsocks encrypt
caption += ' ' + ssMethod + ' encrypt' caption += ' ' + ssMethod + ' encrypt'
serverConfig['shadowsocks'] = { serverConfig['shadowsocks'] = {
'enabled': True, 'enabled': True,
'method': ssMethod, 'method': ssMethod,
'password': config['passwd'] 'password': testConfig['passwd']
} }
proxyInfo['ss'] = { proxyInfo['ss'] = {
'method': ssMethod, 'method': ssMethod,
'passwd': config['passwd'] 'passwd': testConfig['passwd']
} }
if isWs: # add WebSocket config if isWs: # add WebSocket config
caption += ' (websocket)' caption += ' (websocket)'
serverConfig['websocket'] = { serverConfig['websocket'] = {
'enabled': True, 'enabled': True,
'host': config['host'], 'host': testConfig['host'],
'path': config['path'] 'path': testConfig['path']
} }
proxyInfo['ws'] = { proxyInfo['ws'] = {
'host': config['host'], 'host': testConfig['host'],
'path': config['path'] 'path': testConfig['path']
} }
return { return {
'caption': caption, 'caption': caption,
@ -83,6 +83,7 @@ def loadTrojanGo(isWs: bool, ssMethod: str or None) -> dict:
'path': None 'path': None
} }
def loadTrojanGoPlugin(plugin: str) -> list: def loadTrojanGoPlugin(plugin: str) -> list:
result = [] result = []
rabbitPort = 20191 rabbitPort = 20191
@ -93,7 +94,7 @@ def loadTrojanGoPlugin(plugin: str) -> list:
trojanBaseConfig['client']['port'] = rabbitPort trojanBaseConfig['client']['port'] = rabbitPort
trojanBaseConfig['client']['plugin'] = { trojanBaseConfig['client']['plugin'] = {
'type': 'rabbit-plugin', 'type': 'rabbit-plugin',
'param': 'serviceAddr=127.0.0.1:' + str(config['port']) + ';password=' + config['passwd'] 'param': 'serviceAddr=127.0.0.1:' + str(testConfig['port']) + ';password=' + testConfig['passwd']
} }
trojanBaseConfig['server']['transport_plugin'] = { trojanBaseConfig['server']['transport_plugin'] = {
'enabled': True, 'enabled': True,
@ -101,7 +102,7 @@ def loadTrojanGoPlugin(plugin: str) -> list:
'command': 'rabbit', 'command': 'rabbit',
'arg': [ 'arg': [
'-mode', 's', '-mode', 's',
'-password', config['passwd'], '-password', testConfig['passwd'],
'-rabbit-addr', ':' + str(rabbitPort) '-rabbit-addr', ':' + str(rabbitPort)
] ]
} }
@ -110,7 +111,7 @@ def loadTrojanGoPlugin(plugin: str) -> list:
return [trojanBaseConfig] return [trojanBaseConfig]
# other plugin # other plugin
pluginConfig = Plugin.loadPluginConfig(plugin, config['host'], config['cert'], config['key']) # 载入插件配置 pluginConfig = Plugin.loadPluginConfig(plugin, testConfig['host'], testConfig['cert'], testConfig['key']) # 载入插件配置
for pluginOption in pluginConfig: for pluginOption in pluginConfig:
trojanConfig = copy.deepcopy(trojanBaseConfig) trojanConfig = copy.deepcopy(trojanBaseConfig)
trojanConfig['caption'] = 'Trojan-Go plugin ' + plugin + ' (' + pluginOption['caption'] + ')' trojanConfig['caption'] = 'Trojan-Go plugin ' + plugin + ' (' + pluginOption['caption'] + ')'
@ -126,6 +127,7 @@ def loadTrojanGoPlugin(plugin: str) -> list:
result.append(trojanConfig) result.append(trojanConfig)
return result return result
def loadTrojanGoConfig(trojanGoConfigList: list) -> list: def loadTrojanGoConfig(trojanGoConfigList: list) -> list:
result = [] result = []
for trojanGoConfig in trojanGoConfigList: for trojanGoConfig in trojanGoConfigList:
@ -133,9 +135,9 @@ def loadTrojanGoConfig(trojanGoConfigList: list) -> list:
'caption': trojanGoConfig['caption'], 'caption': trojanGoConfig['caption'],
'proxy': trojanGoConfig['client'], 'proxy': trojanGoConfig['client'],
'server': { 'server': {
'startCommand': ['trojan-go', '-config', config['file']], 'startCommand': ['trojan-go', '-config', testConfig['file']],
'fileContent': json.dumps(trojanGoConfig['server']), 'fileContent': json.dumps(trojanGoConfig['server']),
'filePath': config['file'], 'filePath': testConfig['file'],
'envVar': {'PATH': '/usr/bin'} 'envVar': {'PATH': '/usr/bin'}
}, },
'aider': { 'aider': {
@ -147,18 +149,19 @@ def loadTrojanGoConfig(trojanGoConfigList: list) -> list:
}) })
return result return result
def trojanGoTest(trojanGoConfig: dict) -> list:
result = []
for key, value in trojanGoConfig.items(): # trojanGoConfig -> config
config[key] = value
result += loadTrojanGoConfig([loadTrojanGo(False, None)]) # basic test def test(config: dict) -> list:
result += loadTrojanGoConfig([loadTrojanGo(True, None)]) global testConfig
testConfig = config
testList = []
testList += loadTrojanGoConfig([loadTrojanGo(False, None)]) # basic test
testList += loadTrojanGoConfig([loadTrojanGo(True, None)])
for ssMethod in trojanGoMethod: for ssMethod in trojanGoMethod:
result += loadTrojanGoConfig([loadTrojanGo(False, ssMethod)]) # basic test with shadowsocks testList += loadTrojanGoConfig([loadTrojanGo(False, ssMethod)]) # basic test with shadowsocks
result += loadTrojanGoConfig([loadTrojanGo(True, ssMethod)]) testList += loadTrojanGoConfig([loadTrojanGo(True, ssMethod)])
for plugin in sip003PluginList: # plugin test -> cause zombie process (imperfect trojan-go) for plugin in sip003PluginList: # plugin test -> cause zombie process (imperfect trojan-go)
result += loadTrojanGoConfig(loadTrojanGoPlugin(plugin)) testList += loadTrojanGoConfig(loadTrojanGoPlugin(plugin))
return result return testList

8
ProxyTester/V2ray.py

@ -48,6 +48,7 @@ quicMethodList = [
'chacha20-poly1305', 'chacha20-poly1305',
] ]
def loadTcpStream(isObfs: bool, host: str, path: str) -> dict: def loadTcpStream(isObfs: bool, host: str, path: str) -> dict:
streamConfig = { streamConfig = {
'network': 'tcp', 'network': 'tcp',
@ -84,6 +85,7 @@ def loadTcpStream(isObfs: bool, host: str, path: str) -> dict:
'server': streamConfig 'server': streamConfig
} }
def loadKcpStream(seed: str, obfs: str) -> dict: def loadKcpStream(seed: str, obfs: str) -> dict:
kcpSetting['header'] = { kcpSetting['header'] = {
'type': obfs 'type': obfs
@ -103,6 +105,7 @@ def loadKcpStream(seed: str, obfs: str) -> dict:
} }
} }
def loadWsStream(host: str, path: str, isEd: bool) -> dict: def loadWsStream(host: str, path: str, isEd: bool) -> dict:
wsSetting = { wsSetting = {
'path': path, 'path': path,
@ -141,6 +144,7 @@ def loadWsStream(host: str, path: str, isEd: bool) -> dict:
} }
} }
def loadH2Stream(host: str, path: str) -> dict: def loadH2Stream(host: str, path: str) -> dict:
return { return {
'caption': 'HTTP/2', 'caption': 'HTTP/2',
@ -159,6 +163,7 @@ def loadH2Stream(host: str, path: str) -> dict:
} }
} }
def loadQuicStream(method: str, passwd: str, obfs: str) -> dict: def loadQuicStream(method: str, passwd: str, obfs: str) -> dict:
return { return {
'caption': 'QUIC method ' + method + ' obfs ' + obfs, 'caption': 'QUIC method ' + method + ' obfs ' + obfs,
@ -181,6 +186,7 @@ def loadQuicStream(method: str, passwd: str, obfs: str) -> dict:
} }
} }
def loadGrpcStream(service: str, multiMode: bool = False) -> dict: def loadGrpcStream(service: str, multiMode: bool = False) -> dict:
if not multiMode: if not multiMode:
return { return {
@ -214,6 +220,7 @@ def loadGrpcStream(service: str, multiMode: bool = False) -> dict:
} }
} }
def addSecureConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict: def addSecureConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
streamInfo = copy.deepcopy(rawStreamInfo) streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (tls)' streamInfo['caption'] += ' (tls)'
@ -235,6 +242,7 @@ def addSecureConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
} }
return streamInfo return streamInfo
def v2rayConfig(inboundConfig: dict) -> str: def v2rayConfig(inboundConfig: dict) -> str:
return json.dumps({ return json.dumps({
'log': { 'log': {

66
ProxyTester/VLESS.py

@ -3,24 +3,25 @@
from ProxyTester import Xray from ProxyTester import Xray
config = {} testConfig = {}
def loadVlessStream(streamInfo: dict, xtlsFlow: str or None) -> dict: def loadVlessStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
proxyInfo = { proxyInfo = {
'type': 'vless', 'type': 'vless',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'id': config['id'], 'id': testConfig['id'],
'stream': streamInfo['client'] 'stream': streamInfo['client']
} }
inboundConfig = { inboundConfig = {
'protocol': 'vless', 'protocol': 'vless',
'listen': '127.0.0.1', 'listen': testConfig['bind'],
'port': config['port'], 'port': testConfig['port'],
'settings': { 'settings': {
'clients': [ 'clients': [
{ {
'id': config['id'] 'id': testConfig['id']
} }
], ],
'decryption': 'none' 'decryption': 'none'
@ -33,77 +34,78 @@ def loadVlessStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
'caption': 'VLESS network ' + streamInfo['caption'], 'caption': 'VLESS network ' + streamInfo['caption'],
'proxy': proxyInfo, 'proxy': proxyInfo,
'server': { 'server': {
'startCommand': ['xray', '-c', config['file']], 'startCommand': ['xray', '-c', testConfig['file']],
'fileContent': Xray.xrayConfig(inboundConfig), 'fileContent': Xray.xrayConfig(inboundConfig),
'filePath': config['file'], 'filePath': testConfig['file'],
'envVar': {} 'envVar': {}
}, },
'aider': None 'aider': None
} }
def vlessTest(vlessConfig: dict) -> list:
result = [] def test(config: dict) -> list:
for key, value in vlessConfig.items(): # vlessConfig -> config global testConfig
config[key] = value testConfig = config
testList = []
# TCP stream # TCP stream
streamInfo = Xray.loadTcpStream(False, '', '') streamInfo = Xray.loadTcpStream(False, '', '')
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
for flow in Xray.xtlsFlowList: for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadTcpStream(False, '', '') streamInfo = Xray.loadTcpStream(False, '', '')
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow) xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
result.append(loadVlessStream(streamInfo, xtlsFlow)) testList.append(loadVlessStream(streamInfo, xtlsFlow))
streamInfo = Xray.loadTcpStream(True, config['host'], '/') streamInfo = Xray.loadTcpStream(True, config['host'], '/')
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
# mKCP stream # mKCP stream
for obfs in Xray.udpObfsList: for obfs in Xray.udpObfsList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs) streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
for flow in Xray.xtlsFlowList: for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs) streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow) xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
result.append(loadVlessStream(streamInfo, xtlsFlow)) testList.append(loadVlessStream(streamInfo, xtlsFlow))
# WebSocket stream # WebSocket stream
streamInfo = Xray.loadWsStream(config['host'], config['path'], False) streamInfo = Xray.loadWsStream(config['host'], config['path'], False)
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.loadWsStream(config['host'], config['path'], True) streamInfo = Xray.loadWsStream(config['host'], config['path'], True)
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
# HTTP/2 stream # HTTP/2 stream
streamInfo = Xray.loadH2Stream(config['host'], config['path']) streamInfo = Xray.loadH2Stream(config['host'], config['path'])
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
# QUIC stream # QUIC stream
for method in Xray.quicMethodList: for method in Xray.quicMethodList:
for obfs in Xray.udpObfsList: for obfs in Xray.udpObfsList:
streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs) streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
# GRPC stream # GRPC stream
streamInfo = Xray.loadGrpcStream(config['service']) streamInfo = Xray.loadGrpcStream(config['service'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.loadGrpcStream(config['service'], multiMode = True) streamInfo = Xray.loadGrpcStream(config['service'], multiMode = True)
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None)) testList.append(loadVlessStream(streamInfo, None))
return result return testList

76
ProxyTester/VMess.py

@ -3,7 +3,7 @@
from ProxyTester import V2ray from ProxyTester import V2ray
config = {} testConfig = {}
vmessMethodList = [ vmessMethodList = [
'aes-128-gcm', 'aes-128-gcm',
@ -16,12 +16,12 @@ vmessMethodList = [
def vmessBasicTest(method: str, alterId: int) -> dict: def vmessBasicTest(method: str, alterId: int) -> dict:
inboundConfig = { inboundConfig = {
'protocol': 'vmess', 'protocol': 'vmess',
'listen': '127.0.0.1', 'listen': testConfig['bind'],
'port': config['port'], 'port': testConfig['port'],
'settings': { 'settings': {
'clients': [ 'clients': [
{ {
'id': config['id'], 'id': testConfig['id'],
'alterId': alterId 'alterId': alterId
} }
] ]
@ -42,16 +42,16 @@ def vmessBasicTest(method: str, alterId: int) -> dict:
'caption': caption, 'caption': caption,
'proxy': { 'proxy': {
'type': 'vmess', 'type': 'vmess',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'method': method, 'method': method,
'id': config['id'], 'id': testConfig['id'],
'aid': alterId 'aid': alterId
}, },
'server': { 'server': {
'startCommand': ['v2ray', '-c', config['file']], 'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig), 'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': config['file'], 'filePath': testConfig['file'],
'envVar': envVar 'envVar': envVar
}, },
'aider': None 'aider': None
@ -60,19 +60,19 @@ def vmessBasicTest(method: str, alterId: int) -> dict:
def loadVmessStream(streamInfo: dict) -> dict: def loadVmessStream(streamInfo: dict) -> dict:
proxyInfo = { proxyInfo = {
'type': 'vmess', 'type': 'vmess',
'server': '127.0.0.1', 'server': testConfig['addr'],
'port': config['port'], 'port': testConfig['port'],
'id': config['id'], 'id': testConfig['id'],
'stream': streamInfo['client'] 'stream': streamInfo['client']
} }
inboundConfig = { inboundConfig = {
'protocol': 'vmess', 'protocol': 'vmess',
'listen': '127.0.0.1', 'listen': testConfig['bind'],
'port': config['port'], 'port': testConfig['port'],
'settings': { 'settings': {
'clients': [ 'clients': [
{ {
'id': config['id'] 'id': testConfig['id']
} }
] ]
}, },
@ -82,69 +82,69 @@ def loadVmessStream(streamInfo: dict) -> dict:
'caption': 'VMess network ' + streamInfo['caption'], 'caption': 'VMess network ' + streamInfo['caption'],
'proxy': proxyInfo, 'proxy': proxyInfo,
'server': { 'server': {
'startCommand': ['v2ray', '-c', config['file']], 'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig), 'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': config['file'], 'filePath': testConfig['file'],
'envVar': {} 'envVar': {}
}, },
'aider': None 'aider': None
} }
def vmessTest(vmessConfig: dict) -> list: def test(config: dict) -> list:
result = [] global testConfig
for key, value in vmessConfig.items(): # vmessConfig -> config testConfig = config
config[key] = value testList = []
# Basic test # Basic test
for method in vmessMethodList: # methods and AEAD/MD5+AES test for method in vmessMethodList: # methods and AEAD/MD5+AES test
result.append(vmessBasicTest(method, 0)) testList.append(vmessBasicTest(method, 0))
result.append(vmessBasicTest(method, 64)) testList.append(vmessBasicTest(method, 64))
# TCP stream # TCP stream
streamInfo = V2ray.loadTcpStream(False, '', '') streamInfo = V2ray.loadTcpStream(False, '', '')
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadTcpStream(True, config['host'], '/') streamInfo = V2ray.loadTcpStream(True, config['host'], '/')
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
# mKCP stream # mKCP stream
for obfs in V2ray.udpObfsList: for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadKcpStream(config['passwd'], obfs) streamInfo = V2ray.loadKcpStream(config['passwd'], obfs)
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
# WebSocket stream # WebSocket stream
streamInfo = V2ray.loadWsStream(config['host'], config['path'], False) streamInfo = V2ray.loadWsStream(config['host'], config['path'], False)
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadWsStream(config['host'], config['path'], True) streamInfo = V2ray.loadWsStream(config['host'], config['path'], True)
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
# HTTP/2 stream # HTTP/2 stream
streamInfo = V2ray.loadH2Stream(config['host'], config['path']) streamInfo = V2ray.loadH2Stream(config['host'], config['path'])
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
# QUIC stream # QUIC stream
for method in V2ray.quicMethodList: for method in V2ray.quicMethodList:
for obfs in V2ray.udpObfsList: for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadQuicStream(method, config['passwd'], obfs) streamInfo = V2ray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
# GRPC stream # GRPC stream
streamInfo = V2ray.loadGrpcStream(config['service']) streamInfo = V2ray.loadGrpcStream(config['service'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host']) streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVmessStream(streamInfo)) testList.append(loadVmessStream(streamInfo))
return result return testList

3
ProxyTester/Xray.py

@ -20,6 +20,7 @@ loadH2Stream = V2ray.loadH2Stream
loadQuicStream = V2ray.loadQuicStream loadQuicStream = V2ray.loadQuicStream
loadGrpcStream = V2ray.loadGrpcStream loadGrpcStream = V2ray.loadGrpcStream
def loadWsStream(host: str, path: str, isEd: bool) -> dict: def loadWsStream(host: str, path: str, isEd: bool) -> dict:
if not isEd: # without Early-Data if not isEd: # without Early-Data
return { return {
@ -60,6 +61,7 @@ def loadWsStream(host: str, path: str, isEd: bool) -> dict:
} }
} }
def addTlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict: def addTlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
streamInfo = copy.deepcopy(rawStreamInfo) streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (tls)' streamInfo['caption'] += ' (tls)'
@ -82,6 +84,7 @@ def addTlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
} }
return streamInfo return streamInfo
def addXtlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str, xtlsFlow: str) -> tuple[str, dict]: def addXtlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str, xtlsFlow: str) -> tuple[str, dict]:
streamInfo = copy.deepcopy(rawStreamInfo) streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (' + xtlsFlow + ')' streamInfo['caption'] += ' (' + xtlsFlow + ')'

15
ProxyTester/tester.py

@ -11,18 +11,19 @@ from ProxyTester import Brook
def test(key: str, config: dict) -> list: def test(key: str, config: dict) -> list:
if key in ['ss', 'shadowsocks']: if key in ['ss', 'shadowsocks']:
return Shadowsocks.ssTest(config) testObj = Shadowsocks
elif key in ['ssr', 'shadowsocksr']: elif key in ['ssr', 'shadowsocksr']:
return ShadowsocksR.ssrTest(config) testObj = ShadowsocksR
elif key == 'vmess': elif key == 'vmess':
return VMess.vmessTest(config) testObj = VMess
elif key == 'vless': elif key == 'vless':
return VLESS.vlessTest(config) testObj = VLESS
elif key == 'trojan': elif key == 'trojan':
return Trojan.trojanTest(config) testObj = Trojan
elif key == 'trojan-go': elif key == 'trojan-go':
return TrojanGo.trojanGoTest(config) testObj = TrojanGo
elif key == 'brook': elif key == 'brook':
return Brook.test(config) testObj = Brook
else: else:
return [] return []
return testObj.test(config)

6
Test.py

@ -11,17 +11,17 @@ import Check as Checker
import ProxyTester as Tester import ProxyTester as Tester
testConfig = { testConfig = {
'bind': '0.0.0.0',
'addr': '172.17.0.2',
'port': 12345, 'port': 12345,
'passwd': 'dnomd343', 'passwd': 'dnomd343',
'host': 'local.343.re', 'host': 'local.343.re',
'path': '/test', 'path': '/test',
'bind': '::1',
# 'bind': '127.0.0.1',
'service': 'dnomd343', 'service': 'dnomd343',
'file': '/tmp/proxycTest.json', 'file': '/tmp/proxycTest.json',
'id': '1f7aa040-94d8-4b53-ae85-af6946d550bb',
'cert': '/etc/ssl/certs/343.re/fullchain.pem', 'cert': '/etc/ssl/certs/343.re/fullchain.pem',
'key': '/etc/ssl/certs/343.re/privkey.pem', 'key': '/etc/ssl/certs/343.re/privkey.pem',
'id': '1f7aa040-94d8-4b53-ae85-af6946d550bb',
} }
def testBuild(config: dict): # load file and start process def testBuild(config: dict): # load file and start process

Loading…
Cancel
Save