Browse Source

update: optimize the decoder

master
Dnomd343 3 years ago
parent
commit
185a6e0c37
  1. 7
      ProxyDecoder/Brook.py
  2. 39
      ProxyDecoder/Shadowsocks.py
  3. 28
      ProxyDecoder/ShadowsocksR.py
  4. 35
      ProxyDecoder/Trojan.py
  5. 26
      ProxyDecoder/TrojanGo.py
  6. 35
      ProxyDecoder/VLESS.py
  7. 36
      ProxyDecoder/VMess.py
  8. 11
      ProxyDecoder/baseFunc.py
  9. 16
      ProxyDecoder/decoder.py
  10. 44
      demo.py

7
ProxyDecoder/Brook.py

@ -1,28 +1,27 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import re
from ProxyDecoder import baseFunc
def __addressSplit(address: str) -> dict: # server:port
if address == '':
return {}
server, port = address.rsplit(':', maxsplit=1)
server, port = address.rsplit(':', maxsplit = 1)
return {
'server': baseFunc.formatHost(server),
'port': int(port)
}
def __wsSplit(wsServer: str, params: dict) -> dict:
def __wsSplit(wsServer: str, params: dict) -> dict: # ws[s]://xxx:xxx/...
wsUrl = baseFunc.urlSplit(wsServer)
wsInfo = {
'server': wsUrl['server'],
'port': wsUrl['port'],
'ws': {
'host': wsUrl['server'],
'path': wsUrl['path'] if wsUrl['path'] != '' else '/ws'
'path': wsUrl['path'] if wsUrl['path'] != '' else '/ws' # path as `/ws` when omitted
}
}
if 'address' not in params:

39
ProxyDecoder/Shadowsocks.py

@ -4,6 +4,7 @@
import re
from ProxyDecoder import baseFunc
def __ssPlainDecode(url: str) -> dict:
"""
Shadowsocks原始分享链接解码
@ -31,6 +32,7 @@ def __ssPlainDecode(url: str) -> dict:
'remark': remark
}
def __ssCommonDecode(url: str) -> dict:
"""
Shadowsocks经典分享链接解码
@ -58,6 +60,7 @@ def __ssCommonDecode(url: str) -> dict:
'remark': remark
}
def __sip002Decode(url: str) -> dict:
"""
Shadowsocks SIP002分享链接解码
@ -116,32 +119,24 @@ def __sip002Decode(url: str) -> dict:
info['plugin'] = pluginObject
return info
def ssDecode(url: str, compatible: bool = False) -> dict or None:
"""
Shadowsocks分享链接解码
链接合法:
return {
'type': 'ss',
...
}
链接不合法:
return None
"""
if url[0:5] != 'ss://':
return None
def decode(url: str, compatible: bool = False) -> dict or None:
if url.split('://')[0] != 'ss':
raise Exception('Unexpected scheme')
try:
result = __ssCommonDecode(url) # try shadowsocks common decode
ssInfo = __ssCommonDecode(url) # try shadowsocks common decode
except:
try:
result = __sip002Decode(url) # try shadowsocks sip002 decode
ssInfo = __sip002Decode(url) # try shadowsocks sip002 decode
except:
try:
result = __ssPlainDecode(url) # try shadowsocks plain decode
ssInfo = __ssPlainDecode(url) # try shadowsocks plain decode
except:
return None
if compatible and 'remark' in result: # 向后兼容部分客户端
result['remark'] = result['remark'].replace('+', ' ')
result['type'] = 'ss'
return result
raise Exception('Url could not be parsed')
if compatible and 'remark' in ssInfo: # 向后兼容部分客户端
ssInfo['remark'] = ssInfo['remark'].replace('+', ' ')
return {
**{'type': 'ss'},
**ssInfo
}

28
ProxyDecoder/ShadowsocksR.py

@ -4,10 +4,9 @@
import re
from ProxyDecoder import baseFunc
def __ssrCommonDecode(url: str) -> dict:
"""
ShadowsocksR经典分享链接解码
def __ssrDecode(url: str) -> dict: # SSR分享链接解码
"""
FORMAT: ssr://BASE64-ENCODED-STRING-WITHOUT-PADDING
EXAMPLE:
@ -41,24 +40,11 @@ def __ssrCommonDecode(url: str) -> dict:
info['group'] = baseFunc.base64Decode(params['group'])
return info
def ssrDecode(url: str) -> dict or None:
"""
ShadowsocksR分享链接解码
链接合法:
def decode(url: str) -> dict:
if url.split('://')[0] != 'ssr':
raise Exception('Unexpected scheme')
return {
'type': 'ssr',
...
**{'type': 'ssr'},
**__ssrDecode(url)
}
链接不合法:
return None
"""
if url[0:6] != 'ssr://':
return None
try:
result = __ssrCommonDecode(url) # try common decode
except:
return None
result['type'] = 'ssr'
return result

35
ProxyDecoder/Trojan.py

@ -4,7 +4,8 @@
import re
from ProxyDecoder import baseFunc
def __trojanCommonDecode(url: str) -> dict:
def __trojanDecode(url: str) -> dict:
"""
Trojan标准分享链接解码
@ -57,7 +58,7 @@ def __trojanCommonDecode(url: str) -> dict:
if 'type' not in params:
params['type'] = 'tcp' # default -> tcp
stream = {
'type': params['type']
'type': params['type'] if params['type'] != 'http' else 'h2'
}
if params['type'] == 'tcp':
if 'headerType' in params and params['headerType'] == 'http':
@ -100,8 +101,7 @@ def __trojanCommonDecode(url: str) -> dict:
raise Exception('Unknown network type')
if 'security' in params:
if params['security'] not in ['tls', 'xtls']:
raise Exception('Unknown security type')
if params['security'] in ['tls', 'xtls']:
secure = {
'type': params['security']
}
@ -116,28 +116,19 @@ def __trojanCommonDecode(url: str) -> dict:
secure['flow'] = 'xtls-direct'
elif params['flow'] in ['xtls-rprx-splice', 'xtls-rprx-splice-udp443']:
secure['flow'] = 'xtls-splice'
elif params['security'] in ['', 'none']:
secure = None
else:
raise Exception('Unknown security type')
stream['secure'] = secure
info['stream'] = stream
return info
def trojanDecode(url: str) -> dict or None:
"""
Trojan分享链接解码
链接合法:
def decode(url: str) -> dict:
if url.split('://')[0] != 'trojan':
raise Exception('Unexpected scheme')
return {
'type': 'trojan',
...
**{'type': 'trojan'},
**__trojanDecode(url)
}
链接不合法:
return None
"""
if url[0:9] != 'trojan://':
return None
try:
result = __trojanCommonDecode(url) # try Trojan common decode
except:
return None
result['type'] = 'trojan'
return result

