Browse Source

feat: support VLESS url decode

master
Dnomd343 3 years ago
parent
commit
3068d64ffb
  1. 143
      ProxyDecoder/VLESS.py
  2. 34
      ProxyDecoder/VMess.py
  3. 17
      ProxyDecoder/baseFunc.py
  4. 3
      ProxyDecoder/decoder.py
  5. 16
      demo.py

143
ProxyDecoder/VLESS.py

@ -0,0 +1,143 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import re
import json
from ProxyDecoder import baseFunc
def __vlessCommonDecode(url: str) -> dict:
"""
VLESS标准分享链接解码
FORMAT: vless://$(UUID)@server:port?{fields}#$(remark)
type -> tcp / kcp / ws / http / quic / grpc
encryption -> none
security -> none / tls / xtls
path -> WebSocket / HTTP/2 / http obfs
host -> WebSocket / HTTP/2 / http obfs
headerType -> mKCP / QUIC UDP obfs -> none / srtp / utp / wechat-video / dtls / wireguard
-> TCP (http obfs) -> http
seed -> mKCP seed
quicSecurity -> QUIC method
key -> QUIC key
serviceName -> gRPC Service Name
mode -> gRPC transport mode -> gun / multi / guna
sni -> TLS / XTLS SNI
alpn -> TLS / XTLS ALPN
flow -> XTLS flow type -> xtls-rprx-origin / xtls-rprx-direct / xtls-rprx-splice
"""
match = re.search(r'^vless://([\S]+?)(#[\S]*)?$', url) # vless://...#REMARK
remark = baseFunc.urlDecode(
match[2][1:] if match[2] is not None else ''
)
match = re.search(
r'^([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)/?([\S]*)$', match[1]
)
info = {
'server': baseFunc.formatHost(match[2]),
'port': int(match[3]),
'id': baseFunc.urlDecode(match[1]),
'remark': remark
}
params = baseFunc.paramSplit(match[4])
stream = {
'type': params['type']
}
if params['type'] == 'tcp':
if 'headerType' in params and params['headerType'] == 'http':
stream['obfs'] = {}
if 'host' in params:
stream['obfs']['host'] = params['host']
if 'path' in params:
stream['obfs']['path'] = params['path']
elif params['type'] == 'kcp':
if 'headerType' in params:
stream['obfs'] = params['headerType']
if 'seed' in params:
stream['seed'] = params['seed']
elif params['type'] == 'ws':
if 'host' in params:
stream['host'] = params['host']
if 'path' in params:
try:
stream['ed'], stream['path'] = baseFunc.splitEdParam(params['path'])
except:
stream['path'] = params['path']
elif params['type'] == 'http':
if 'host' in params:
stream['host'] = params['host']
if 'path' in params:
stream['path'] = params['path']
elif params['type'] == 'quic':
if 'headerType' in params:
stream['obfs'] = params['headerType']
if 'quicSecurity' in params:
stream['method'] = params['quicSecurity']
if 'key' in params:
stream['passwd'] = params['key']
elif params['type'] == 'grpc':
if 'serviceName' in params:
stream['service'] = params['serviceName']
if 'mode' in params and params['mode'] == 'multi':
stream['mode'] = 'multi'
else:
raise Exception('Unknown network type')
secure = None
if 'security' in params:
if params['security'] not in ['tls', 'xtls']:
raise Exception('Unknown security type')
secure = {
'type': params['security']
}
if 'sni' in params:
secure['sni'] = params['sni']
if 'alpn' in params:
secure['alpn'] = params['alpn']
if params['security'] == 'xtls' and 'flow' in params: # XTLS flow
if params['flow'] in ['xtls-rprx-origin', 'xtls-rprx-origin-udp443']:
secure['flow'] = 'xtls-origin'
elif params['flow'] in ['xtls-rprx-direct', 'xtls-rprx-direct-udp443']:
secure['flow'] = 'xtls-direct'
elif params['flow'] in ['xtls-rprx-splice', 'xtls-rprx-splice-udp443']:
secure['flow'] = 'xtls-splice'
stream['secure'] = secure
info['stream'] = stream
return info
def vlessDecode(url: str) -> dict or None:
"""
VLESS分享链接解码
链接合法:
return {
'type': 'vless',
...
}
链接不合法:
return None
"""
if url[0:8] != 'vless://':
return None
try:
result = __vlessCommonDecode(url) # try VLESS common decode
except:
return None
result['type'] = 'vless'
return result

