Browse Source

feat: VLESS test function

master
dnomd343 2 years ago
parent
commit
fe8ca73065
  1. 4
      Basis/Methods.py
  2. 8
      Builder/Xray.py
  3. 89
      Tester/VLESS.py
  4. 2
      Tester/VMess.py
  5. 98
      Tester/Xray.py
  6. 5
      test.py

4
Basis/Methods.py

@ -117,6 +117,10 @@ ssrObfuscations = [ # obfuscations of ShadowsocksR (obfs)
# VMess Info
vmessMethods = ['aes-128-gcm', 'chacha20-poly1305', 'auto', 'none', 'zero']
# XTLS Info
xtlsFlows = ['xtls-origin', 'xtls-direct', 'xtls-splice']
xtlsFlows = {x: x.replace('-', '-rprx-') for x in xtlsFlows}
# v2ray / Xray Info
quicMethods = ['none', 'aes-128-gcm', 'chacha20-poly1305']
udpObfuscations = ['none', 'srtp', 'utp', 'wechat-video', 'dtls', 'wireguard']

8
Builder/Xray.py

@ -2,8 +2,9 @@
# -*- coding: utf-8 -*-
from Builder import V2ray
from Basis.Methods import xtlsFlows
loadConfig = V2ray.loadConfig # same basic config format
loadConfig = V2ray.loadConfig
def loadSecure(secureInfo: dict or None) -> dict: # TLS / XTLS encrypt config
@ -65,11 +66,6 @@ def xtlsFlow(streamInfo: dict or None) -> dict:
return {}
if streamInfo['secure']['type'] != 'xtls': # not XTLS secure type
return {}
xtlsFlows = {
'xtls-origin': 'xtls-rprx-origin',
'xtls-direct': 'xtls-rprx-direct',
'xtls-splice': 'xtls-rprx-splice',
}
if streamInfo['secure']['flow'] not in xtlsFlows:
raise RuntimeError('Unknown xtls flow')
return {

89
Tester/VLESS.py

@ -0,0 +1,89 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
from Tester import Xray
from Builder import VLESS
from Basis.Logger import logging
from Basis.Process import Process
from Basis.Functions import md5Sum
from Basis.Functions import genUUID
from Basis.Methods import xtlsFlows
from Basis.Functions import getAvailablePort
settings = {
'serverBind': '127.0.0.1',
'clientBind': '127.0.0.1',
# 'serverBind': '::1',
# 'clientBind': '::1',
'workDir': '/tmp/ProxyC',
}
def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: str or None) -> Process:
vlessConfig = Xray.loadConfig({
'protocol': 'vless',
'listen': proxyInfo['server'],
'port': proxyInfo['port'],
'settings': {
'clients': [{**{
'id': proxyInfo['id'],
}, **({} if xtlsFlow is None else {
'flow': xtlsFlow
})}],
'decryption': 'none'
},
'streamSettings': streamConfig
})
serverFile = os.path.join(settings['workDir'], configFile)
return Process(settings['workDir'], cmd = ['xray', '-c', serverFile], file = {
'path': serverFile,
'content': json.dumps(vlessConfig)
}, isStart = False)
def loadClient(configFile: str, proxyInfo: dict, socksInfo: dict) -> Process: # load client process
clientFile = os.path.join(settings['workDir'], configFile)
vlessCommand, vlessConfig, _ = VLESS.load(proxyInfo, socksInfo, clientFile)
return Process(settings['workDir'], cmd = vlessCommand, file = {
'path': clientFile,
'content': vlessConfig
}, isStart = False)
def loadTest(stream: dict) -> dict:
proxyInfo = { # connection info
'server': settings['serverBind'],
'port': getAvailablePort(),
'method': 'none',
'id': genUUID(), # random uuid v5
'stream': stream['info']
}
socksInfo = { # socks5 interface for test
'addr': settings['clientBind'],
'port': getAvailablePort()
}
xtlsFlow = None
if stream['info']['secure'] is not None and stream['info']['secure']['type'] == 'xtls': # with XTLS secure
xtlsFlow = xtlsFlows[stream['info']['secure']['flow']]
xtlsFlow = xtlsFlow.replace('splice', 'direct') # XTLS on server should use xtls-rprx-direct flow
configName = 'vless_%s' % (md5Sum(stream['caption'])[:8])
testInfo = { # release test info
'title': 'VLESS test: %s' % stream['caption'],
'client': loadClient(configName + '_client.json', proxyInfo, socksInfo),
'server': loadServer(configName + '_server.json', proxyInfo, stream['server'], xtlsFlow),
'socks': socksInfo, # exposed socks5 address
'interface': {
'addr': proxyInfo['server'],
'port': proxyInfo['port'],
}
}
logging.debug('New vless test -> %s' % testInfo)
return testInfo
def load():
streams = Xray.loadStream() # load xray-core stream list
for stream in streams: # test all stream cases
yield loadTest(stream)

2
Tester/VMess.py

@ -60,7 +60,7 @@ def loadTest(method: str, aid: int, stream: dict) -> dict:
proxyInfo = { # connection info
'server': settings['serverBind'],
'port': getAvailablePort(),
'method': 'auto',
'method': method,
'id': genUUID(), # random uuid v5
'aid': aid,
'stream': stream['info']

98
Tester/Xray.py

@ -0,0 +1,98 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import copy
import itertools
from Tester import V2ray
from Basis.Methods import xtlsFlows
from Basis.Functions import genFlag
from Basis.Methods import quicMethods
from Basis.Methods import udpObfuscations
loadConfig = V2ray.loadConfig
tcpStream = V2ray.tcpStream
kcpStream = V2ray.kcpStream
h2Stream = V2ray.h2Stream
quicStream = V2ray.quicStream
grpcStream = V2ray.grpcStream
settings = {
'site': 'www.bing.com',
'host': '343.re',
'cert': '/etc/ssl/certs/343.re/fullchain.pem',
'key': '/etc/ssl/certs/343.re/privkey.pem',
}
def addSecure(streamConfig: dict, xtlsFlow: str or None = None, isUdp443: bool = False) -> dict: # add TLS or XTLS
streamConfig['caption'] += ' (with %s)' % (
'tls' if xtlsFlow is None else (xtlsFlow + ('-udp443' if isUdp443 else ''))
)
streamConfig['info']['secure'] = {**{ # secure options for client
'type': 'tls' if xtlsFlow is None else 'xtls',
'sni': settings['host'],
'alpn': None,
'verify': True,
}, **({} if xtlsFlow is None else {
'flow': xtlsFlow,
'udp443': isUdp443,
})}
streamConfig['server']['security'] = 'tls' if xtlsFlow is None else 'xtls'
streamConfig['server']['%sSettings' % streamConfig['server']['security']] = { # cert and key for server
'alpn': ['h2', 'http/1.1'],
'certificates': [{
'certificateFile': settings['cert'],
'keyFile': settings['key'],
}]
}
return streamConfig
def wsStream(isEd: bool) -> dict:
path = '/' + genFlag(length = 6) # random websocket path
return {
'caption': 'WebSocket stream' + (' (Max-Early-Data 2048)' if isEd else ''),
'info': {
'type': 'ws',
'host': settings['host'],
'path': path,
'ed': 2048 if isEd else None,
'secure': None,
},
'server': {
'network': 'ws',
'wsSettings': {
'path': path + ('?ed=2048' if isEd else ''),
'headers': {
'Host': settings['host']
}
}
}
}
def loadStream() -> list:
streams = []
addStream = lambda x: streams.append(copy.deepcopy(x))
for isObfs in [False, True]:
addStream(tcpStream(isObfs)) # TCP stream
addStream(addSecure(tcpStream(isObfs))) # TCP stream with TLS
if isObfs: continue # can't use XTLS when enable obfs
for xtlsFlow, udp443 in itertools.product(xtlsFlows, [False, True]):
addStream(addSecure(tcpStream(isObfs = False), xtlsFlow, udp443)) # TCP stream with XTLS
for udpObfs in udpObfuscations:
addStream(kcpStream(udpObfs)) # mKCP stream
addStream(addSecure(kcpStream(udpObfs))) # mKCP stream with TLS
for xtlsFlow, udp443 in itertools.product(xtlsFlows, [False, True]):
addStream(addSecure(kcpStream(udpObfs), xtlsFlow, udp443)) # mKCP stream with XTLS
for isEd in [False, True]:
addStream(wsStream(isEd)) # WebSocket stream
addStream(addSecure(wsStream(isEd))) # WebSocket stream with TLS
addStream(addSecure(h2Stream())) # HTTP/2 stream with TLS
for quicMethod, quicObfs in itertools.product(quicMethods, udpObfuscations):
addStream(addSecure(quicStream(quicMethod, quicObfs))) # QUIC stream with TLS
for isMulti in [False, True]:
addStream(grpcStream(isMulti)) # gRPC stream
addStream(addSecure(grpcStream(isMulti))) # gRPC stream with TLS
return streams

5
test.py

@ -6,6 +6,7 @@ import requests
from threading import Thread
from Tester import VMess
from Tester import VLESS
from Tester import Shadowsocks
from Tester import ShadowsocksR
@ -95,9 +96,11 @@ ss = Shadowsocks.load(isExtra = True)
# ss = Shadowsocks.load(isExtra = False)
ssr = ShadowsocksR.load()
vmess = VMess.load()
vless = VLESS.load()
logging.critical('test start')
# runTest(ss, 64)
# runTest(ssr, 64)
runTest(vmess, 64)
# runTest(vmess, 64)
runTest(vless, 64)
logging.critical('test complete')

Loading…
Cancel
Save