You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

82 lines
2.9 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
from typing import Optional

from Tester import Xray
from Builder import VLESS
from Basis.Test import Settings
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Constant import xtlsFlows
from Basis.Functions import md5Sum, genUUID, getAvailablePort
def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: Optional[str]) -> Process:
    """Prepare (without starting) the xray server process for one VLESS test.

    :param configFile: file name of the server config; joined onto Settings['workDir'].
    :param proxyInfo: connection info; the 'server', 'port' and 'id' keys are read here.
    :param streamConfig: value placed verbatim under 'streamSettings'.
    :param xtlsFlow: flow value for the client entry, or None to leave 'flow' out entirely.
    :return: a Process built with isStart = False whose `file` argument carries the
        config path and the JSON-serialized result of Xray.loadConfig.
    """
    clientEntry = {'id': proxyInfo['id']}
    if xtlsFlow is not None:  # the 'flow' key must be absent (not null) when no flow is given
        clientEntry['flow'] = xtlsFlow

    vlessConfig = Xray.loadConfig({
        'protocol': 'vless',
        'listen': proxyInfo['server'],
        'port': proxyInfo['port'],
        'settings': {
            'clients': [clientEntry],
            'decryption': 'none'
        },
        'streamSettings': streamConfig
    })

    serverFile = os.path.join(Settings['workDir'], configFile)
    return Process(Settings['workDir'], cmd = ['xray', '-c', serverFile], file = {
        'path': serverFile,
        'content': json.dumps(vlessConfig)
    }, isStart = False)
def loadClient(configFile: str, proxyInfo: dict, socksInfo: dict) -> Process:
    """Prepare (without starting) the client process for one VLESS test.

    :param configFile: file name of the client config; joined onto Settings['workDir'].
    :param proxyInfo: connection info handed to VLESS.load.
    :param socksInfo: socks interface info handed to VLESS.load.
    :return: a Process built with isStart = False, using the command and config
        content returned by VLESS.load.
    """
    configPath = os.path.join(Settings['workDir'], configFile)
    # VLESS.load returns a 3-tuple; the third element is not needed here.
    command, content, _ = VLESS.load(proxyInfo, socksInfo, configPath)
    configSpec = {
        'path': configPath,
        'content': content
    }
    return Process(Settings['workDir'], cmd=command, file=configSpec, isStart=False)
def loadTest(stream: dict) -> dict:
    """Assemble a single VLESS test case from one stream description.

    :param stream: dict whose 'info', 'caption' and 'server' keys are read here.
    :return: dict with the keys 'caption', 'client', 'server', 'socks' and 'interface';
        'client' and 'server' hold the objects returned by loadClient / loadServer.
    """
    proxyInfo = {  # connection info shared by client and server
        'server': Settings['serverBind'],
        'port': getAvailablePort(),
        'method': 'none',
        'id': genUUID(),
        'stream': stream['info']
    }
    socksInfo = {  # socks5 interface for test
        'addr': Settings['clientBind'],
        'port': getAvailablePort()
    }

    secure = stream['info']['secure']
    if secure is not None and secure['type'] == 'xtls':  # with XTLS secure
        # Original note: XTLS on server should use xtls-rprx-direct flow,
        # so any 'splice' in the flow name is swapped for 'direct'.
        xtlsFlow = xtlsFlows[secure['flow']].replace('splice', 'direct')
    else:
        xtlsFlow = None

    # Config file names are keyed by the first 8 characters of md5Sum(caption).
    configName = 'vless_%s' % (md5Sum(stream['caption'])[:8])
    caption = 'VLESS test: %s' % stream['caption']
    client = loadClient(configName + '_client.json', proxyInfo, socksInfo)
    server = loadServer(configName + '_server.json', proxyInfo, stream['server'], xtlsFlow)

    testInfo = {  # release test info
        'caption': caption,
        'client': client,
        'server': server,
        'socks': socksInfo,  # exposed socks5 address
        'interface': {
            'addr': proxyInfo['server'],
            'port': proxyInfo['port'],
        }
    }
    logging.debug('New VLESS test -> %s' % testInfo)
    return testInfo
def load():
    """Lazily yield one test case (as built by loadTest) per entry of Xray.loadStream().

    The completion message is logged only once the consumer has exhausted the generator.
    """
    # map() is lazy, so each loadTest call still happens only when the next item is requested.
    for testInfo in map(loadTest, Xray.loadStream()):
        yield testInfo
    logging.info('VLESS test yield complete')