Browse Source

test: remove legacy process

master
dnomd343 2 years ago
parent
commit
6cd3bd6fe0
  1. 98
      ProxyTester/Brook.py
  2. 69
      ProxyTester/Hysteria.py
  3. 288
      ProxyTester/Plugin.py
  4. 209
      ProxyTester/Shadowsocks.py
  5. 119
      ProxyTester/ShadowsocksR.py
  6. 149
      ProxyTester/Trojan.py
  7. 169
      ProxyTester/TrojanGo.py
  8. 257
      ProxyTester/V2ray.py
  9. 111
      ProxyTester/VLESS.py
  10. 150
      ProxyTester/VMess.py
  11. 116
      ProxyTester/Xray.py
  12. 6
      ProxyTester/__init__.py
  13. 32
      ProxyTester/tester.py
  14. 93
      Test.py

98
ProxyTester/Brook.py

@ -1,98 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
testConfig = {}
def __originConfig() -> dict:
return {
'caption': 'Brook original',
'client': {
'type': 'brook',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd']
},
'server': [
'brook', 'server',
'--listen', testConfig['bind'] + ':' + str(testConfig['port']),
'--password', testConfig['passwd']
]
}
def __wsConfig() -> dict:
return {
'caption': 'Brook websocket',
'client': {
'type': 'brook',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'ws': {
'host': testConfig['host'],
'path': testConfig['path']
}
},
'server': [
'brook', 'wsserver',
'--listen', testConfig['bind'] + ':' + str(testConfig['port']),
'--password', testConfig['passwd'],
'--path', testConfig['path']
]
}
def __wssConfig() -> dict:
return {
'caption': 'Brook websocket with TLS',
'client': {
'type': 'brook',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'ws': {
'host': testConfig['host'],
'path': testConfig['path'],
'secure': {
'verify': True
}
}
},
'server': [
'brook', 'wssserver',
'--domainaddress', testConfig['bind'] + ':' + str(testConfig['port']),
'--cert', testConfig['cert'],
'--certkey', testConfig['key'],
'--password', testConfig['passwd'],
'--path', testConfig['path']
]
}
def __brookConfig(brookConfig: dict) -> dict:
return {
'caption': brookConfig['caption'],
'proxy': brookConfig['client'],
'server': {
'startCommand': brookConfig['server'],
'fileContent': None,
'filePath': None,
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = copy.deepcopy(config)
if testConfig['bind'].find(':') >= 0:
testConfig['bind'] = '[' + testConfig['bind'] + ']'
return [
__brookConfig(__originConfig()),
__brookConfig(__wsConfig()),
__brookConfig(__wssConfig()),
]

69
ProxyTester/Hysteria.py

@ -1,69 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
import json
testConfig = {}
hysteriaProtocolList = [
'udp',
'wechat-video',
'faketcp',
]
def __hysteriaConfig(protocol: str, isObfs: bool, isAuth: bool) -> dict:
caption = 'Hysteria protocol ' + protocol
proxyInfo = {
'type': 'hysteria',
'server': testConfig['addr'],
'port': testConfig['port'],
'protocol': protocol,
'sni': testConfig['host']
}
serverConfig = {
'listen': testConfig['bind'] + ':' + str(testConfig['port']),
'protocol': protocol,
'cert': testConfig['cert'],
'key': testConfig['key'],
}
if isObfs:
caption += ' (with obfs)'
proxyInfo['obfs'] = testConfig['passwd']
serverConfig['obfs'] = testConfig['passwd']
if isAuth:
caption += ' (with auth)'
proxyInfo['auth'] = testConfig['passwd']
serverConfig['auth'] = {
'mode': 'passwords',
'config': [testConfig['passwd']]
}
return {
'caption': caption,
'proxy': proxyInfo,
'server': {
'startCommand': ['hysteria', '-c', testConfig['file'], 'server'],
'fileContent': json.dumps(serverConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = copy.deepcopy(config)
if testConfig['bind'].find(':') >= 0:
testConfig['bind'] = '[' + testConfig['bind'] + ']'
testList = []
for protocol in hysteriaProtocolList:
testList.append(__hysteriaConfig(protocol, False, False))
testList.append(__hysteriaConfig(protocol, False, True))
testList.append(__hysteriaConfig(protocol, True, False))
testList.append(__hysteriaConfig(protocol, True, True))
return testList

288
ProxyTester/Plugin.py

@ -1,288 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import re
import json
gqConfig = {
'key': 'dnomd343'
}
ckConfig = {
'BypassUID': [
'Q3iw2bAbC3KZvpm58XR6+Q=='
],
'RedirAddr': 'www.bing.com',
'PrivateKey': 'SFMUZ2g7e0jqzXXhBh5/rh/Odslnyu8A3LuZqH4ySVM='
}
pluginConfig = {
'obfs-local': [
{
'caption': 'http mode',
'server': '--plugin obfs-server --plugin-opts "obfs=http"',
'client': '--plugin obfs-local --plugin-opts "obfs=http;obfs-host=www.bing.com"',
},
{
'caption': 'tls mode',
'server': '--plugin obfs-server --plugin-opts "obfs=tls"',
'client': '--plugin obfs-local --plugin-opts "obfs=tls;obfs-host=www.bing.com"',
},
{
'caption': 'http mode with URI',
'server': '--plugin obfs-server --plugin-opts "obfs=http"',
'client': '--plugin obfs-local --plugin-opts "obfs=http;obfs-host=www.bing.com;obfs-uri=/test"',
},
{
'caption': 'http mode with POST method',
'server': '--plugin obfs-server --plugin-opts "obfs=http"',
'client': '--plugin obfs-local --plugin-opts "obfs=http;http-method=POST;obfs-host=www.bing.com"',
}
],
'simple-tls': [
{
'caption': 'basic mode',
'server': '--plugin simple-tls --plugin-opts "s;n=$HOST"',
'client': '--plugin simple-tls --plugin-opts "n=$HOST;no-verify"',
},
{
'caption': 'with auth key',
'server': '--plugin simple-tls --plugin-opts "s;n=$HOST;auth=dnomd343"',
'client': '--plugin simple-tls --plugin-opts "n=$HOST;no-verify;auth=dnomd343"',
},
{
'caption': 'wss mode',
'server': '--plugin simple-tls --plugin-opts "s;n=$HOST;ws;ws-path=/test"',
'client': '--plugin simple-tls --plugin-opts "n=$HOST;no-verify;ws;ws-path=/test"',
}
],
'v2ray-plugin': [
{
'caption': 'basic mode',
'server': '--plugin v2ray-plugin --plugin-opts "server"',
'client': '--plugin v2ray-plugin',
},
{
'caption': 'basic with path',
'server': '--plugin v2ray-plugin --plugin-opts "server;path=/test"',
'client': '--plugin v2ray-plugin --plugin-opts "path=/test"',
},
{
'caption': 'basic mode with tls',
'server': '--plugin v2ray-plugin --plugin-opts "server;tls;host=$HOST;cert=$CERT;key=$KEY"',
'client': '--plugin v2ray-plugin --plugin-opts "tls;host=$HOST"',
},
{
'caption': 'quic mode',
'server': '--plugin v2ray-plugin --plugin-opts "server;mode=quic;host=$HOST;cert=$CERT;key=$KEY"',
'client': '--plugin v2ray-plugin --plugin-opts "mode=quic;host=$HOST"',
}
],
'xray-plugin': [
{
'caption': 'basic mode',
'server': '--plugin xray-plugin --plugin-opts "server"',
'client': '--plugin xray-plugin',
},
{
'caption': 'basic mode with tls',
'server': '--plugin xray-plugin --plugin-opts "server;tls;host=$HOST;cert=$CERT;key=$KEY"',
'client': '--plugin xray-plugin --plugin-opts "tls;host=$HOST"',
},
{
'caption': 'quic mode',
'server': '--plugin xray-plugin --plugin-opts "server;mode=quic;host=$HOST;cert=$CERT;key=$KEY"',
'client': '--plugin xray-plugin --plugin-opts "mode=quic;host=$HOST"',
},
{
'caption': 'grpc mode',
'server': '--plugin xray-plugin --plugin-opts "server;mode=grpc;host=$HOST;cert=$CERT;key=$KEY"',
'client': '--plugin xray-plugin --plugin-opts "mode=grpc"',
},
{
'caption': 'grpc mode with tls',
'server': '--plugin xray-plugin --plugin-opts "server;tls;mode=grpc;host=$HOST;cert=$CERT;key=$KEY"',
'client': '--plugin xray-plugin --plugin-opts "tls;mode=grpc;host=$HOST"',
}
],
'kcptun-client': [
{
'caption': 'basic mode',
'server': '--plugin kcptun-server',
'client': '--plugin kcptun-client',
},
{
'caption': 'with nocomp',
'server': '--plugin kcptun-server --plugin-opts "nocomp"',
'client': '--plugin kcptun-client --plugin-opts "nocomp"',
},
{
'caption': 'with key',
'server': '--plugin kcptun-server --plugin-opts "key=dnomd343"',
'client': '--plugin kcptun-client --plugin-opts "key=dnomd343"',
},
{
'caption': 'fast3 mode with twofish crypt',
'server': '--plugin kcptun-server --plugin-opts "crypt=twofish;mode=fast3"',
'client': '--plugin kcptun-client --plugin-opts "crypt=twofish;mode=fast3"',
}
],
'gost-plugin': [
{
'caption': 'ws mode',
'server': '--plugin gost-plugin --plugin-opts "server;mode=ws"',
'client': '--plugin gost-plugin --plugin-opts "mode=ws"',
},
{
'caption': 'mws mode',
'server': '--plugin gost-plugin --plugin-opts "server;mode=mws"',
'client': '--plugin gost-plugin --plugin-opts "mode=mws;mux=1"',
},
{
'caption': 'tls mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=tls"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=tls"',
},
{
'caption': 'xtls mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=xtls"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=xtls"',
},
{
'caption': 'mtls mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=mtls"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=mtls;mux=1"',
},
{
'caption': 'h2 mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=h2"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=h2"',
},
{
'caption': 'wss mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=wss"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=wss"',
},
{
'caption': 'mwss mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=mwss"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=mwss;mux=1"',
},
{
'caption': 'quic mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=quic"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=quic"',
},
{
'caption': 'grpc mode',
'server': '--plugin gost-plugin --plugin-opts "server;cert=$CERT;key=$KEY;mode=grpc"',
'client': '--plugin gost-plugin --plugin-opts "serverName=$HOST;mode=grpc"',
}
],
'ck-client': [
{
'caption': 'basic mode',
'server': '--plugin ck-server --plugin-opts "$CONFIG"',
'client': '--plugin ck-client --plugin-opts "UID=Q3iw2bAbC3KZvpm58XR6+Q==;'
'PublicKey=xTbqKW4Sg/xjDXDhys26ChXUQSrgxO+mBflTUeQpfWQ=;'
'ServerName=www.bing.com;BrowserSig=chrome;'
'NumConn=4;EncryptionMethod=plain;StreamTimeout=300"',
}
],
'gq-client': [
{
'caption': 'basic mode',
'server': '--plugin gq-server --plugin-opts "$CONFIG"',
'client': '--plugin gq-client --plugin-opts "ServerName=www.bing.com;'
'key=dnomd343;TicketTimeHint=300;Browser=chrome"',
}
],
'mtt-client': [
{
'caption': 'basic mode',
'server': '--plugin mtt-server --plugin-opts "cert=$CERT;key=$KEY"',
'client': '--plugin mtt-client --plugin-opts "n=$HOST"',
},
{
'caption': 'wss mode',
'server': '--plugin mtt-server --plugin-opts "wss;cert=$CERT;key=$KEY"',
'client': '--plugin mtt-client --plugin-opts "wss;n=$HOST"',
},
{
'caption': 'wss mode with path',
'server': '--plugin mtt-server --plugin-opts "wss;wss-path=/test;cert=$CERT;key=$KEY"',
'client': '--plugin mtt-client --plugin-opts "wss;wss-path=/test;n=$HOST"',
}
],
'qtun-client': [
{
'caption': 'basic mode',
'server': '--plugin qtun-server --plugin-opts "cert=$CERT;key=$KEY"',
'client': '--plugin qtun-client --plugin-opts "host=$HOST"',
}
],
'gun-plugin': [
{
'caption': 'basic mode',
'server': '--plugin gun-plugin --plugin-opts "server:cleartext"',
'client': '--plugin gun-plugin --plugin-opts "client:cleartext"',
},
{
'caption': 'basic mode with tls',
'server': '--plugin gun-plugin --plugin-opts "server:$CERT:$KEY"',
'client': '--plugin gun-plugin --plugin-opts "client:$HOST"',
}
]
}
def loadPluginConfig(plugin: str, host: str, cert: str, key: str) -> list: # without rabbit-tcp
result = []
filePath = None
fileContent = None
if plugin == 'ck-client': # Cloak
filePath = '/tmp/ck-test.json'
fileContent = json.dumps(ckConfig)
if plugin == 'gq-client': # GoQuiet
filePath = '/tmp/gq-test.json'
fileContent = json.dumps(gqConfig)
for pluginInfo in pluginConfig[plugin]:
pluginInfo['server'] = pluginInfo['server'].replace('$HOST', host).replace('$CERT', cert).replace('$KEY', key)
pluginInfo['client'] = pluginInfo['client'].replace('$HOST', host).replace('$CERT', cert).replace('$KEY', key)
if filePath is not None:
pluginInfo['server'] = pluginInfo['server'].replace('$CONFIG', filePath)
content = re.search(r'^--plugin ([\s\S]*) --plugin-opts "([\s\S]*)"$', pluginInfo['server'])
if content is None:
content = re.search(r'^--plugin ([\s\S]*)$', pluginInfo['server'])
serverOption = {
'type': content[1],
'param': ''
}
else:
serverOption = {
'type': content[1],
'param': content[2]
}
content = re.search(r'^--plugin ([\s\S]*) --plugin-opts "([\s\S]*)"$', pluginInfo['client'])
if content is None:
content = re.search(r'^--plugin ([\s\S]*)$', pluginInfo['client'])
clientOption = {
'type': content[1],
'param': ''
}
else:
clientOption = {
'type': content[1],
'param': content[2]
}
result.append({
'caption': pluginInfo['caption'],
'server': serverOption,
'client': clientOption,
'file': fileContent,
'path': filePath
})
return result

209
ProxyTester/Shadowsocks.py

@ -1,209 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
import ProxyTester.Plugin as sip003
testConfig = {}
ssMethodList = [
'aes-128-gcm',
'aes-192-gcm',
'aes-256-gcm',
'aes-128-ctr',
'aes-192-ctr',
'aes-256-ctr',
'aes-128-ocb',
'aes-192-ocb',
'aes-256-ocb',
'aes-128-ofb',
'aes-192-ofb',
'aes-256-ofb',
'aes-128-cfb',
'aes-192-cfb',
'aes-256-cfb',
'aes-128-cfb1',
'aes-192-cfb1',
'aes-256-cfb1',
'aes-128-cfb8',
'aes-192-cfb8',
'aes-256-cfb8',
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb',
'camellia-192-cfb',
'camellia-256-cfb',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
'plain',
'none',
'table',
'rc4',
'rc4-md5',
'rc2-cfb',
'bf-cfb',
'cast5-cfb',
'des-cfb',
'idea-cfb',
'seed-cfb',
'salsa20',
'salsa20-ctr',
'xchacha20',
'chacha20',
'chacha20-ietf',
'chacha20-poly1305',
'chacha20-ietf-poly1305',
'xchacha20-ietf-poly1305'
]
sip003PluginList = [ # SIP003插件列表
'obfs-local',
'simple-tls',
'v2ray-plugin',
'xray-plugin',
'kcptun-client',
'gost-plugin',
'ck-client',
'gq-client',
'mtt-client',
'rabbit-plugin',
'qtun-client',
'gun-plugin'
]
def __ssServerConfig(method: str, plugin: str or None) -> list:
rabbitPort = 20191
proxyInfo = {
'type': 'ss',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'method': method
}
caption = 'Shadowsocks method ' + method
if method in ['plain', 'none']: # plain / none -> ss-rust
serverAddr = testConfig['bind']
if not serverAddr.find(':') < 0:
serverAddr = '[' + serverAddr + ']'
serverCommand = [
'ss-rust-server',
'-s', serverAddr + ':' + str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method
]
elif method == 'salsa20-ctr': # salsa20-ctr -> ss-python-legacy
serverCommand = [
'ss-bootstrap-server', '--no-udp',
'--shadowsocks', 'ss-python-legacy-server',
'-s', testConfig['bind'],
'-p', str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method
]
else: # others -> ss-python
mbedtlsMethods = [
'aes-128-cfb128',
'aes-192-cfb128',
'aes-256-cfb128',
'camellia-128-cfb128',
'camellia-192-cfb128',
'camellia-256-cfb128',
]
if method in mbedtlsMethods:
method = 'mbedtls:' + method
serverCommand = [
'ss-bootstrap-server', '--no-udp',
'--shadowsocks', 'ss-python-server',
'-s', testConfig['bind'],
'-p', str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method
]
if method == 'idea-cfb' or method == 'seed-cfb':
serverCommand.append('--libopenssl=libcrypto.so.1.0.0')
if plugin is None: # 无插件模式
return [{
'caption': caption,
'proxy': proxyInfo,
'server': {
'startCommand': serverCommand,
'fileContent': None,
'filePath': None,
'envVar': {}
},
'aider': None
}]
if plugin == 'rabbit-plugin': # rabbit-tcp
serverAddr = testConfig['bind']
if not serverAddr.find(':') < 0: # IPv6
serverAddr = '[' + serverAddr + ']'
proxyInfo['port'] = rabbitPort
proxyInfo['plugin'] = {
'type': 'rabbit-plugin',
'param': 'serviceAddr=' + serverAddr + ':' + str(testConfig['port']) + ';password=' + testConfig['passwd']
}
return [{
'proxy': proxyInfo,
'caption': 'Shadowsocks plugin rabbit-plugin (basic mode)',
'server': {
'startCommand': serverCommand,
'fileContent': None,
'filePath': None,
'envVar': {}
},
'aider': {
'startCommand': [
'rabbit',
'-mode', 's',
'-password', testConfig['passwd'],
'-rabbit-addr', ':' + str(rabbitPort)
],
'fileContent': None,
'filePath': None,
'envVar': {}
}
}]
# others plugin
result = []
pluginConfig = sip003.loadPluginConfig( # 载入插件配置
plugin, testConfig['host'], testConfig['cert'], testConfig['key']
)
serverBaseCommand = copy.deepcopy(serverCommand)
for pluginOption in pluginConfig:
serverCommand = copy.deepcopy(serverBaseCommand)
serverCommand.append('--plugin')
serverCommand.append(pluginOption['server']['type'])
serverCommand.append('--plugin-opts')
serverCommand.append(pluginOption['server']['param'])
proxyInfo['plugin'] = pluginOption['client']
result.append(copy.deepcopy({
'proxy': proxyInfo,
'caption': 'Shadowsocks plugin ' + proxyInfo['plugin']['type'] + ' (' + pluginOption['caption'] + ')',
'server': {
'startCommand': serverCommand,
'fileContent': pluginOption['file'],
'filePath': pluginOption['path'],
'envVar': {}
},
'aider': None
}))
return result
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
for method in ssMethodList: # all Shadowsocks methods
testList += __ssServerConfig(method, None)
for plugin in sip003PluginList: # all SIP003 plugin
testList += __ssServerConfig('aes-256-ctr', plugin)
return testList

119
ProxyTester/ShadowsocksR.py

@ -1,119 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
testConfig = {}
ssrMethodList = [
"aes-128-cfb",
"aes-192-cfb",
"aes-256-cfb",
"aes-128-cfb1",
"aes-192-cfb1",
"aes-256-cfb1",
"aes-128-cfb8",
"aes-192-cfb8",
"aes-256-cfb8",
"aes-128-ctr",
"aes-192-ctr",
"aes-256-ctr",
"aes-128-gcm",
"aes-192-gcm",
"aes-256-gcm",
"aes-128-ofb",
"aes-192-ofb",
"aes-256-ofb",
"camellia-128-cfb",
"camellia-192-cfb",
"camellia-256-cfb",
"none",
"table",
"rc4",
"rc4-md5",
"rc4-md5-6",
"bf-cfb",
"cast5-cfb",
"des-cfb",
"idea-cfb",
"seed-cfb",
"rc2-cfb",
"salsa20",
"xsalsa20",
"chacha20",
"xchacha20",
"chacha20-ietf",
]
ssrProtocolList = [
"origin",
"verify_sha1",
"verify_simple",
"verify_deflate",
"auth_simple",
"auth_sha1",
"auth_sha1_v2",
"auth_sha1_v4",
"auth_aes128",
"auth_aes128_md5",
"auth_aes128_sha1",
"auth_chain_a",
"auth_chain_b",
"auth_chain_c",
"auth_chain_d",
"auth_chain_e",
"auth_chain_f",
]
ssrObfsList = [
"plain",
"http_post",
"http_simple",
"tls_simple",
"tls1.2_ticket_auth",
"tls1.2_ticket_fastauth",
"random_head",
]
def __ssrServerConfig(method: str, protocol: str, obfs: str, caption: str) -> list:
proxyInfo = {
'type': 'ssr',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'method': method,
'protocol': protocol,
'obfs': obfs
}
serverCommand = [
'ssr-server',
'-s', testConfig['bind'],
'-p', str(testConfig['port']),
'-k', testConfig['passwd'],
'-m', method,
'-O', protocol,
'-o', obfs
]
return [{
'caption': caption,
'proxy': proxyInfo,
'server': {
'startCommand': serverCommand,
'fileContent': None,
'filePath': None,
'envVar': {}
},
'aider': None
}]
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
for method in ssrMethodList: # all ShadowsocksR methods
testList += __ssrServerConfig(method, 'origin', 'plain', 'ShadowsocksR method ' + method)
for protocol in ssrProtocolList: # all ShadowsocksR protocol and obfs
for obfs in ssrObfsList:
testList += __ssrServerConfig('table', protocol, obfs, 'ShadowsocksR protocol ' + protocol + ' obfs ' + obfs)
return testList

149
ProxyTester/Trojan.py

@ -1,149 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
from ProxyTester import Xray
testConfig = {}
def trojanBasicTest() -> dict:
serverConfig = {
'run_type': 'server',
'local_addr': testConfig['bind'],
'local_port': testConfig['port'],
'password': [
testConfig['passwd']
],
'ssl': {
'cert': testConfig['cert'],
'key': testConfig['key']
}
}
return {
'caption': 'Trojan basic',
'proxy': {
'type': 'trojan',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'stream': {
'type': 'tcp',
'secure': {
'type': 'tls',
'sni': testConfig['host']
}
}
},
'server': {
'startCommand': ['trojan', '-c', testConfig['file']],
'fileContent': json.dumps(serverConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def loadTrojanStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
proxyInfo = {
'type': 'trojan',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'stream': streamInfo['client']
}
inboundConfig = {
'protocol': 'trojan',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'password': testConfig['passwd']
}
]
},
'streamSettings': streamInfo['server']
}
if xtlsFlow is not None: # add XTLS flow option
inboundConfig['settings']['clients'][0]['flow'] = xtlsFlow
return {
'caption': 'Trojan network ' + streamInfo['caption'],
'proxy': proxyInfo,
'server': {
'startCommand': ['xray', '-c', testConfig['file']],
'fileContent': Xray.xrayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = [trojanBasicTest()] # basic test
# TCP stream
streamInfo = Xray.loadTcpStream(False, '', '')
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadTcpStream(False, '', '')
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
testList.append(loadTrojanStream(streamInfo, xtlsFlow))
streamInfo = Xray.loadTcpStream(True, config['host'], '/')
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
# mKCP stream
for obfs in Xray.udpObfsList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
testList.append(loadTrojanStream(streamInfo, xtlsFlow))
# WebSocket stream
streamInfo = Xray.loadWsStream(config['host'], config['path'], False)
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.loadWsStream(config['host'], config['path'], True)
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
# HTTP/2 stream
streamInfo = Xray.loadH2Stream(config['host'], config['path'])
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
# QUIC stream
for method in Xray.quicMethodList:
for obfs in Xray.udpObfsList:
streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
# GRPC stream
streamInfo = Xray.loadGrpcStream(config['service'])
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.loadGrpcStream(config['service'], multiMode = True)
testList.append(loadTrojanStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadTrojanStream(streamInfo, None))
return testList

169
ProxyTester/TrojanGo.py

@ -1,169 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
import json
from ProxyTester import Plugin
testConfig = {}
trojanGoMethod = [
'AES-128-GCM',
'AES-256-GCM',
'CHACHA20-IETF-POLY1305'
]
sip003PluginList = [ # SIP003插件列表
'obfs-local',
'simple-tls',
'v2ray-plugin',
'xray-plugin',
'kcptun-client',
'gost-plugin',
'ck-client',
'gq-client',
'mtt-client',
'rabbit-plugin',
'qtun-client',
'gun-plugin'
]
def loadTrojanGo(isWs: bool, ssMethod: str or None) -> dict:
caption = 'Trojan-Go original'
serverConfig = {
'run_type': 'server',
'local_addr': testConfig['bind'],
'local_port': testConfig['port'],
'remote_addr': '127.0.0.1', # only for shadowsocks fallback
'remote_port': 343,
'password': [
testConfig['passwd']
],
'disable_http_check': True,
'ssl': {
'cert': testConfig['cert'],
'key': testConfig['key']
}
}
proxyInfo = {
'type': 'trojan-go',
'server': testConfig['addr'],
'port': testConfig['port'],
'passwd': testConfig['passwd'],
'sni': testConfig['host'],
}
if ssMethod is not None: # add Shadowsocks encrypt
caption += ' ' + ssMethod + ' encrypt'
serverConfig['shadowsocks'] = {
'enabled': True,
'method': ssMethod,
'password': testConfig['passwd']
}
proxyInfo['ss'] = {
'method': ssMethod,
'passwd': testConfig['passwd']
}
if isWs: # add WebSocket config
caption += ' (websocket)'
serverConfig['websocket'] = {
'enabled': True,
'host': testConfig['host'],
'path': testConfig['path']
}
proxyInfo['ws'] = {
'host': testConfig['host'],
'path': testConfig['path']
}
return {
'caption': caption,
'client': proxyInfo,
'server': serverConfig,
'file': None,
'path': None
}
def loadTrojanGoPlugin(plugin: str) -> list:
result = []
rabbitPort = 20191
trojanBaseConfig = loadTrojanGo(False, None)
if plugin == 'rabbit-plugin': # rabbit-tcp
serverAddr = testConfig['bind']
if not serverAddr.find(':') < 0: # IPv6
serverAddr = '[' + serverAddr + ']'
trojanBaseConfig['caption'] = 'Trojan-Go rabbit-plugin (basic mode)'
trojanBaseConfig['client']['port'] = rabbitPort
trojanBaseConfig['client']['plugin'] = {
'type': 'rabbit-plugin',
'param': 'serviceAddr=' + serverAddr + ':' + str(testConfig['port']) + ';password=' + testConfig['passwd']
}
trojanBaseConfig['server']['transport_plugin'] = {
'enabled': True,
'type': 'other',
'command': 'rabbit',
'arg': [
'-mode', 's',
'-password', testConfig['passwd'],
'-rabbit-addr', ':' + str(rabbitPort)
]
}
trojanBaseConfig['file'] = None
trojanBaseConfig['path'] = None
return [trojanBaseConfig]
# other plugin
pluginConfig = Plugin.loadPluginConfig(plugin, testConfig['host'], testConfig['cert'], testConfig['key']) # 载入插件配置
for pluginOption in pluginConfig:
trojanConfig = copy.deepcopy(trojanBaseConfig)
trojanConfig['caption'] = 'Trojan-Go plugin ' + plugin + ' (' + pluginOption['caption'] + ')'
trojanConfig['client']['plugin'] = pluginOption['client']
trojanConfig['server']['transport_plugin'] = {
'enabled': True,
'type': 'shadowsocks',
'command': pluginOption['server']['type'],
'option': pluginOption['server']['param']
}
trojanConfig['file'] = pluginOption['file']
trojanConfig['path'] = pluginOption['path']
result.append(trojanConfig)
return result
def loadTrojanGoConfig(trojanGoConfigList: list) -> list:
result = []
for trojanGoConfig in trojanGoConfigList:
result.append({
'caption': trojanGoConfig['caption'],
'proxy': trojanGoConfig['client'],
'server': {
'startCommand': ['trojan-go', '-config', testConfig['file']],
'fileContent': json.dumps(trojanGoConfig['server']),
'filePath': testConfig['file'],
'envVar': {'PATH': '/usr/bin'}
},
'aider': {
'startCommand': None,
'fileContent': trojanGoConfig['file'],
'filePath': trojanGoConfig['path'],
'envVar': {}
}
})
return result
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
testList += loadTrojanGoConfig([loadTrojanGo(False, None)]) # basic test
testList += loadTrojanGoConfig([loadTrojanGo(True, None)])
for ssMethod in trojanGoMethod:
testList += loadTrojanGoConfig([loadTrojanGo(False, ssMethod)]) # basic test with shadowsocks
testList += loadTrojanGoConfig([loadTrojanGo(True, ssMethod)])
for plugin in sip003PluginList: # plugin test -> cause zombie process (imperfect trojan-go)
testList += loadTrojanGoConfig(loadTrojanGoPlugin(plugin))
return testList

257
ProxyTester/V2ray.py

@ -1,257 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
import json
httpHeader = {
'version': '1.1',
'status': '200',
'reason': 'OK',
'headers': {
'Content-Type': [
'application/octet-stream',
'video/mpeg'
],
'Transfer-Encoding': [
'chunked'
],
'Connection': [
'keep-alive'
],
'Pragma': 'no-cache'
}
}
kcpSetting = {
'mtu': 1350,
'tti': 20,
'uplinkCapacity': 5,
'downlinkCapacity': 20,
'congestion': False,
'readBufferSize': 1,
'writeBufferSize': 1,
}
udpObfsList = [
'none',
'srtp',
'utp',
'wechat-video',
'dtls',
'wireguard'
]
quicMethodList = [
'none',
'aes-128-gcm',
'chacha20-poly1305',
]
def loadTcpStream(isObfs: bool, host: str, path: str) -> dict:
streamConfig = {
'network': 'tcp',
'tcpSettings': {
'type': 'none'
}
}
if not isObfs: # 不带http混淆
return {
'caption': 'TCP',
'client': {
'type': 'tcp',
'secure': None
},
'server': streamConfig
}
streamConfig['tcpSettings'] = { # http混淆配置
'header': {
'type': 'http',
'response': httpHeader
}
}
return {
'caption': 'TCP with http obfs',
'client': {
'type': 'tcp',
'obfs': {
'host': host,
'path': path
},
'secure': None
},
'server': streamConfig
}
def loadKcpStream(seed: str, obfs: str) -> dict:
kcpSetting['header'] = {
'type': obfs
}
kcpSetting['seed'] = seed
return {
'caption': 'mKCP obfs ' + obfs,
'client': {
'type': 'kcp',
'seed': seed,
'obfs': obfs,
'secure': None
},
'server': {
'network': 'kcp',
'kcpSettings': kcpSetting
}
}
def loadWsStream(host: str, path: str, isEd: bool) -> dict:
wsSetting = {
'path': path,
'headers': {
'Host': host
}
}
if not isEd: # 不带Early Data
return {
'caption': 'WebSocket',
'client': {
'type': 'ws',
'host': host,
'path': path,
'secure': None
},
'server': {
'network': 'ws',
'wsSettings': wsSetting
}
}
wsSetting['maxEarlyData'] = 2048
wsSetting['earlyDataHeaderName'] = 'Sec-WebSocket-Protocol'
return {
'caption': 'WebSocket Max-Early-Data 2048',
'client': {
'type': 'ws',
'host': host,
'path': path,
'ed': 2048,
'secure': None
},
'server': {
'network': 'ws',
'wsSettings': wsSetting
}
}
def loadH2Stream(host: str, path: str) -> dict:
return {
'caption': 'HTTP/2',
'client': {
'type': 'h2',
'host': host,
'path': path,
'secure': None
},
'server': {
'network': 'http',
'httpSettings': {
'host': [host],
'path': path
}
}
}
def loadQuicStream(method: str, passwd: str, obfs: str) -> dict:
return {
'caption': 'QUIC method ' + method + ' obfs ' + obfs,
'client': {
'type': 'quic',
'method': method,
'passwd': passwd,
'obfs': obfs,
'secure': None
},
'server': {
'network': 'quic',
'quicSettings': {
'security': method,
'key': passwd,
'header': {
'type': obfs
}
}
}
}
def loadGrpcStream(service: str, multiMode: bool = False) -> dict:
if not multiMode:
return {
'caption': 'gRPC',
'client': {
'type': 'grpc',
'service': service,
'secure': None
},
'server': {
'network': 'grpc',
'grpcSettings': {
'serviceName': service
}
}
}
return {
'caption': 'gRPC multi-mode',
'client': {
'type': 'grpc',
'service': service,
'mode': 'multi',
'secure': None
},
'server': {
'network': 'grpc',
'grpcSettings': {
'serviceName': service,
'multiMode': True # gRPC multi-mode not work in v2fly-core
}
}
}
def addSecureConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (tls)'
streamInfo['client']['secure'] = {
'sni': sni
}
streamInfo['server']['security'] = 'tls'
streamInfo['server']['tlsSettings'] = {
'alpn': [
'h2',
'http/1.1'
],
'certificates': [
{
'certificateFile': cert,
'keyFile': key
}
]
}
return streamInfo
def v2rayConfig(inboundConfig: dict) -> str:
return json.dumps({
'log': {
'loglevel': 'warning'
},
'inbounds': [inboundConfig],
'outbounds': [
{
'protocol': 'freedom'
}
]
})

111
ProxyTester/VLESS.py

@ -1,111 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester import Xray
testConfig = {}
def loadVlessStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
proxyInfo = {
'type': 'vless',
'server': testConfig['addr'],
'port': testConfig['port'],
'id': testConfig['id'],
'stream': streamInfo['client']
}
inboundConfig = {
'protocol': 'vless',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id']
}
],
'decryption': 'none'
},
'streamSettings': streamInfo['server']
}
if xtlsFlow is not None: # add XTLS flow option
inboundConfig['settings']['clients'][0]['flow'] = xtlsFlow
return {
'caption': 'VLESS network ' + streamInfo['caption'],
'proxy': proxyInfo,
'server': {
'startCommand': ['xray', '-c', testConfig['file']],
'fileContent': Xray.xrayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
# TCP stream
streamInfo = Xray.loadTcpStream(False, '', '')
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadTcpStream(False, '', '')
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
testList.append(loadVlessStream(streamInfo, xtlsFlow))
streamInfo = Xray.loadTcpStream(True, config['host'], '/')
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
# mKCP stream
for obfs in Xray.udpObfsList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
testList.append(loadVlessStream(streamInfo, xtlsFlow))
# WebSocket stream
streamInfo = Xray.loadWsStream(config['host'], config['path'], False)
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.loadWsStream(config['host'], config['path'], True)
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
# HTTP/2 stream
streamInfo = Xray.loadH2Stream(config['host'], config['path'])
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
# QUIC stream
for method in Xray.quicMethodList:
for obfs in Xray.udpObfsList:
streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
# GRPC stream
streamInfo = Xray.loadGrpcStream(config['service'])
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.loadGrpcStream(config['service'], multiMode = True)
testList.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVlessStream(streamInfo, None))
return testList

150
ProxyTester/VMess.py

@ -1,150 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester import V2ray
testConfig = {}
vmessMethodList = [
'aes-128-gcm',
'chacha20-poly1305',
'auto',
'none',
'zero',
]
def vmessBasicTest(method: str, alterId: int) -> dict:
inboundConfig = {
'protocol': 'vmess',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id'],
'alterId': alterId
}
]
}
}
caption = 'VMess method ' + method
if alterId == 0:
envVar = {}
caption += ' (AEAD)'
else:
envVar = {
'v2ray.vmess.aead.forced': 'false'
}
caption += ' (alterId ' + str(alterId) + ')'
return {
'caption': caption,
'proxy': {
'type': 'vmess',
'server': testConfig['addr'],
'port': testConfig['port'],
'method': method,
'id': testConfig['id'],
'aid': alterId
},
'server': {
'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': envVar
},
'aider': None
}
def loadVmessStream(streamInfo: dict) -> dict:
proxyInfo = {
'type': 'vmess',
'server': testConfig['addr'],
'port': testConfig['port'],
'id': testConfig['id'],
'stream': streamInfo['client']
}
inboundConfig = {
'protocol': 'vmess',
'listen': testConfig['bind'],
'port': testConfig['port'],
'settings': {
'clients': [
{
'id': testConfig['id']
}
]
},
'streamSettings': streamInfo['server']
}
return {
'caption': 'VMess network ' + streamInfo['caption'],
'proxy': proxyInfo,
'server': {
'startCommand': ['v2ray', '-c', testConfig['file']],
'fileContent': V2ray.v2rayConfig(inboundConfig),
'filePath': testConfig['file'],
'envVar': {}
},
'aider': None
}
def test(config: dict) -> list:
global testConfig
testConfig = config
testList = []
# Basic test
for method in vmessMethodList: # methods and AEAD/MD5+AES test
testList.append(vmessBasicTest(method, 0))
testList.append(vmessBasicTest(method, 64))
# TCP stream
streamInfo = V2ray.loadTcpStream(False, '', '')
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadTcpStream(True, config['host'], '/')
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# mKCP stream
for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadKcpStream(config['passwd'], obfs)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# WebSocket stream
streamInfo = V2ray.loadWsStream(config['host'], config['path'], False)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.loadWsStream(config['host'], config['path'], True)
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# HTTP/2 stream
streamInfo = V2ray.loadH2Stream(config['host'], config['path'])
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# QUIC stream
for method in V2ray.quicMethodList:
for obfs in V2ray.udpObfsList:
streamInfo = V2ray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
# GRPC stream
streamInfo = V2ray.loadGrpcStream(config['service'])
testList.append(loadVmessStream(streamInfo))
streamInfo = V2ray.addSecureConfig(streamInfo, config['cert'], config['key'], config['host'])
testList.append(loadVmessStream(streamInfo))
return testList

116
ProxyTester/Xray.py

@ -1,116 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
from ProxyTester import V2ray
xtlsFlowList = [
'xtls-origin',
'xtls-direct',
'xtls-splice',
]
udpObfsList = V2ray.udpObfsList
quicMethodList = V2ray.quicMethodList
xrayConfig = V2ray.v2rayConfig
loadTcpStream = V2ray.loadTcpStream
loadKcpStream = V2ray.loadKcpStream
loadH2Stream = V2ray.loadH2Stream
loadQuicStream = V2ray.loadQuicStream
loadGrpcStream = V2ray.loadGrpcStream
def loadWsStream(host: str, path: str, isEd: bool) -> dict:
if not isEd: # without Early-Data
return {
'caption': 'WebSocket',
'client': {
'type': 'ws',
'host': host,
'path': path,
'secure': None
},
'server': {
'network': 'ws',
'wsSettings': {
'path': path,
'headers': {
'Host': host
}
}
}
}
return {
'caption': 'WebSocket Max-Early-Data 2048',
'client': {
'type': 'ws',
'host': host,
'path': path,
'ed': 2048,
'secure': None
},
'server': {
'network': 'ws',
'wsSettings': {
'path': path + '?ed=2048',
'headers': {
'Host': host
}
}
}
}
def addTlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (tls)'
streamInfo['client']['secure'] = {
'type': 'tls',
'sni': sni
}
streamInfo['server']['security'] = 'tls'
streamInfo['server']['tlsSettings'] = {
'alpn': [
'h2',
'http/1.1'
],
'certificates': [
{
'certificateFile': cert,
'keyFile': key
}
]
}
return streamInfo
def addXtlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str, xtlsFlow: str) -> tuple[str, dict]:
streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (' + xtlsFlow + ')'
streamInfo['client']['secure'] = {
'type': 'xtls',
'sni': sni,
'flow': xtlsFlow
}
streamInfo['server']['security'] = 'xtls'
streamInfo['server']['xtlsSettings'] = {
'alpn': [
'h2',
'http/1.1'
],
'certificates': [
{
'certificateFile': cert,
'keyFile': key
}
]
}
if xtlsFlow == 'xtls-origin':
return 'xtls-rprx-origin', streamInfo
elif xtlsFlow == 'xtls-direct':
return 'xtls-rprx-direct', streamInfo
elif xtlsFlow == 'xtls-splice':
return 'xtls-rprx-direct', streamInfo
else:
raise Exception('Unknown XTLS flow')

6
ProxyTester/__init__.py

@ -1,6 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester.tester import *
__all__ = ['test']

32
ProxyTester/tester.py

@ -1,32 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester import Shadowsocks
from ProxyTester import ShadowsocksR
from ProxyTester import VMess
from ProxyTester import VLESS
from ProxyTester import Trojan
from ProxyTester import TrojanGo
from ProxyTester import Brook
from ProxyTester import Hysteria
def test(key: str, config: dict) -> list:
if key in ['ss', 'shadowsocks']:
testObj = Shadowsocks
elif key in ['ssr', 'shadowsocksr']:
testObj = ShadowsocksR
elif key == 'vmess':
testObj = VMess
elif key == 'vless':
testObj = VLESS
elif key == 'trojan':
testObj = Trojan
elif key == 'trojan-go':
testObj = TrojanGo
elif key == 'brook':
testObj = Brook
elif key == 'hysteria':
testObj = Hysteria
else:
return []
return testObj.test(config)

93
Test.py

@ -1,93 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import os
import sys
import time
import signal
import subprocess
import Check as Checker
import ProxyTester as Tester
testConfig = {
'bind': '::1',
'addr': '::1',
'port': 12345,
'passwd': 'dnomd343',
'host': 'local.343.re',
'path': '/test',
'service': 'dnomd343',
'file': '/tmp/proxycTest.json',
'id': '1f7aa040-94d8-4b53-ae85-af6946d550bb',
'cert': '/etc/ssl/certs/343.re/fullchain.pem',
'key': '/etc/ssl/certs/343.re/privkey.pem',
}
def testBuild(config: dict): # load file and start process
if config['filePath'] is not None:
with open(config['filePath'], 'w') as fileObject: # save file
fileObject.write(config['fileContent'])
if config['startCommand'] is None:
return None
return subprocess.Popen( # start process
config['startCommand'],
env = config['envVar'],
stdout = subprocess.DEVNULL,
stderr = subprocess.DEVNULL,
preexec_fn = os.setpgrp # new process group
)
def testDestroy(config: dict, process): # remove file and kill process
if process is not None and process.poll() is None: # still alive
os.killpg(os.getpgid(process.pid), signal.SIGTERM) # kill process group
while process.poll() is None: # wait for exit
process.terminate() # SIGTERM
time.sleep(0.2)
if config['filePath'] is not None:
os.remove(config['filePath']) # remove file
def testObject(option: dict) -> None: # test target object
aiderProcess = None
serverProcess = testBuild(option['server']) # start server process
if option['aider'] is not None:
aiderProcess = testBuild(option['aider']) # start aider process
checkResult = Checker.proxyTest({ # http check
'check': ['http'],
'info': option['proxy']
}, startDelay = 0.5)
print(option['caption'], end=' -> ')
if not checkResult['success']: # client build error
print('\n--------------------------------------------------------------------------------------------------------------------------------')
print(option)
print('--------------------------------------------------------------------------------------------------------------------------------\n')
testDestroy(option['server'], serverProcess) # destroy and exit
if option['aider'] is not None:
testDestroy(option['aider'], aiderProcess)
raise Exception('check error')
delay = checkResult['check']['http']['delay'] # get http delay
if delay > 0:
print(format(delay, '.2f') + 'ms')
else:
print('ERROR')
testDestroy(option['server'], serverProcess) # destroy server process
if option['aider'] is not None:
testDestroy(option['aider'], aiderProcess) # destroy aider process
if len(sys.argv) == 1: # no param
print('Unknown test type')
sys.exit(1)
testList = Tester.test(sys.argv[1].lower().strip(), testConfig) # get test list
if len(sys.argv) == 2: # test all
for i in range(0, len(testList)):
print(str(i), end = ': ')
testObject(testList[i])
sys.exit(0)
if len(sys.argv) == 3: # test target index
testObject(testList[int(sys.argv[2])])
else:
print('Too many options')
Loading…
Cancel
Save