34
ProxyDecoder/VMess.py

@ -5,23 +5,6 @@ import re
import json
from ProxyDecoder import baseFunc
def __splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数
if path.find('?') == -1:
return None, path
content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path)
ed = None
params = []
for field in content[2].split('&'): # ?param_a=...&param_b=...
if not field.startswith('ed='):
params.append(field)
continue
ed = int(field[3:]) # ed=...
if ed is None: # ed param not found
return None, path
if not params: # param -> []
return ed, content[1]
return ed, content[1] + '?' + '&'.join(params)
def __vmessV2raynDecode(url: str) -> dict:
"""
v2rayN / v2rayNG分享链接解码
@ -81,7 +64,7 @@ def __vmessV2raynDecode(url: str) -> dict:
stream['host'] = content['host']
if 'path' in content:
try:
stream['ed'], stream['path'] = __splitEdParam(content['path'])
stream['ed'], stream['path'] = baseFunc.splitEdParam(content['path'])
except:
stream['path'] = content['path']
elif content['net'] == 'h2':
@ -131,11 +114,12 @@ def __vmessCommonDecode(url: str) -> dict:
security -> none / tls
path -> WebSocket / HTTP/2
path -> WebSocket / HTTP/2 / http obfs
host -> WebSocket / HTTP/2
host -> WebSocket / HTTP/2 / http obfs
headerType -> mKCP / QUIC UDP obfs -> none / srtp / utp / wechat-video / dtls / wireguard
-> TCP (http obfs) -> http
seed -> mKCP seed
@ -171,14 +155,8 @@ def __vmessCommonDecode(url: str) -> dict:
stream = {
'type': params['type']
}
"""
?httpObfs
path -> WebSocket / HTTP/2
host -> WebSocket / HTTP/2
"""
if params['type'] == 'tcp':
if 'headerType' in params and params['headerType']:
if 'headerType' in params and params['headerType'] == 'http':
stream['obfs'] = {}
if 'host' in params:
stream['obfs']['host'] = params['host']
@ -194,7 +172,7 @@ def __vmessCommonDecode(url: str) -> dict:
stream['host'] = params['host']
if 'path' in params:
try:
stream['ed'], stream['path'] = __splitEdParam(params['path'])
stream['ed'], stream['path'] = baseFunc.splitEdParam(params['path'])
except:
stream['path'] = params['path']
elif params['type'] == 'http':

17
ProxyDecoder/baseFunc.py

@ -58,3 +58,20 @@ def paramSplit(content: str) -> dict:
except:
pass
return result
def splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数
if path.find('?') == -1:
return None, path
content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path)
ed = None
params = []
for field in content[2].split('&'): # ?param_a=...&param_b=...
if not field.startswith('ed='):
params.append(field)
continue
ed = int(field[3:]) # ed=...
if ed is None: # ed param not found
return None, path
if not params: # param -> []
return ed, content[1]
return ed, content[1] + '?' + '&'.join(params)

3
ProxyDecoder/decoder.py

@ -5,6 +5,7 @@ import re
from ProxyDecoder import Shadowsocks
from ProxyDecoder import ShadowsocksR
from ProxyDecoder import VMess
from ProxyDecoder import VLESS
def decode(url: str) -> dict or None:
"""
@ -28,6 +29,8 @@ def decode(url: str) -> dict or None:
return ShadowsocksR.ssrDecode(url)
elif scheme == 'vmess':
return VMess.vmessDecode(url)
elif scheme == 'vless':
return VLESS.vlessDecode(url)
except:
pass
return None

16
demo.py

@ -1,21 +1,7 @@
import ProxyDecoder as Decoder
import ProxyFilter as Filter
# info = {
# 'type': 'vless',
# 'server': '127.0.0.1',
# 'port': 12345,
# 'id': '1b1757d2-2ff1-4e8d-b62e-4e74c06f1325',
# 'stream': {
# 'type': 'grpc',
# 'service': 'dnomd343'
# }
# }
# ret = Filter.filte(info)
# print(ret)
url = 'vmess://1b1757d2-2ff1-4e8d-b62e-4e74c06f1325@1.1.1.1:443'
url = 'vless://...'
ret = Decoder.decode(url)
print(ret)

Loading…
Cancel
Save