diff --git a/ProxyDecoder/VLESS.py b/ProxyDecoder/VLESS.py new file mode 100644 index 0000000..a2bdcb4 --- /dev/null +++ b/ProxyDecoder/VLESS.py @@ -0,0 +1,143 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- + +import re +import json +from ProxyDecoder import baseFunc + +def __vlessCommonDecode(url: str) -> dict: + """ + VLESS标准分享链接解码 + + FORMAT: vless://$(UUID)@server:port?{fields}#$(remark) + + type -> tcp / kcp / ws / http / quic / grpc + + encryption -> none + + security -> none / tls / xtls + + path -> WebSocket / HTTP/2 / http obfs + + host -> WebSocket / HTTP/2 / http obfs + + headerType -> mKCP / QUIC UDP obfs -> none / srtp / utp / wechat-video / dtls / wireguard + -> TCP (http obfs) -> http + + seed -> mKCP seed + + quicSecurity -> QUIC method + + key -> QUIC key + + serviceName -> gRPC Service Name + + mode -> gRPC transport mode -> gun / multi / guna + + sni -> TLS / XTLS SNI + + alpn -> TLS / XTLS ALPN + + flow -> XTLS flow type -> xtls-rprx-origin / xtls-rprx-direct / xtls-rprx-splice + + """ + match = re.search(r'^vless://([\S]+?)(#[\S]*)?$', url) # vless://...#REMARK + remark = baseFunc.urlDecode( + match[2][1:] if match[2] is not None else '' + ) + match = re.search( + r'^([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)/?([\S]*)$', match[1] + ) + info = { + 'server': baseFunc.formatHost(match[2]), + 'port': int(match[3]), + 'id': baseFunc.urlDecode(match[1]), + 'remark': remark + } + params = baseFunc.paramSplit(match[4]) + stream = { + 'type': params['type'] + } + if params['type'] == 'tcp': + if 'headerType' in params and params['headerType'] == 'http': + stream['obfs'] = {} + if 'host' in params: + stream['obfs']['host'] = params['host'] + if 'path' in params: + stream['obfs']['path'] = params['path'] + elif params['type'] == 'kcp': + if 'headerType' in params: + stream['obfs'] = params['headerType'] + if 'seed' in params: + stream['seed'] = params['seed'] + elif params['type'] == 'ws': + if 'host' in params: + stream['host'] = params['host'] + if 'path' in params: + try: + stream['ed'], stream['path'] = baseFunc.splitEdParam(params['path']) + except: + stream['path'] = params['path'] + elif params['type'] == 'http': + if 'host' in params: + stream['host'] = params['host'] + if 'path' in params: + stream['path'] = params['path'] + elif params['type'] == 'quic': + if 'headerType' in params: + stream['obfs'] = params['headerType'] + if 'quicSecurity' in params: + stream['method'] = params['quicSecurity'] + if 'key' in params: + stream['passwd'] = params['key'] + elif params['type'] == 'grpc': + if 'serviceName' in params: + stream['service'] = params['serviceName'] + if 'mode' in params and params['mode'] == 'multi': + stream['mode'] = 'multi' + else: + raise Exception('Unknown network type') + + secure = None + if 'security' in params: + if params['security'] not in ['tls', 'xtls']: + raise Exception('Unknown security type') + secure = { + 'type': params['security'] + } + if 'sni' in params: + secure['sni'] = params['sni'] + if 'alpn' in params: + secure['alpn'] = params['alpn'] + if params['security'] == 'xtls' and 'flow' in params: # XTLS flow + if params['flow'] in ['xtls-rprx-origin', 'xtls-rprx-origin-udp443']: + secure['flow'] = 'xtls-origin' + elif params['flow'] in ['xtls-rprx-direct', 'xtls-rprx-direct-udp443']: + secure['flow'] = 'xtls-direct' + elif params['flow'] in ['xtls-rprx-splice', 'xtls-rprx-splice-udp443']: + secure['flow'] = 'xtls-splice' + stream['secure'] = secure + info['stream'] = stream + return info + +def vlessDecode(url: str) -> dict or None: + """ + VLESS分享链接解码 + + 链接合法: + return { + 'type': 'vless', + ... + } + + 链接不合法: + return None + """ + if url[0:8] != 'vless://': + return None + try: + result = __vlessCommonDecode(url) # try VLESS common decode + except: + return None + result['type'] = 'vless' + return result diff --git a/ProxyDecoder/VMess.py b/ProxyDecoder/VMess.py index 956e62f..51db1da 100644 --- a/ProxyDecoder/VMess.py +++ b/ProxyDecoder/VMess.py @@ -5,23 +5,6 @@ import re import json from ProxyDecoder import baseFunc -def __splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数 - if path.find('?') == -1: - return None, path - content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path) - ed = None - params = [] - for field in content[2].split('&'): # ?param_a=...¶m_b=... - if not field.startswith('ed='): - params.append(field) - continue - ed = int(field[3:]) # ed=... - if ed is None: # ed param not found - return None, path - if not params: # param -> [] - return ed, content[1] - return ed, content[1] + '?' + '&'.join(params) - def __vmessV2raynDecode(url: str) -> dict: """ v2rayN / v2rayNG分享链接解码 @@ -81,7 +64,7 @@ def __vmessV2raynDecode(url: str) -> dict: stream['host'] = content['host'] if 'path' in content: try: - stream['ed'], stream['path'] = __splitEdParam(content['path']) + stream['ed'], stream['path'] = baseFunc.splitEdParam(content['path']) except: stream['path'] = content['path'] elif content['net'] == 'h2': @@ -131,11 +114,12 @@ def __vmessCommonDecode(url: str) -> dict: security -> none / tls - path -> WebSocket / HTTP/2 + path -> WebSocket / HTTP/2 / http obfs - host -> WebSocket / HTTP/2 + host -> WebSocket / HTTP/2 / http obfs headerType -> mKCP / QUIC UDP obfs -> none / srtp / utp / wechat-video / dtls / wireguard + -> TCP (http obfs) -> http seed -> mKCP seed @@ -171,14 +155,8 @@ def __vmessCommonDecode(url: str) -> dict: stream = { 'type': params['type'] } - - """ - ?httpObfs - path -> WebSocket / HTTP/2 - host -> WebSocket / HTTP/2 - """ if params['type'] == 'tcp': - if 'headerType' in params and params['headerType']: + if 'headerType' in params and params['headerType'] == 'http': stream['obfs'] = {} if 'host' in params: stream['obfs']['host'] = params['host'] @@ -194,7 +172,7 @@ def __vmessCommonDecode(url: str) -> dict: stream['host'] = params['host'] if 'path' in params: try: - stream['ed'], stream['path'] = __splitEdParam(params['path']) + stream['ed'], stream['path'] = baseFunc.splitEdParam(params['path']) except: stream['path'] = params['path'] elif params['type'] == 'http': diff --git a/ProxyDecoder/baseFunc.py b/ProxyDecoder/baseFunc.py index 7b87a91..24d1d67 100644 --- a/ProxyDecoder/baseFunc.py +++ b/ProxyDecoder/baseFunc.py @@ -58,3 +58,20 @@ def paramSplit(content: str) -> dict: except: pass return result + +def splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数 + if path.find('?') == -1: + return None, path + content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path) + ed = None + params = [] + for field in content[2].split('&'): # ?param_a=...¶m_b=... + if not field.startswith('ed='): + params.append(field) + continue + ed = int(field[3:]) # ed=... + if ed is None: # ed param not found + return None, path + if not params: # param -> [] + return ed, content[1] + return ed, content[1] + '?' + '&'.join(params) diff --git a/ProxyDecoder/decoder.py b/ProxyDecoder/decoder.py index 5084912..bdf54ed 100644 --- a/ProxyDecoder/decoder.py +++ b/ProxyDecoder/decoder.py @@ -5,6 +5,7 @@ import re from ProxyDecoder import Shadowsocks from ProxyDecoder import ShadowsocksR from ProxyDecoder import VMess +from ProxyDecoder import VLESS def decode(url: str) -> dict or None: """ @@ -28,6 +29,8 @@ def decode(url: str) -> dict or None: return ShadowsocksR.ssrDecode(url) elif scheme == 'vmess': return VMess.vmessDecode(url) + elif scheme == 'vless': + return VLESS.vlessDecode(url) except: pass return None diff --git a/demo.py b/demo.py index 0ac6936..8ceb452 100644 --- a/demo.py +++ b/demo.py @@ -1,21 +1,7 @@ import ProxyDecoder as Decoder import ProxyFilter as Filter -# info = { -# 'type': 'vless', -# 'server': '127.0.0.1', -# 'port': 12345, -# 'id': '1b1757d2-2ff1-4e8d-b62e-4e74c06f1325', -# 'stream': { -# 'type': 'grpc', -# 'service': 'dnomd343' -# } -# } - -# ret = Filter.filte(info) -# print(ret) - -url = 'vmess://1b1757d2-2ff1-4e8d-b62e-4e74c06f1325@1.1.1.1:443' +url = 'vless://...' ret = Decoder.decode(url) print(ret)