diff --git a/ProxyDecoder/Shadowsocks.py b/ProxyDecoder/Shadowsocks.py index 9ca1f0e..5af895a 100644 --- a/ProxyDecoder/Shadowsocks.py +++ b/ProxyDecoder/Shadowsocks.py @@ -4,7 +4,7 @@ import re from ProxyDecoder import baseFunc -def __ssPlainDecode(url: str) -> dict or None: +def __ssPlainDecode(url: str) -> dict: """ Shadowsocks原始分享链接解码 @@ -13,25 +13,21 @@ def __ssPlainDecode(url: str) -> dict or None: EXAMPLE: ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE """ - try: - content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK - info = re.search( - r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', - content.group(1) # method:password@server:port - ) - remark = content.group(2)[1:] if content.group(2) is not None else '' - remark = remark.replace('+', ' ') # 向后兼容部分客户端 - return { - 'server': info[3], - 'port': int(info[4]), - 'passwd': info[2], - 'method': info[1], - 'remark': baseFunc.urlDecode(remark) - } - except: - return None + content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK + info = re.search( + r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', + content.group(1) # method:password@server:port + ) + remark = content.group(2)[1:] if content.group(2) is not None else '' + return { + 'server': info[3], + 'port': int(info[4]), + 'passwd': info[2], + 'method': info[1], + 'remark': baseFunc.urlDecode(remark) + } -def __ssCommonDecode(url: str) -> dict or None: +def __ssCommonDecode(url: str) -> dict: """ Shadowsocks经典分享链接解码 @@ -42,23 +38,20 @@ def __ssCommonDecode(url: str) -> dict or None: -> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg -> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server """ - try: - content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK - info = re.search( - r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', - baseFunc.base64Decode(content.group(1)) # method:password@server:port - ) - return { - 'server': info[3], - 'port': int(info[4]), - 'passwd': info[2], - 'method': info[1], - 'remark': baseFunc.urlDecode(content.group(2)) - } - except: - return None + content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK + info = re.search( + r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', + baseFunc.base64Decode(content.group(1)) # method:password@server:port + ) + return { + 'server': info[3], + 'port': int(info[4]), + 'passwd': info[2], + 'method': info[1], + 'remark': baseFunc.urlDecode(content.group(2)) + } -def __sip002Decode(url: str) -> dict or None: +def __sip002Decode(url: str) -> dict: """ Shadowsocks SIP002分享链接解码 @@ -73,63 +66,58 @@ def __sip002Decode(url: str) -> dict or None: => ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example """ - try: + content = re.search( + r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)' + r'/?([\S]*)$', url # base64@server:port/... (/可选) + ) + userInfo = re.search( + r'^([\S]+?):([\S]+)$', + baseFunc.base64Decode(content.group(1)) # method:password + ) + info = { + 'server': content.group(2), + 'port': int(content.group(3)), + 'passwd': userInfo.group(2), + 'method': userInfo.group(1), + 'remark': '' + } + if content.group(4).find('#') != -1: # ...#REMARK content = re.search( - r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)' - r'/?([\S]*)$', url # base64@server:port/... (/可选) + r'^\??([\S]*)#([\S]*)$', + content.group(4) # ?...#REMARK (?可选) + ) + info['remark'] = baseFunc.urlDecode( + content.group(2) ) - userInfo = re.search( - r'^([\S]+?):([\S]+)$', - baseFunc.base64Decode(content.group(1)) # method:password + else: + content = re.search( + r'^\??([\S]*)$', + content.group(4) # ?... (?可选) ) - info = { - 'server': content.group(2), - 'port': int(content.group(3)), - 'passwd': userInfo.group(2), - 'method': userInfo.group(1), - 'remark': '' + plugin = '' + for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=... + if field.find('=') == -1: # 缺失xxx=... + continue + field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... + if field.group(1) == 'plugin': + plugin = baseFunc.urlDecode(field.group(2)) # plugin参数 + break + if plugin.find(';') == -1: # plugin=... (无参数) + pluginField = { + 'type': plugin, + 'param': '' } - if content.group(4).find('#') != -1: # ...#REMARK - content = re.search( - r'^\??([\S]*)#([\S]*)$', - content.group(4) # ?...#REMARK (?可选) - ) - info['remark'] = baseFunc.urlDecode( - content.group(2) - ) - else: - content = re.search( - r'^\??([\S]*)$', - content.group(4) # ?... (?可选) - ) - plugin = '' - for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=... - if field.find('=') == -1: # 缺失xxx=... - continue - field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... - if field.group(1) == 'plugin': - plugin = baseFunc.urlDecode(field.group(2)) # plugin参数 - break - if plugin.find(';') == -1: # plugin=... (无参数) - pluginField = { - 'type': plugin, - 'param': '' - } - else: # plugin=...;... (带参数) - plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数 - pluginField = { - 'type': plugin.group(1), - 'param': plugin.group(2) - } - if pluginField['type'] == '': # 无插件情况 - info['plugin'] = None - else: - info['plugin'] = pluginField - return info - except: - return None + else: # plugin=...;... (带参数) + plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数 + pluginField = { + 'type': plugin.group(1), + 'param': plugin.group(2) + } + if pluginField['type'] != '': # 带插件情况 + info['plugin'] = pluginField + return info -def ssDecode(url: str) -> dict or None: +def ssDecode(url: str, compatible: bool = False) -> dict or None: """ Shadowsocks分享链接解码 @@ -142,17 +130,19 @@ def ssDecode(url: str) -> dict or None: 链接不合法: return None """ + if url[0:5] != 'ss://': + return None try: - if url[0:5] != 'ss://': - return None result = __ssCommonDecode(url) # try shadowsocks common decode - if result is None: - result = __sip002Decode(url) # try shadowsocks sip002 decode - if result is None: - result = __ssPlainDecode(url) # try shadowsocks plain decode - if result is not None: # 解析成功 - result['type'] = 'ss' - return result except: - pass - return None + try: + result = __sip002Decode(url) # try shadowsocks sip002 decode + except: + try: + result = __ssPlainDecode(url) # try shadowsocks plain decode + except: + return None + if compatible and 'remark' in result: # 向后兼容部分客户端 + result['remark'] = result['remark'].replace('+', ' ') + result['type'] = 'ss' + return result diff --git a/ProxyDecoder/ShadowsocksR.py b/ProxyDecoder/ShadowsocksR.py index cef6cd7..6ab25cd 100644 --- a/ProxyDecoder/ShadowsocksR.py +++ b/ProxyDecoder/ShadowsocksR.py @@ -4,7 +4,7 @@ import re from ProxyDecoder import baseFunc -def __ssrCommonDecode(url: str) -> dict or None: +def __ssrCommonDecode(url: str) -> dict: """ ShadowsocksR经典分享链接解码 @@ -15,37 +15,34 @@ def __ssrCommonDecode(url: str) -> dict or None: -> server:port:protocol:method:obfs:base64(passwd)/?... -> obfsparam=...&protoparam=...&remarks=...&group=... """ - try: - content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64} - content = re.search( - r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r - r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs: - r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?... - baseFunc.base64Decode(content) - ) - info = { - 'server': content.group(1), - 'port': int(content.group(2)), - 'passwd': baseFunc.base64Decode(content.group(6)), - 'method': content.group(4), - 'protocol': content.group(3), - 'obfs': content.group(5), - } - for field in content.group(8).split('&'): # /?obfsparam=...&protoparam=...&remarks=...&group=... - if field.find('=') == -1: # 缺失xxx=... - continue - field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... - if field.group(1) == 'protoparam': - info['protocolParam'] = baseFunc.base64Decode(field.group(2)) - elif field.group(1) == 'obfsparam': - info['obfsParam'] = baseFunc.base64Decode(field.group(2)) - elif field.group(1) == 'remarks': - info['remark'] = baseFunc.base64Decode(field.group(2)) - elif field.group(1) == 'group': - info['group'] = baseFunc.base64Decode(field.group(2)) - return info - except: - return None + content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64} + content = re.search( + r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r + r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs: + r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?... + baseFunc.base64Decode(content) + ) + info = { + 'server': content.group(1), + 'port': int(content.group(2)), + 'passwd': baseFunc.base64Decode(content.group(6)), + 'method': content.group(4), + 'protocol': content.group(3), + 'obfs': content.group(5), + } + for field in content.group(8).split('&'): # /?obfsparam=...&protoparam=...&remarks=...&group=... + if field.find('=') == -1: # 缺失xxx=... + continue + field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... + if field.group(1) == 'protoparam': + info['protocolParam'] = baseFunc.base64Decode(field.group(2)) + elif field.group(1) == 'obfsparam': + info['obfsParam'] = baseFunc.base64Decode(field.group(2)) + elif field.group(1) == 'remarks': + info['remark'] = baseFunc.base64Decode(field.group(2)) + elif field.group(1) == 'group': + info['group'] = baseFunc.base64Decode(field.group(2)) + return info def ssrDecode(url: str) -> dict or None: """ @@ -60,13 +57,11 @@ def ssrDecode(url: str) -> dict or None: 链接不合法: return None """ + if url[0:6] != 'ssr://': + return None try: - if url[0:6] != 'ssr://': - return None result = __ssrCommonDecode(url) # try common decode - if result is not None: # 解析成功 - result['type'] = 'ssr' - return result except: - pass - return None + return None + result['type'] = 'ssr' + return result diff --git a/ProxyDecoder/VMess.py b/ProxyDecoder/VMess.py new file mode 100644 index 0000000..542f145 --- /dev/null +++ b/ProxyDecoder/VMess.py @@ -0,0 +1,142 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- + +import re +import json +from ProxyDecoder import baseFunc + +def __splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数 + if path.find('?') == -1: + return None, path + content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path) + ed = None + params = [] + for field in content[2].split('&'): # ?param_a=...¶m_b=... + if not field.startswith('ed='): + params.append(field) + continue + ed = int(field[3:]) # ed=... + if ed is None: # ed param not found + return None, path + if not params: # param -> [] + return ed, content[1] + return ed, content[1] + '?' + '&'.join(params) + +def __vmessV2raynDecode(url: str) -> dict: + """ + v2rayN / v2rayNG分享链接解码 + + FORMAT: vmess://BASE64-ENCODED-JSON-STRING + + { + "v": "2", + "ps": "...", + "add": "...", + "port": "...", + "id": "...", + "aid": "...", + "scy": "...", + "net": "...", + "type": "...", + "host": "...", + "path": "...", + "tls": "...", + "sni": "...", + "alpn": "..." + } + """ + content = json.loads( + baseFunc.base64Decode( + re.search(r'^vmess://([\S]+)$', url).group(1) # vmess://{base64} + ) + ) + if int(content['v']) != 2: # version => 2 + raise Exception('Unknown version field') + info = { + 'server': content['add'], + 'port': int(content['port']), + 'id': content['id'], + 'aid': int(content['aid']), + } + if 'ps' in content: # ps -> remark + info['remark'] = content['ps'] + if 'scy' in content: # scy -> method + info['method'] = content['scy'] + stream = { + 'type': content['net'] # net -> stream.type + } + if content['net'] == 'tcp': + if 'http' in content and content['type'] == 'http': # type -> none / http + stream['obfs'] = { + 'host': content['host'], + 'path': content['path'] + } + elif content['net'] == 'kcp': + if 'type' in content: + stream['obfs'] = content['type'] + if 'path' in content: + stream['seed'] = content['path'] # path -> seed + elif content['net'] == 'ws': + if 'host' in content: + stream['host'] = content['host'] + if 'path' in content: + try: + stream['ed'], stream['path'] = __splitEdParam(content['path']) + except: + stream['path'] = content['path'] + elif content['net'] == 'h2': + if 'host' in content: + stream['host'] = content['host'] + if 'path' in content: + stream['path'] = content['path'] + elif content['net'] == 'quic': + if 'type' in content: + stream['obfs'] = content['type'] + if 'host' in content: + stream['method'] = content['host'] + if 'path' in content: + stream['passwd'] = content['path'] + elif content['net'] == 'grpc': + if 'type' in content and content['type'] == 'multi': + stream['mode'] = 'multi' + if 'path' in content: + stream['service'] = content['path'] + else: + raise Exception('Unknown network type') + + secure = None + if 'tls' in content and content['tls'] == 'tls': # enable TLS + secure = {} + if 'sni' in content: + secure['sni'] = content['sni'] # sni option + if 'alpn' in content: + if content['alpn'] == '': + secure['alpn'] = None # ignore alpn option + else: + secure['alpn'] = content['alpn'] # h2 | http/1.1 | h2,http/1.1 + + stream['secure'] = secure + info['stream'] = stream + return info + +def vmessDecode(url: str) -> dict or None: + """ + VMess分享链接解码 + + 链接合法: + return { + 'type': 'vmess', + ... + } + + 链接不合法: + return None + """ + if url[0:8] != 'vmess://': + return None + try: + result = __vmessV2raynDecode(url) # try v2rayN decode + except: + return None + result['type'] = 'vmess' + return result diff --git a/ProxyDecoder/decoder.py b/ProxyDecoder/decoder.py index 7b59243..5084912 100644 --- a/ProxyDecoder/decoder.py +++ b/ProxyDecoder/decoder.py @@ -4,6 +4,7 @@ import re from ProxyDecoder import Shadowsocks from ProxyDecoder import ShadowsocksR +from ProxyDecoder import VMess def decode(url: str) -> dict or None: """ @@ -22,9 +23,11 @@ def decode(url: str) -> dict or None: try: scheme = re.search(r'^([\S]+?)://([\s\S]+)$', url).group(1) if scheme == 'ss': - return Shadowsocks.ssDecode(url) + return Shadowsocks.ssDecode(url, compatible = True) elif scheme == 'ssr': return ShadowsocksR.ssrDecode(url) + elif scheme == 'vmess': + return VMess.vmessDecode(url) except: pass return None