From 5b320f7c4ccc90f67e31c88b2c67f06266188d7b Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Thu, 3 Mar 2022 18:19:18 +0800 Subject: [PATCH] feat: decode of Brook link --- ProxyDecoder/Brook.py | 87 ++++++++++++++++++++++++++++++++++++++++ ProxyDecoder/baseFunc.py | 61 ++++++++++++++++++++-------- ProxyDecoder/decoder.py | 6 ++- demo.py | 65 ++++++++++++++++++------------ 4 files changed, 175 insertions(+), 44 deletions(-) create mode 100644 ProxyDecoder/Brook.py diff --git a/ProxyDecoder/Brook.py b/ProxyDecoder/Brook.py new file mode 100644 index 0000000..7b76ef0 --- /dev/null +++ b/ProxyDecoder/Brook.py @@ -0,0 +1,87 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- + +import re +from ProxyDecoder import baseFunc + + +def __addressSplit(address: str) -> dict: # server:port + if address == '': + return {} + server, port = address.rsplit(':', maxsplit=1) + return { + 'server': baseFunc.formatHost(server), + 'port': int(port) + } + + +def __wsSplit(wsServer: str, params: dict) -> dict: + wsUrl = baseFunc.urlSplit(wsServer) + wsInfo = { + 'server': wsUrl['server'], + 'port': wsUrl['port'], + 'ws': { + 'host': wsUrl['server'], + 'path': wsUrl['path'] if wsUrl['path'] != '' else '/ws' + } + } + if 'address' not in params: + return wsInfo + return { + **wsInfo, + **__addressSplit(params['address']) # overwrite server and port + } + + +def __brookDecode(url: str) -> dict: # Brook分享链接解码 + """ + Docs: https://txthinking.github.io/brook/#/brook-link + """ + url = baseFunc.urlSplit(url) # brook://KIND?QUERY + brookKind = url['server'] + if brookKind not in ['server', 'wsserver', 'wssserver']: # skip socks5 + raise Exception('Unknown brook kind') + + if 'password' not in url['params']: + raise Exception('Miss password option') + brookInfo = { + 'passwd': url['params']['password'], + 'remark': url['params']['name'] if 'name' in url['params'] else '' + } + + if brookKind == 'server': # server mode + if 'server' not in url['params']: + raise Exception('Miss server option') + return { + **brookInfo, + **__addressSplit(url['params']['server']) + } + + if brookKind == 'wsserver': # ws server mode + if 'wsserver' not in url['params']: + raise Exception('Miss wsserver option') + return { + **brookInfo, + **__wsSplit(url['params']['wsserver'], url['params']) + } + + if brookKind == 'wssserver': # wss server mode + if 'wssserver' not in url['params']: + raise Exception('Miss wssserver option') + brookInfo = { + **brookInfo, + **__wsSplit(url['params']['wssserver'], url['params']) + } + brookInfo['ws']['secure'] = {} + if 'insecure' in url['params'] and url['params']['insecure'] == 'true': + brookInfo['ws']['secure']['verify'] = False + return brookInfo + + +def decode(url: str) -> dict: + if url.split('://')[0] != 'brook': + raise Exception('Unexpected scheme') + return { + **{'type': 'brook'}, + **__brookDecode(url) + } diff --git a/ProxyDecoder/baseFunc.py b/ProxyDecoder/baseFunc.py index 24d1d67..3cc2d6d 100644 --- a/ProxyDecoder/baseFunc.py +++ b/ProxyDecoder/baseFunc.py @@ -11,12 +11,14 @@ def urlEncode(content: str) -> str or None: except: return None + def urlDecode(content: str) -> str or None: try: return urllib.parse.unquote(content) except: return None + def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True) -> str or None: try: content = base64.b64encode(content.encode()).decode() @@ -29,6 +31,7 @@ def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True) -> except: return None + def base64Decode(content: str) -> str or None: try: content = content.replace('-', '+').replace('_', '/') @@ -38,26 +41,28 @@ def base64Decode(content: str) -> str or None: except: return None -def formatHost(content: str) -> str: + +def formatHost(host: str) -> str: try: - content = content.lower().strip() - if content[:1] == '[' and content[-1:] == ']': - return content[1:-1] + host = host.lower().strip() + if host[:1] == '[' and host[-1:] == ']': # [IPv6] + return host[1:-1] except: pass - return content - -def paramSplit(content: str) -> dict: - if content.startswith('?'): - content = content[1:] - result = {} - for field in content.split('&'): - match = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... - try: - result[urlDecode(match[1])] = urlDecode(match[2]) - except: - pass - return result + return host + + +def paramSplit(paramStr: str) -> dict: # ?param_1=xxx¶m_2=xxx¶m_3=xxx + if paramStr.startswith('?'): + paramStr = paramStr[1:] # remove `?` char + params = {} + for field in paramStr.split('&'): + if field.find('=') < 0: # without `=` char + continue + key, value = field.split('=', maxsplit = 1) + params[key] = urlDecode(value) + return params + def splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数 if path.find('?') == -1: @@ -75,3 +80,25 @@ def splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数 if not params: # param -> [] return ed, content[1] return ed, content[1] + '?' + '&'.join(params) + + +def urlSplit(url: str) -> dict: # scheme://[auth@]server[:port]/.../...?param_1=...¶m_2=...#remark + url = urllib.parse.urlparse(url) + auth = port = None + netloc = url[1] + + if not netloc.find(':') < 0: # server[:port] + netloc, port = netloc.rsplit(':', maxsplit = 1) + port = int(port) + if not netloc.find('@') < 0: # [auth@]server + auth, netloc = netloc.rsplit('@', maxsplit = 1) + + return { + 'scheme': url[0], + 'auth': auth, + 'server': formatHost(netloc), + 'port': port, + 'path': url[2], + 'params': paramSplit(url[4]), + 'remark': urlDecode(url[5]) + } diff --git a/ProxyDecoder/decoder.py b/ProxyDecoder/decoder.py index 9423775..ea17e23 100644 --- a/ProxyDecoder/decoder.py +++ b/ProxyDecoder/decoder.py @@ -8,6 +8,7 @@ from ProxyDecoder import VMess from ProxyDecoder import VLESS from ProxyDecoder import Trojan from ProxyDecoder import TrojanGo +from ProxyDecoder import Brook def decode(url: str) -> dict or None: """ @@ -18,8 +19,7 @@ def decode(url: str) -> dict or None: 链接有效: return { - '...': '...', - '...': '...', + 'type': ..., ... } """ @@ -37,6 +37,8 @@ def decode(url: str) -> dict or None: return Trojan.trojanDecode(url) elif scheme == 'trojan-go': return TrojanGo.trojanGoDecode(url) + elif scheme == 'brook': + return Brook.decode(url) except: pass return None diff --git a/demo.py b/demo.py index ebf6a6b..114450f 100644 --- a/demo.py +++ b/demo.py @@ -5,31 +5,46 @@ import ProxyDecoder as Decoder import ProxyFilter as Filter import Check as Checker -info = { - 'type': 'brook', - 'server': '127.0.0.1', - 'port': '12345', - 'passwd': 'dnomd343', - # 'ws': { - # 'host': 'local.343.re', - # 'path': '/test', - # 'secure': { - # 'verify': False - # } - # } -} - -status, ret = Filter.filte(info) -print(status) -print(ret) - -# print() -# status, ret = Builder.build(ret, '/tmp/ProxyC') +# info = { +# 'type': 'brook', +# 'server': '127.0.0.1', +# 'port': '12345', +# 'passwd': 'dnomd343', +# # 'ws': { +# # 'host': 'local.343.re', +# # 'path': '/test', +# # 'secure': { +# # 'verify': False +# # } +# # } +# } +# +# status, ret = Filter.filte(info) # print(status) # print(ret) +# +# # print() +# # status, ret = Builder.build(ret, '/tmp/ProxyC') +# # print(status) +# # print(ret) +# +# data = Checker.proxyTest({ +# 'check': ['http'], +# 'info': ret +# }) +# print(data) + -data = Checker.proxyTest({ - 'check': ['http'], - 'info': ret -}) -print(data) +url = 'brook://server?address=&insecure=&name=&password=password&server=1.2.3.4%3A9999&username=' +url = 'brook://server?address=&insecure=&name=&password=password&server=%5B2001%3A4860%3A4860%3A%3A8888%5D%3A9999&username=' +url = 'brook://wsserver?address=&insecure=&name=&password=password&username=&wsserver=ws%3A%2F%2F1.2.3.4%3A9999' +url = 'brook://wsserver?address=&insecure=&name=&password=password&username=&wsserver=ws%3A%2F%2F%5B2001%3A4860%3A4860%3A%3A8888%5D%3A9999' +url = 'brook://wssserver?address=1.2.3.4%3A443&insecure=true&name=&password=password&username=&wssserver=wss%3A%2F%2Fhello.com%3A443' +url = 'brook://wsserver?address=1.2.3.4%3A443&name=&password=password&username=&wsserver=ws%3A%2F%2Fhello.com%3A443' + +ret = Decoder.decode(url) +print(ret) + +status, ret = Filter.filte(ret, isExtra = True) +print(status) +print(ret)