From b1016e32ae6c33f86f701b487d8dca0f9913a9e8 Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Mon, 28 Feb 2022 14:52:52 +0800 Subject: [PATCH] feat: decode VMess common share url --- ProxyDecoder/Shadowsocks.py | 105 ++++++++++++++++---------------- ProxyDecoder/ShadowsocksR.py | 45 +++++++------- ProxyDecoder/VMess.py | 115 ++++++++++++++++++++++++++++++++++- ProxyDecoder/baseFunc.py | 22 +++++++ demo.py | 11 ++++ 5 files changed, 220 insertions(+), 78 deletions(-) create mode 100644 demo.py diff --git a/ProxyDecoder/Shadowsocks.py b/ProxyDecoder/Shadowsocks.py index 5af895a..47ea485 100644 --- a/ProxyDecoder/Shadowsocks.py +++ b/ProxyDecoder/Shadowsocks.py @@ -13,18 +13,22 @@ def __ssPlainDecode(url: str) -> dict: EXAMPLE: ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE """ - content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK - info = re.search( - r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', - content.group(1) # method:password@server:port + remark = '' + match = re.search(r'^ss://([\S]+)$', url) # ss://... + if match[1].find('#') != -1: # ...#REMARK + match = re.search(r'^([\S]+)#([\S]*)$', match[1]) + remark = baseFunc.urlDecode( + match[2] if match[2] is not None else '' + ) + match = re.search( + r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)$', match[1] # method:password@server:port ) - remark = content.group(2)[1:] if content.group(2) is not None else '' return { - 'server': info[3], - 'port': int(info[4]), - 'passwd': info[2], - 'method': info[1], - 'remark': baseFunc.urlDecode(remark) + 'server': baseFunc.formatHost(match[3]), + 'port': int(match[4]), + 'passwd': match[2], + 'method': match[1], + 'remark': remark } def __ssCommonDecode(url: str) -> dict: @@ -38,17 +42,20 @@ def __ssCommonDecode(url: str) -> dict: -> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg -> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server """ - content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK - info = re.search( - r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', - baseFunc.base64Decode(content.group(1)) # method:password@server:port + match = re.search(r'^ss://([a-zA-Z0-9\-_+\\=]+)#?([\S]*)?$', url) # base64#REMARK + remark = baseFunc.urlDecode( + match[2] if match[2] is not None else '' + ) + match = re.search( + r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)$', + baseFunc.base64Decode(match[1]) # method:password@server:port ) return { - 'server': info[3], - 'port': int(info[4]), - 'passwd': info[2], - 'method': info[1], - 'remark': baseFunc.urlDecode(content.group(2)) + 'server': baseFunc.formatHost(match[3]), + 'port': int(match[4]), + 'passwd': match[2], + 'method': match[1], + 'remark': remark } def __sip002Decode(url: str) -> dict: @@ -66,55 +73,47 @@ def __sip002Decode(url: str) -> dict: => ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example """ - content = re.search( - r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)' + match = re.search( + r'^ss://([a-zA-Z0-9\-_+\\=]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)' r'/?([\S]*)$', url # base64@server:port/... (/可选) ) userInfo = re.search( r'^([\S]+?):([\S]+)$', - baseFunc.base64Decode(content.group(1)) # method:password + baseFunc.base64Decode(match[1]) # method:password ) + info = { - 'server': content.group(2), - 'port': int(content.group(3)), - 'passwd': userInfo.group(2), - 'method': userInfo.group(1), + 'server': baseFunc.formatHost(match[2]), + 'port': int(match[3]), + 'passwd': userInfo[2], + 'method': userInfo[1], 'remark': '' } - if content.group(4).find('#') != -1: # ...#REMARK - content = re.search( - r'^\??([\S]*)#([\S]*)$', - content.group(4) # ?...#REMARK (?可选) - ) - info['remark'] = baseFunc.urlDecode( - content.group(2) + if match[4].find('#') != -1: # ...#REMARK + match = re.search( + r'^\??([\S]*)#([\S]*)$', match[4] # ?...#REMARK (?可选) ) + info['remark'] = baseFunc.urlDecode(match[2]) else: - content = re.search( - r'^\??([\S]*)$', - content.group(4) # ?... (?可选) + match = re.search( + r'^\??([\S]*)$', match[4] # ?... (?可选) ) - plugin = '' - for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=... - if field.find('=') == -1: # 缺失xxx=... - continue - field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... - if field.group(1) == 'plugin': - plugin = baseFunc.urlDecode(field.group(2)) # plugin参数 - break - if plugin.find(';') == -1: # plugin=... (无参数) - pluginField = { - 'type': plugin, + + params = baseFunc.paramSplit(match[1]) # /?plugin=...&other1=...&other2=... + pluginField = params['plugin'] if 'plugin' in params else '' + if pluginField.find(';') == -1: # plugin=... (无参数) + pluginObject = { + 'type': pluginField, 'param': '' } else: # plugin=...;... (带参数) - plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数 - pluginField = { - 'type': plugin.group(1), - 'param': plugin.group(2) + match = re.search(r'^([\S]*?);([\S]*)$', pluginField) # 插件名;插件参数 + pluginObject = { + 'type': match[1], + 'param': match[2] } - if pluginField['type'] != '': # 带插件情况 - info['plugin'] = pluginField + if pluginObject['type'] != '': # 带插件时配置 + info['plugin'] = pluginObject return info def ssDecode(url: str, compatible: bool = False) -> dict or None: diff --git a/ProxyDecoder/ShadowsocksR.py b/ProxyDecoder/ShadowsocksR.py index 6ab25cd..2332332 100644 --- a/ProxyDecoder/ShadowsocksR.py +++ b/ProxyDecoder/ShadowsocksR.py @@ -15,33 +15,30 @@ def __ssrCommonDecode(url: str) -> dict: -> server:port:protocol:method:obfs:base64(passwd)/?... -> obfsparam=...&protoparam=...&remarks=...&group=... """ - content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64} - content = re.search( - r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r - r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs: - r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?... - baseFunc.base64Decode(content) + match = re.search(r'^ssr://([\S]+)$', url) # ssr://{BASE64} + match = re.search( + r'^([a-zA-Z0-9.:\-_\[\]]*):([0-9]*):' # server:port: + r'([0-9a-zA-Z_.\-]*):([0-9a-zA-Z_.\-]*):([0-9a-zA-Z_.\-]*):' # protocol:method:obfs: + r'([a-zA-Z0-9\-_+\\=]*)(/\?)?([\S]*)?$', # BASE64(passwd)/?... + baseFunc.base64Decode(match[1]) ) info = { - 'server': content.group(1), - 'port': int(content.group(2)), - 'passwd': baseFunc.base64Decode(content.group(6)), - 'method': content.group(4), - 'protocol': content.group(3), - 'obfs': content.group(5), + 'server': baseFunc.formatHost(match[1]), + 'port': int(match[2]), + 'passwd': baseFunc.base64Decode(match[6]), + 'method': match[4], + 'protocol': match[3], + 'obfs': match[5], } - for field in content.group(8).split('&'): # /?obfsparam=...&protoparam=...&remarks=...&group=... - if field.find('=') == -1: # 缺失xxx=... - continue - field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... - if field.group(1) == 'protoparam': - info['protocolParam'] = baseFunc.base64Decode(field.group(2)) - elif field.group(1) == 'obfsparam': - info['obfsParam'] = baseFunc.base64Decode(field.group(2)) - elif field.group(1) == 'remarks': - info['remark'] = baseFunc.base64Decode(field.group(2)) - elif field.group(1) == 'group': - info['group'] = baseFunc.base64Decode(field.group(2)) + params = baseFunc.paramSplit(match[8]) # /?obfsparam=...&protoparam=...&remarks=...&group=... + if 'protoparam' in params: + info['protocolParam'] = baseFunc.base64Decode(params['protoparam']) + if 'obfsparam' in params: + info['obfsParam'] = baseFunc.base64Decode(params['obfsparam']) + if 'remarks' in params: + info['remark'] = baseFunc.base64Decode(params['remarks']) + if 'group' in params: + info['group'] = baseFunc.base64Decode(params['group']) return info def ssrDecode(url: str) -> dict or None: diff --git a/ProxyDecoder/VMess.py b/ProxyDecoder/VMess.py index 542f145..956e62f 100644 --- a/ProxyDecoder/VMess.py +++ b/ProxyDecoder/VMess.py @@ -119,6 +119,116 @@ def __vmessV2raynDecode(url: str) -> dict: info['stream'] = stream return info +def __vmessCommonDecode(url: str) -> dict: + """ + VMess标准分享链接解码 (only VMessAEAD) + + FORMAT: vmess://$(UUID)@server:port?{fields}#$(remark) + + type -> tcp / kcp / ws / http / quic / grpc + + encryption -> auto / aes-128-gcm / chacha20-poly1305 + + security -> none / tls + + path -> WebSocket / HTTP/2 + + host -> WebSocket / HTTP/2 + + headerType -> mKCP / QUIC UDP obfs -> none / srtp / utp / wechat-video / dtls / wireguard + + seed -> mKCP seed + + quicSecurity -> QUIC method + + key -> QUIC key + + serviceName -> gRPC Service Name + + mode -> gRPC transport mode -> gun / multi / guna + + sni -> TLS SNI + + alpn -> TLS ALPN + + """ + match = re.search(r'^vmess://([\S]+?)(#[\S]*)?$', url) # vmess://...#REMARK + remark = baseFunc.urlDecode( + match[2][1:] if match[2] is not None else '' + ) + match = re.search( + r'^([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)/?([\S]*)$', match[1] + ) + info = { + 'server': baseFunc.formatHost(match[2]), + 'port': int(match[3]), + 'id': baseFunc.urlDecode(match[1]), + 'remark': remark + } + params = baseFunc.paramSplit(match[4]) + if 'encryption' in params: + info['method'] = params['encryption'] + stream = { + 'type': params['type'] + } + + """ + ?httpObfs + path -> WebSocket / HTTP/2 + host -> WebSocket / HTTP/2 + """ + if params['type'] == 'tcp': + if 'headerType' in params and params['headerType']: + stream['obfs'] = {} + if 'host' in params: + stream['obfs']['host'] = params['host'] + if 'path' in params: + stream['obfs']['path'] = params['path'] + elif params['type'] == 'kcp': + if 'headerType' in params: + stream['obfs'] = params['headerType'] + if 'seed' in params: + stream['seed'] = params['seed'] + elif params['type'] == 'ws': + if 'host' in params: + stream['host'] = params['host'] + if 'path' in params: + try: + stream['ed'], stream['path'] = __splitEdParam(params['path']) + except: + stream['path'] = params['path'] + elif params['type'] == 'http': + if 'host' in params: + stream['host'] = params['host'] + if 'path' in params: + stream['path'] = params['path'] + elif params['type'] == 'quic': + if 'headerType' in params: + stream['obfs'] = params['headerType'] + if 'quicSecurity' in params: + stream['method'] = params['quicSecurity'] + if 'key' in params: + stream['passwd'] = params['key'] + elif params['type'] == 'grpc': + if 'serviceName' in params: + stream['service'] = params['serviceName'] + if 'mode' in params and params['mode'] == 'multi': + stream['mode'] = 'multi' + else: + raise Exception('Unknown network type') + + secure = None + if 'security' in params and params['security'] == 'tls': + secure = {} + if 'sni' in params: + secure['sni'] = params['sni'] + if 'alpn' in params: + secure['alpn'] = params['alpn'] + + stream['secure'] = secure + info['stream'] = stream + return info + def vmessDecode(url: str) -> dict or None: """ VMess分享链接解码 @@ -137,6 +247,9 @@ def vmessDecode(url: str) -> dict or None: try: result = __vmessV2raynDecode(url) # try v2rayN decode except: - return None + try: + result = __vmessCommonDecode(url) # try VMess common decode + except: + return None result['type'] = 'vmess' return result diff --git a/ProxyDecoder/baseFunc.py b/ProxyDecoder/baseFunc.py index 1c431ef..7b87a91 100644 --- a/ProxyDecoder/baseFunc.py +++ b/ProxyDecoder/baseFunc.py @@ -1,6 +1,7 @@ #!/usr/bin/python # -*- coding:utf-8 -*- +import re import base64 import urllib.parse @@ -36,3 +37,24 @@ def base64Decode(content: str) -> str or None: return base64.b64decode(content).decode() except: return None + +def formatHost(content: str) -> str: + try: + content = content.lower().strip() + if content[:1] == '[' and content[-1:] == ']': + return content[1:-1] + except: + pass + return content + +def paramSplit(content: str) -> dict: + if content.startswith('?'): + content = content[1:] + result = {} + for field in content.split('&'): + match = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... + try: + result[urlDecode(match[1])] = urlDecode(match[2]) + except: + pass + return result diff --git a/demo.py b/demo.py new file mode 100644 index 0000000..18d6bf4 --- /dev/null +++ b/demo.py @@ -0,0 +1,11 @@ +import ProxyDecoder as Decoder +import ProxyFilter as Filter + +url = '...' + +ret = Decoder.decode(url) +print(ret) + +status, ret = Filter.filte(ret, isExtra = True) +print(status) +print(ret)