26
ProxyDecoder/TrojanGo.py

@ -4,7 +4,7 @@
import re
from ProxyDecoder import baseFunc
def __trojanGoCommonDecode(url: str) -> dict:
def __trojanGoDecode(url: str) -> dict:
"""
Trojan-Go标准分享链接解码
@ -73,24 +73,10 @@ def __trojanGoCommonDecode(url: str) -> dict:
}
return info
def trojanGoDecode(url: str) -> dict or None:
"""
Trojan-Go分享链接解码
链接合法:
def decode(url: str) -> dict:
if url.split('://')[0] != 'trojan-go':
raise Exception('Unexpected scheme')
return {
'type': 'trojan-go',
...
**{'type': 'trojan-go'},
**__trojanGoDecode(url)
}
链接不合法:
return None
"""
if url[0:12] != 'trojan-go://':
return None
try:
result = __trojanGoCommonDecode(url) # try Trojan-Go common decode
except:
return None
result['type'] = 'trojan-go'
return result

35
ProxyDecoder/VLESS.py

@ -4,7 +4,8 @@
import re
from ProxyDecoder import baseFunc
def __vlessCommonDecode(url: str) -> dict:
def __vlessDecode(url: str) -> dict:
"""
VLESS标准分享链接解码
@ -55,7 +56,7 @@ def __vlessCommonDecode(url: str) -> dict:
}
params = baseFunc.paramSplit(match[4])
stream = {
'type': params['type']
'type': params['type'] if params['type'] != 'http' else 'h2'
}
if params['type'] == 'tcp':
if 'headerType' in params and params['headerType'] == 'http':
@ -98,8 +99,7 @@ def __vlessCommonDecode(url: str) -> dict:
raise Exception('Unknown network type')
if 'security' in params:
if params['security'] not in ['tls', 'xtls']:
raise Exception('Unknown security type')
if params['security'] in ['tls', 'xtls']:
secure = {
'type': params['security']
}
@ -114,28 +114,19 @@ def __vlessCommonDecode(url: str) -> dict:
secure['flow'] = 'xtls-direct'
elif params['flow'] in ['xtls-rprx-splice', 'xtls-rprx-splice-udp443']:
secure['flow'] = 'xtls-splice'
elif params['security'] in ['', 'none']:
secure = None
else:
raise Exception('Unknown security type')
stream['secure'] = secure
info['stream'] = stream
return info
def vlessDecode(url: str) -> dict or None:
"""
VLESS分享链接解码
链接合法:
def decode(url: str) -> dict:
if url.split('://')[0] != 'vless':
raise Exception('Unexpected scheme')
return {
'type': 'vless',
...
**{'type': 'vless'},
**__vlessDecode(url)
}
链接不合法:
return None
"""
if url[0:8] != 'vless://':
return None
try:
result = __vlessCommonDecode(url) # try VLESS common decode
except:
return None
result['type'] = 'vless'
return result

