Browse Source

feat: Trojan test function

master
dnomd343 2 years ago
parent
commit
7b97680ec4
  1. 132
      Tester/Trojan.py
  2. 2
      Tester/VLESS.py
  3. 5
      test.py

132
Tester/Trojan.py

@ -0,0 +1,132 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
from Tester import Xray
from Builder import Trojan
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import md5Sum
from Basis.Methods import xtlsFlows
from Basis.Functions import genFlag
from Basis.Functions import getAvailablePort
settings = {
'serverBind': '127.0.0.1',
'clientBind': '127.0.0.1',
# 'serverBind': '::1',
# 'clientBind': '::1',
'workDir': '/tmp/ProxyC',
'host': '343.re',
'cert': '/etc/ssl/certs/343.re/fullchain.pem',
'key': '/etc/ssl/certs/343.re/privkey.pem',
}
def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: str or None) -> Process:
trojanConfig = Xray.loadConfig({
'protocol': 'trojan',
'listen': proxyInfo['server'],
'port': proxyInfo['port'],
'settings': {
'clients': [{**{
'password': proxyInfo['passwd'],
}, **({} if xtlsFlow is None else {
'flow': xtlsFlow
})}],
},
'streamSettings': streamConfig
})
serverFile = os.path.join(settings['workDir'], configFile)
return Process(settings['workDir'], cmd = ['xray', '-c', serverFile], file = {
'path': serverFile,
'content': json.dumps(trojanConfig)
}, isStart = False)
def loadClient(configFile: str, proxyInfo: dict, socksInfo: dict) -> Process: # load client process
clientFile = os.path.join(settings['workDir'], configFile)
trojanCommand, trojanConfig, _ = Trojan.load(proxyInfo, socksInfo, clientFile)
return Process(settings['workDir'], cmd = trojanCommand, file = {
'path': clientFile,
'content': trojanConfig
}, isStart = False)
def loadBasicTest(tcpTlsStream: dict) -> dict:
proxyInfo = { # connection info
'server': settings['serverBind'],
'port': getAvailablePort(),
'passwd': genFlag(length = 8), # random password
'stream': tcpTlsStream['info']
}
socksInfo = { # socks5 interface for test
'addr': settings['clientBind'],
'port': getAvailablePort()
}
trojanConfig = {
'run_type': 'server',
'local_addr': proxyInfo['server'],
'local_port': proxyInfo['port'],
'password': [proxyInfo['passwd']],
'log_level': 0, # 0 -> ALL / 1 -> INFO / 2 -> WARN / 3 -> ERROR / 4 -> FATAL / 5 -> OFF
'ssl': {
'cert': settings['cert'],
'key': settings['key']
}
}
serverFile = os.path.join(settings['workDir'], 'trojan_basic_server.json')
trojanServer = Process(settings['workDir'], cmd = ['trojan', '-c', serverFile], file = {
'path': serverFile,
'content': json.dumps(trojanConfig)
}, isStart = False)
testInfo = { # release test info
'title': 'Trojan test: basic connection',
'client': loadClient('trojan_basic_client.json', proxyInfo, socksInfo),
'server': trojanServer,
'socks': socksInfo, # exposed socks5 address
'interface': {
'addr': proxyInfo['server'],
'port': proxyInfo['port'],
}
}
logging.debug('New trojan test -> %s' % testInfo)
return testInfo
def loadTest(stream: dict) -> dict:
proxyInfo = { # connection info
'server': settings['serverBind'],
'port': getAvailablePort(),
'passwd': genFlag(length = 8), # random password
'stream': stream['info']
}
socksInfo = { # socks5 interface for test
'addr': settings['clientBind'],
'port': getAvailablePort()
}
xtlsFlow = None
if stream['info']['secure'] is not None and stream['info']['secure']['type'] == 'xtls': # with XTLS secure
xtlsFlow = xtlsFlows[stream['info']['secure']['flow']]
xtlsFlow = xtlsFlow.replace('splice', 'direct') # XTLS on server should use xtls-rprx-direct flow
configName = 'trojan_%s' % (md5Sum(stream['caption'])[:8])
testInfo = { # release test info
'title': 'Trojan test: %s' % stream['caption'],
'client': loadClient(configName + '_client.json', proxyInfo, socksInfo),
'server': loadServer(configName + '_server.json', proxyInfo, stream['server'], xtlsFlow),
'socks': socksInfo, # exposed socks5 address
'interface': {
'addr': proxyInfo['server'],
'port': proxyInfo['port'],
}
}
logging.debug('New trojan test -> %s' % testInfo)
return testInfo
def load():
streams = Xray.loadStream() # load xray-core stream list
yield loadBasicTest(streams[1]) # Trojan basic test -> TCP stream with TLS
for stream in streams: # test all stream cases
yield loadTest(stream)

2
Tester/VLESS.py

@ -8,8 +8,8 @@ from Builder import VLESS
from Basis.Logger import logging from Basis.Logger import logging
from Basis.Process import Process from Basis.Process import Process
from Basis.Functions import md5Sum from Basis.Functions import md5Sum
from Basis.Functions import genUUID
from Basis.Methods import xtlsFlows from Basis.Methods import xtlsFlows
from Basis.Functions import genUUID
from Basis.Functions import getAvailablePort from Basis.Functions import getAvailablePort
settings = { settings = {

5
test.py

@ -7,6 +7,7 @@ from threading import Thread
from Tester import VMess from Tester import VMess
from Tester import VLESS from Tester import VLESS
from Tester import Trojan
from Tester import Shadowsocks from Tester import Shadowsocks
from Tester import ShadowsocksR from Tester import ShadowsocksR
@ -97,10 +98,12 @@ ss = Shadowsocks.load(isExtra = True)
ssr = ShadowsocksR.load() ssr = ShadowsocksR.load()
vmess = VMess.load() vmess = VMess.load()
vless = VLESS.load() vless = VLESS.load()
trojan = Trojan.load()
logging.critical('test start') logging.critical('test start')
# runTest(ss, 64) # runTest(ss, 64)
# runTest(ssr, 64) # runTest(ssr, 64)
# runTest(vmess, 64) # runTest(vmess, 64)
runTest(vless, 64) # runTest(vless, 64)
runTest(trojan, 64)
logging.critical('test complete') logging.critical('test complete')

Loading…
Cancel
Save