diff --git a/Basis/Methods.py b/Basis/Methods.py index 456eaaf..3b8a0b4 100644 --- a/Basis/Methods.py +++ b/Basis/Methods.py @@ -117,6 +117,10 @@ ssrObfuscations = [ # obfuscations of ShadowsocksR (obfs) # VMess Info vmessMethods = ['aes-128-gcm', 'chacha20-poly1305', 'auto', 'none', 'zero'] +# XTLS Info +xtlsFlows = ['xtls-origin', 'xtls-direct', 'xtls-splice'] +xtlsFlows = {x: x.replace('-', '-rprx-') for x in xtlsFlows} + # v2ray / Xray Info quicMethods = ['none', 'aes-128-gcm', 'chacha20-poly1305'] udpObfuscations = ['none', 'srtp', 'utp', 'wechat-video', 'dtls', 'wireguard'] diff --git a/Builder/Xray.py b/Builder/Xray.py index 49321c1..371c4c9 100644 --- a/Builder/Xray.py +++ b/Builder/Xray.py @@ -2,8 +2,9 @@ # -*- coding: utf-8 -*- from Builder import V2ray +from Basis.Methods import xtlsFlows -loadConfig = V2ray.loadConfig # same basic config format +loadConfig = V2ray.loadConfig def loadSecure(secureInfo: dict or None) -> dict: # TLS / XTLS encrypt config @@ -65,11 +66,6 @@ def xtlsFlow(streamInfo: dict or None) -> dict: return {} if streamInfo['secure']['type'] != 'xtls': # not XTLS secure type return {} - xtlsFlows = { - 'xtls-origin': 'xtls-rprx-origin', - 'xtls-direct': 'xtls-rprx-direct', - 'xtls-splice': 'xtls-rprx-splice', - } if streamInfo['secure']['flow'] not in xtlsFlows: raise RuntimeError('Unknown xtls flow') return { diff --git a/Tester/VLESS.py b/Tester/VLESS.py new file mode 100644 index 0000000..1972c4a --- /dev/null +++ b/Tester/VLESS.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import json +from Tester import Xray +from Builder import VLESS +from Basis.Logger import logging +from Basis.Process import Process +from Basis.Functions import md5Sum +from Basis.Functions import genUUID +from Basis.Methods import xtlsFlows +from Basis.Functions import getAvailablePort + +settings = { + 'serverBind': '127.0.0.1', + 'clientBind': '127.0.0.1', + # 'serverBind': '::1', + # 'clientBind': '::1', + 'workDir': '/tmp/ProxyC', +} + + +def loadServer(configFile: str, proxyInfo: dict, streamConfig: dict, xtlsFlow: str or None) -> Process: + vlessConfig = Xray.loadConfig({ + 'protocol': 'vless', + 'listen': proxyInfo['server'], + 'port': proxyInfo['port'], + 'settings': { + 'clients': [{**{ + 'id': proxyInfo['id'], + }, **({} if xtlsFlow is None else { + 'flow': xtlsFlow + })}], + 'decryption': 'none' + }, + 'streamSettings': streamConfig + }) + serverFile = os.path.join(settings['workDir'], configFile) + return Process(settings['workDir'], cmd = ['xray', '-c', serverFile], file = { + 'path': serverFile, + 'content': json.dumps(vlessConfig) + }, isStart = False) + + +def loadClient(configFile: str, proxyInfo: dict, socksInfo: dict) -> Process: # load client process + clientFile = os.path.join(settings['workDir'], configFile) + vlessCommand, vlessConfig, _ = VLESS.load(proxyInfo, socksInfo, clientFile) + return Process(settings['workDir'], cmd = vlessCommand, file = { + 'path': clientFile, + 'content': vlessConfig + }, isStart = False) + + +def loadTest(stream: dict) -> dict: + proxyInfo = { # connection info + 'server': settings['serverBind'], + 'port': getAvailablePort(), + 'method': 'none', + 'id': genUUID(), # random uuid v5 + 'stream': stream['info'] + } + socksInfo = { # socks5 interface for test + 'addr': settings['clientBind'], + 'port': getAvailablePort() + } + xtlsFlow = None + if stream['info']['secure'] is not None and stream['info']['secure']['type'] == 'xtls': # with XTLS secure + xtlsFlow = xtlsFlows[stream['info']['secure']['flow']] + xtlsFlow = xtlsFlow.replace('splice', 'direct') # XTLS on server should use xtls-rprx-direct flow + configName = 'vless_%s' % (md5Sum(stream['caption'])[:8]) + testInfo = { # release test info + 'title': 'VLESS test: %s' % stream['caption'], + 'client': loadClient(configName + '_client.json', proxyInfo, socksInfo), + 'server': loadServer(configName + '_server.json', proxyInfo, stream['server'], xtlsFlow), + 'socks': socksInfo, # exposed socks5 address + 'interface': { + 'addr': proxyInfo['server'], + 'port': proxyInfo['port'], + } + } + logging.debug('New vless test -> %s' % testInfo) + return testInfo + + +def load(): + streams = Xray.loadStream() # load xray-core stream list + for stream in streams: # test all stream cases + yield loadTest(stream) diff --git a/Tester/VMess.py b/Tester/VMess.py index 67cbcdc..261b764 100644 --- a/Tester/VMess.py +++ b/Tester/VMess.py @@ -60,7 +60,7 @@ def loadTest(method: str, aid: int, stream: dict) -> dict: proxyInfo = { # connection info 'server': settings['serverBind'], 'port': getAvailablePort(), - 'method': 'auto', + 'method': method, 'id': genUUID(), # random uuid v5 'aid': aid, 'stream': stream['info'] diff --git a/Tester/Xray.py b/Tester/Xray.py new file mode 100644 index 0000000..ef5b6b9 --- /dev/null +++ b/Tester/Xray.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import copy +import itertools + +from Tester import V2ray +from Basis.Methods import xtlsFlows +from Basis.Functions import genFlag +from Basis.Methods import quicMethods +from Basis.Methods import udpObfuscations + +loadConfig = V2ray.loadConfig +tcpStream = V2ray.tcpStream +kcpStream = V2ray.kcpStream +h2Stream = V2ray.h2Stream +quicStream = V2ray.quicStream +grpcStream = V2ray.grpcStream + +settings = { + 'site': 'www.bing.com', + 'host': '343.re', + 'cert': '/etc/ssl/certs/343.re/fullchain.pem', + 'key': '/etc/ssl/certs/343.re/privkey.pem', +} + + +def addSecure(streamConfig: dict, xtlsFlow: str or None = None, isUdp443: bool = False) -> dict: # add TLS or XTLS + streamConfig['caption'] += ' (with %s)' % ( + 'tls' if xtlsFlow is None else (xtlsFlow + ('-udp443' if isUdp443 else '')) + ) + streamConfig['info']['secure'] = {**{ # secure options for client + 'type': 'tls' if xtlsFlow is None else 'xtls', + 'sni': settings['host'], + 'alpn': None, + 'verify': True, + }, **({} if xtlsFlow is None else { + 'flow': xtlsFlow, + 'udp443': isUdp443, + })} + streamConfig['server']['security'] = 'tls' if xtlsFlow is None else 'xtls' + streamConfig['server']['%sSettings' % streamConfig['server']['security']] = { # cert and key for server + 'alpn': ['h2', 'http/1.1'], + 'certificates': [{ + 'certificateFile': settings['cert'], + 'keyFile': settings['key'], + }] + } + return streamConfig + + +def wsStream(isEd: bool) -> dict: + path = '/' + genFlag(length = 6) # random websocket path + return { + 'caption': 'WebSocket stream' + (' (Max-Early-Data 2048)' if isEd else ''), + 'info': { + 'type': 'ws', + 'host': settings['host'], + 'path': path, + 'ed': 2048 if isEd else None, + 'secure': None, + }, + 'server': { + 'network': 'ws', + 'wsSettings': { + 'path': path + ('?ed=2048' if isEd else ''), + 'headers': { + 'Host': settings['host'] + } + } + } + } + + +def loadStream() -> list: + streams = [] + addStream = lambda x: streams.append(copy.deepcopy(x)) + for isObfs in [False, True]: + addStream(tcpStream(isObfs)) # TCP stream + addStream(addSecure(tcpStream(isObfs))) # TCP stream with TLS + if isObfs: continue # can't use XTLS when enable obfs + for xtlsFlow, udp443 in itertools.product(xtlsFlows, [False, True]): + addStream(addSecure(tcpStream(isObfs = False), xtlsFlow, udp443)) # TCP stream with XTLS + for udpObfs in udpObfuscations: + addStream(kcpStream(udpObfs)) # mKCP stream + addStream(addSecure(kcpStream(udpObfs))) # mKCP stream with TLS + for xtlsFlow, udp443 in itertools.product(xtlsFlows, [False, True]): + addStream(addSecure(kcpStream(udpObfs), xtlsFlow, udp443)) # mKCP stream with XTLS + for isEd in [False, True]: + addStream(wsStream(isEd)) # WebSocket stream + addStream(addSecure(wsStream(isEd))) # WebSocket stream with TLS + addStream(addSecure(h2Stream())) # HTTP/2 stream with TLS + for quicMethod, quicObfs in itertools.product(quicMethods, udpObfuscations): + addStream(addSecure(quicStream(quicMethod, quicObfs))) # QUIC stream with TLS + for isMulti in [False, True]: + addStream(grpcStream(isMulti)) # gRPC stream + addStream(addSecure(grpcStream(isMulti))) # gRPC stream with TLS + return streams diff --git a/test.py b/test.py index b46d6cd..7c76673 100755 --- a/test.py +++ b/test.py @@ -6,6 +6,7 @@ import requests from threading import Thread from Tester import VMess +from Tester import VLESS from Tester import Shadowsocks from Tester import ShadowsocksR @@ -95,9 +96,11 @@ ss = Shadowsocks.load(isExtra = True) # ss = Shadowsocks.load(isExtra = False) ssr = ShadowsocksR.load() vmess = VMess.load() +vless = VLESS.load() logging.critical('test start') # runTest(ss, 64) # runTest(ssr, 64) -runTest(vmess, 64) +# runTest(vmess, 64) +runTest(vless, 64) logging.critical('test complete')