36
ProxyDecoder/VMess.py

@ -5,6 +5,7 @@ import re
import json
from ProxyDecoder import baseFunc
def __vmessV2raynDecode(url: str) -> dict:
"""
v2rayN / v2rayNG分享链接解码
@ -27,6 +28,7 @@ def __vmessV2raynDecode(url: str) -> dict:
"sni": "...",
"alpn": "..."
}
"""
content = json.loads(
baseFunc.base64Decode(
@ -36,7 +38,7 @@ def __vmessV2raynDecode(url: str) -> dict:
if int(content['v']) != 2: # version => 2
raise Exception('Unknown version field')
info = {
'server': content['add'],
'server': baseFunc.formatHost(content['add']),
'port': int(content['port']),
'id': content['id'],
'aid': int(content['aid']),
@ -102,6 +104,7 @@ def __vmessV2raynDecode(url: str) -> dict:
info['stream'] = stream
return info
def __vmessCommonDecode(url: str) -> dict:
"""
VMess标准分享链接解码 (only VMessAEAD)
@ -153,7 +156,7 @@ def __vmessCommonDecode(url: str) -> dict:
if 'encryption' in params:
info['method'] = params['encryption']
stream = {
'type': params['type']
'type': params['type'] if params['type'] != 'http' else 'h2'
}
if params['type'] == 'tcp':
if 'headerType' in params and params['headerType'] == 'http':
@ -207,27 +210,18 @@ def __vmessCommonDecode(url: str) -> dict:
info['stream'] = stream
return info
def vmessDecode(url: str) -> dict or None:
"""
VMess分享链接解码
链接合法:
return {
'type': 'vmess',
...
}
链接不合法:
return None
"""
if url[0:8] != 'vmess://':
return None
def decode(url: str) -> dict:
if url.split('://')[0] != 'vmess':
raise Exception('Unexpected scheme')
try:
result = __vmessV2raynDecode(url) # try v2rayN decode
vmessInfo = __vmessV2raynDecode(url) # try v2rayN decode
except:
try:
result = __vmessCommonDecode(url) # try VMess common decode
vmessInfo = __vmessCommonDecode(url) # try VMess common decode
except:
return None
result['type'] = 'vmess'
return result
raise Exception('Url could not be parsed')
return {
**{'type': 'vmess'},
**vmessInfo
}

11
ProxyDecoder/baseFunc.py

@ -5,14 +5,15 @@ import re
import base64
import urllib.parse
def urlEncode(content: str) -> str or None:
def urlEncode(content: str) -> str or None: # url-encode
try:
return urllib.parse.urlencode(content)
except:
return None
def urlDecode(content: str) -> str or None:
def urlDecode(content: str) -> str or None: # url-decode
try:
return urllib.parse.unquote(content)
except:
@ -42,7 +43,7 @@ def base64Decode(content: str) -> str or None:
return None
def formatHost(host: str) -> str:
def formatHost(host: str) -> str: # host -> IP / Domain
try:
host = host.lower().strip()
if host[:1] == '[' and host[-1:] == ']': # [IPv6]
@ -64,7 +65,7 @@ def paramSplit(paramStr: str) -> dict: # ?param_1=xxx&param_2=xxx&param_3=xxx
return params
def splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数
def splitEdParam(path: str) -> tuple[int or None, str]: # split early-data option
if path.find('?') == -1:
return None, path
content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path)
@ -86,13 +87,11 @@ def urlSplit(url: str) -> dict: # scheme://[auth@]server[:port]/.../...?param_1=
url = urllib.parse.urlparse(url)
auth = port = None
netloc = url[1]
if not netloc.find(':') < 0: # server[:port]
netloc, port = netloc.rsplit(':', maxsplit = 1)
port = int(port)
if not netloc.find('@') < 0: # [auth@]server
auth, netloc = netloc.rsplit('@', maxsplit = 1)
return {
'scheme': url[0],
'auth': auth,

16
ProxyDecoder/decoder.py

@ -1,7 +1,6 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import re
from ProxyDecoder import Shadowsocks
from ProxyDecoder import ShadowsocksR
from ProxyDecoder import VMess
@ -10,6 +9,7 @@ from ProxyDecoder import Trojan
from ProxyDecoder import TrojanGo
from ProxyDecoder import Brook
def decode(url: str) -> dict or None:
"""
代理分享链接解码
@ -24,19 +24,19 @@ def decode(url: str) -> dict or None:
}
"""
try:
scheme = re.search(r'^([\S]+?)://([\s\S]+)$', url).group(1)
scheme = url.split('://', maxsplit = 1)[0]
if scheme == 'ss':
return Shadowsocks.ssDecode(url, compatible = True)
return Shadowsocks.decode(url, compatible = True)
elif scheme == 'ssr':
return ShadowsocksR.ssrDecode(url)
return ShadowsocksR.decode(url)
elif scheme == 'vmess':
return VMess.vmessDecode(url)
return VMess.decode(url)
elif scheme == 'vless':
return VLESS.vlessDecode(url)
return VLESS.decode(url)
elif scheme == 'trojan':
return Trojan.trojanDecode(url)
return Trojan.decode(url)
elif scheme == 'trojan-go':
return TrojanGo.trojanGoDecode(url)
return TrojanGo.decode(url)
elif scheme == 'brook':
return Brook.decode(url)
except:

44
demo.py

@ -5,42 +5,16 @@ import ProxyDecoder as Decoder
import ProxyFilter as Filter
import Check as Checker
# info = {
# 'type': 'brook',
# 'server': '127.0.0.1',
# 'port': '12345',
# 'passwd': 'dnomd343',
# # 'ws': {
# # 'host': 'local.343.re',
# # 'path': '/test',
# # 'secure': {
# # 'verify': False
# # }
# # }
# }
#
# status, ret = Filter.filte(info)
# print(status)
# print(ret)
#
# # print()
# # status, ret = Builder.build(ret, '/tmp/ProxyC')
# # print(status)
# # print(ret)
#
# data = Checker.proxyTest({
# 'check': ['http'],
# 'info': ret
# })
# print(data)
# url = 'brook://server?address=&insecure=&name=&password=password&server=1.2.3.4%3A9999&username='
# url = 'brook://server?address=&insecure=&name=&password=password&server=%5B2001%3A4860%3A4860%3A%3A8888%5D%3A9999&username='
# url = 'brook://wsserver?address=&insecure=&name=&password=password&username=&wsserver=ws%3A%2F%2F1.2.3.4%3A9999'
# url = 'brook://wsserver?address=&insecure=&name=&password=password&username=&wsserver=ws%3A%2F%2F%5B2001%3A4860%3A4860%3A%3A8888%5D%3A9999'
# url = 'brook://wssserver?address=1.2.3.4%3A443&insecure=true&name=&password=password&username=&wssserver=wss%3A%2F%2Fhello.com%3A443'
# url = 'brook://wsserver?address=1.2.3.4%3A443&name=&password=password&username=&wsserver=ws%3A%2F%2Fhello.com%3A443'
url = 'brook://server?address=&insecure=&name=&password=password&server=1.2.3.4%3A9999&username='
url = 'brook://server?address=&insecure=&name=&password=password&server=%5B2001%3A4860%3A4860%3A%3A8888%5D%3A9999&username='
url = 'brook://wsserver?address=&insecure=&name=&password=password&username=&wsserver=ws%3A%2F%2F1.2.3.4%3A9999'
url = 'brook://wsserver?address=&insecure=&name=&password=password&username=&wsserver=ws%3A%2F%2F%5B2001%3A4860%3A4860%3A%3A8888%5D%3A9999'
url = 'brook://wssserver?address=1.2.3.4%3A443&insecure=true&name=&password=password&username=&wssserver=wss%3A%2F%2Fhello.com%3A443'
url = 'brook://wsserver?address=1.2.3.4%3A443&name=&password=password&username=&wsserver=ws%3A%2F%2Fhello.com%3A443'
# url = 'ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE'
# url = 'ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server'
url = 'ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example'
ret = Decoder.decode(url)
print(ret)

Loading…
Cancel
Save