Browse Source

feat: test of VLESS

master
Dnomd343 3 years ago
parent
commit
3f4bd8e840
  1. 1
      ProxyTester/V2ray.py
  2. 104
      ProxyTester/VLESS.py
  3. 4
      ProxyTester/VMess.py
  4. 112
      ProxyTester/Xray.py
  5. 7
      ProxyTester/tester.py
  6. 2
      Test.py
  7. 100
      demo.py

1
ProxyTester/V2ray.py

@ -1,5 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
import copy import copy
import json import json

104
ProxyTester/VLESS.py

@ -0,0 +1,104 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
from ProxyTester import Xray
config = {}
def loadVlessStream(streamInfo: dict, xtlsFlow: str or None) -> dict:
proxyInfo = {
'type': 'vless',
'server': '127.0.0.1',
'port': config['port'],
'id': config['id'],
'stream': streamInfo['client']
}
inboundConfig = {
'protocol': 'vless',
'listen': '127.0.0.1',
'port': config['port'],
'settings': {
'clients': [
{
'id': config['id']
}
],
'decryption': 'none'
},
'streamSettings': streamInfo['server']
}
if xtlsFlow is not None: # add XTLS flow option
inboundConfig['settings']['clients'][0]['flow'] = xtlsFlow
return {
'caption': 'VLESS network ' + streamInfo['caption'],
'proxy': proxyInfo,
'server': {
'startCommand': ['xray', '-c', config['file']],
'fileContent': Xray.xrayConfig(inboundConfig),
'filePath': config['file'],
'envVar': {}
},
'aider': None
}
def vlessTest(vlessConfig: dict) -> list:
result = []
for key, value in vlessConfig.items(): # vlessConfig -> config
config[key] = value
# TCP stream
streamInfo = Xray.loadTcpStream(False, '', '')
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadTcpStream(False, '', '')
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
result.append(loadVlessStream(streamInfo, xtlsFlow))
streamInfo = Xray.loadTcpStream(True, config['host'], '/')
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
# mKCP stream
for obfs in Xray.udpObfsList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
for flow in Xray.xtlsFlowList:
streamInfo = Xray.loadKcpStream(config['passwd'], obfs)
xtlsFlow, streamInfo = Xray.addXtlsConfig(streamInfo, config['cert'], config['key'], config['host'], flow)
result.append(loadVlessStream(streamInfo, xtlsFlow))
# WebSocket stream
streamInfo = Xray.loadWsStream(config['host'], config['path'], False)
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.loadWsStream(config['host'], config['path'], True)
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
# HTTP/2 stream
streamInfo = Xray.loadH2Stream(config['host'], config['path'])
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
# QUIC stream
for method in Xray.quicMethodList:
for obfs in Xray.udpObfsList:
streamInfo = Xray.loadQuicStream(method, config['passwd'], obfs)
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
# GRPC stream
streamInfo = Xray.loadGrpcStream(config['service'])
result.append(loadVlessStream(streamInfo, None))
streamInfo = Xray.addTlsConfig(streamInfo, config['cert'], config['key'], config['host'])
result.append(loadVlessStream(streamInfo, None))
return result

4
ProxyTester/VMess.py

@ -13,7 +13,7 @@ vmessMethodList = [
'zero', 'zero',
] ]
def vmessBasicTest(method: str, alterId: int): def vmessBasicTest(method: str, alterId: int) -> dict:
inboundConfig = { inboundConfig = {
'protocol': 'vmess', 'protocol': 'vmess',
'listen': '127.0.0.1', 'listen': '127.0.0.1',
@ -57,7 +57,7 @@ def vmessBasicTest(method: str, alterId: int):
'aider': None 'aider': None
} }
def loadVmessStream(streamInfo): def loadVmessStream(streamInfo: dict) -> dict:
proxyInfo = { proxyInfo = {
'type': 'vmess', 'type': 'vmess',
'server': '127.0.0.1', 'server': '127.0.0.1',

112
ProxyTester/Xray.py

@ -0,0 +1,112 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
from ProxyTester import V2ray
xtlsFlowList = [
'xtls-origin',
'xtls-direct',
'xtls-splice',
]
udpObfsList = V2ray.udpObfsList
quicMethodList = V2ray.quicMethodList
xrayConfig = V2ray.v2rayConfig
loadTcpStream = V2ray.loadTcpStream
loadKcpStream = V2ray.loadKcpStream
loadH2Stream = V2ray.loadH2Stream
loadQuicStream = V2ray.loadQuicStream
loadGrpcStream = V2ray.loadGrpcStream
def loadWsStream(host: str, path: str, isEd: bool) -> dict:
if not isEd: # without Early-Data
return {
'caption': 'WebSocket',
'client': {
'type': 'ws',
'host': host,
'path': path
},
'server': {
'network': 'ws',
'wsSettings': {
'path': path,
'headers': {
'Host': host
}
}
}
}
return {
'caption': 'WebSocket Max-Early-Data 2048',
'client': {
'type': 'ws',
'host': host,
'path': path,
'ed': 2048
},
'server': {
'network': 'ws',
'wsSettings': {
'path': path + '?ed=2048',
'headers': {
'Host': host
}
}
}
}
def addTlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str) -> dict:
streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (tls)'
streamInfo['client']['secure'] = {
'type': 'tls',
'sni': sni
}
streamInfo['server']['security'] = 'tls'
streamInfo['server']['tlsSettings'] = {
'alpn': [
'h2',
'http/1.1'
],
'certificates': [
{
'certificateFile': cert,
'keyFile': key
}
]
}
return streamInfo
def addXtlsConfig(rawStreamInfo: dict, cert: str, key: str, sni: str, xtlsFlow: str) -> tuple[str, dict]:
streamInfo = copy.deepcopy(rawStreamInfo)
streamInfo['caption'] += ' (' + xtlsFlow + ')'
streamInfo['client']['secure'] = {
'type': 'xtls',
'sni': sni,
'flow': xtlsFlow
}
streamInfo['server']['security'] = 'xtls'
streamInfo['server']['xtlsSettings'] = {
'alpn': [
'h2',
'http/1.1'
],
'certificates': [
{
'certificateFile': cert,
'keyFile': key
}
]
}
if xtlsFlow == 'xtls-origin':
return 'xtls-rprx-origin', streamInfo
elif xtlsFlow == 'xtls-direct':
return 'xtls-rprx-direct', streamInfo
elif xtlsFlow == 'xtls-splice':
return 'xtls-rprx-direct', streamInfo
else:
raise Exception('Unknown XTLS flow')

7
ProxyTester/tester.py

@ -4,13 +4,16 @@
from ProxyTester import Shadowsocks from ProxyTester import Shadowsocks
from ProxyTester import ShadowsocksR from ProxyTester import ShadowsocksR
from ProxyTester import VMess from ProxyTester import VMess
from ProxyTester import VLESS
def test(key: str, config: dict) -> list: def test(key: str, config: dict) -> list:
if key == 'ss': if key in ['ss', 'shadowsocks']:
return Shadowsocks.ssTest(config) return Shadowsocks.ssTest(config)
elif key == 'ssr': elif key in ['ssr', 'shadowsocksr']:
return ShadowsocksR.ssrTest(config) return ShadowsocksR.ssrTest(config)
elif key == 'vmess': elif key == 'vmess':
return VMess.vmessTest(config) return VMess.vmessTest(config)
elif key == 'vless':
return VLESS.vlessTest(config)
else: else:
return [] return []

2
Test.py

@ -66,7 +66,7 @@ def testObject(option: dict) -> None: # test target object
if len(sys.argv) == 1: # no param if len(sys.argv) == 1: # no param
print('Unknown test type') print('Unknown test type')
sys.exit(1) sys.exit(1)
testList = Tester.test(sys.argv[1].lower(), testConfig) # get test list testList = Tester.test(sys.argv[1].lower().strip(), testConfig) # get test list
if len(sys.argv) == 2: # test all if len(sys.argv) == 2: # test all
for i in range(0, len(testList)): for i in range(0, len(testList)):

100
demo.py

@ -1,100 +0,0 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import copy
import time
import Check as Checker
import ProxyFilter as Filter
import ProxyBuilder as Builder
# info = {
# 'type': 'vless',
# 'server': '127.0.0.1',
# 'port': '12345',
# 'id': 'dnomd343',
# 'stream': {
# 'type': 'grpc',
# 'service': 'dnomd343',
# 'secure': {
# 'type': 'tls',
# 'sni': '',
# 'flow': 'xtls-origin',
# 'udp443': True
# }
# }
# }
#
# ret = Filter.filte(info)
#
# print(ret[0])
# print(ret[1])
#
# info = {
# 'type': 'vless',
# 'server': '127.0.0.1',
# 'port': '12345',
# 'id': '58c0f2eb-5d47-45d0-8f5f-ebae5c2cfdd9',
# 'stream': {
# 'type': 'tcp',
# 'secure': {
# 'type': 'xtls',
# 'udp443': True
# }
# }
# }
#
# info = copy.deepcopy(Filter.filte(info)[1])
# print(info)
# Builder.build(info, '/tmp/ProxyC')
# time.sleep(5)
# Builder.destroy(info)
# info = {
# 'type': 'vmess',
# 'server': '127.0.0.1',
# 'port': 12345,
# 'id': '1f7aa040-94d8-4b53-ae85-af6946d550bb',
# 'stream': {
# 'type': 'h2',
# # 'host': 'dns.343.re',
# # 'path': '/test',
# # 'secure': {}
# # 'secure': {
# # 'sni': 'dns.343.re'
# # }
# }
# }
#
# ret = Filter.filte(info)
#
# print(ret[0])
# print(ret[1])
info = {
'type': 'vmess',
'server': '127.0.0.1',
'port': '3345',
'id': '657b26d0-d25e-5b75-a018-40cb679c83a3',
'stream': {
'type': 'ws',
'host': None,
'path': '/test',
'ed': 2048,
'secure': {
'sni': 'dns.343.re',
'alpn': 'h2,http/1.1'
}
}
}
ret = Filter.filte(info)
print(ret[0])
print(ret[1])
data = Checker.proxyTest({
'check': ['http'],
'info': ret[1]
})
print(data)
Loading…
Cancel
Save