Browse Source

feat: decode VMess common share url

master
Dnomd343 2 years ago
parent
commit
b1016e32ae
  1. 105
      ProxyDecoder/Shadowsocks.py
  2. 45
      ProxyDecoder/ShadowsocksR.py
  3. 115
      ProxyDecoder/VMess.py
  4. 22
      ProxyDecoder/baseFunc.py
  5. 11
      demo.py

105
ProxyDecoder/Shadowsocks.py

@ -13,18 +13,22 @@ def __ssPlainDecode(url: str) -> dict:
EXAMPLE:
ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE
"""
content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK
info = re.search(
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$',
content.group(1) # method:password@server:port
remark = ''
match = re.search(r'^ss://([\S]+)$', url) # ss://...
if match[1].find('#') != -1: # ...#REMARK
match = re.search(r'^([\S]+)#([\S]*)$', match[1])
remark = baseFunc.urlDecode(
match[2] if match[2] is not None else ''
)
match = re.search(
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)$', match[1] # method:password@server:port
)
remark = content.group(2)[1:] if content.group(2) is not None else ''
return {
'server': info[3],
'port': int(info[4]),
'passwd': info[2],
'method': info[1],
'remark': baseFunc.urlDecode(remark)
'server': baseFunc.formatHost(match[3]),
'port': int(match[4]),
'passwd': match[2],
'method': match[1],
'remark': remark
}
def __ssCommonDecode(url: str) -> dict:
@ -38,17 +42,20 @@ def __ssCommonDecode(url: str) -> dict:
-> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg
-> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server
"""
content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK
info = re.search(
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$',
baseFunc.base64Decode(content.group(1)) # method:password@server:port
match = re.search(r'^ss://([a-zA-Z0-9\-_+\\=]+)#?([\S]*)?$', url) # base64#REMARK
remark = baseFunc.urlDecode(
match[2] if match[2] is not None else ''
)
match = re.search(
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)$',
baseFunc.base64Decode(match[1]) # method:password@server:port
)
return {
'server': info[3],
'port': int(info[4]),
'passwd': info[2],
'method': info[1],
'remark': baseFunc.urlDecode(content.group(2))
'server': baseFunc.formatHost(match[3]),
'port': int(match[4]),
'passwd': match[2],
'method': match[1],
'remark': remark
}
def __sip002Decode(url: str) -> dict:
@ -66,55 +73,47 @@ def __sip002Decode(url: str) -> dict:
=> ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example
"""
content = re.search(
r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)'
match = re.search(
r'^ss://([a-zA-Z0-9\-_+\\=]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)'
r'/?([\S]*)$', url # base64@server:port/... (/可选)
)
userInfo = re.search(
r'^([\S]+?):([\S]+)$',
baseFunc.base64Decode(content.group(1)) # method:password
baseFunc.base64Decode(match[1]) # method:password
)
info = {
'server': content.group(2),
'port': int(content.group(3)),
'passwd': userInfo.group(2),
'method': userInfo.group(1),
'server': baseFunc.formatHost(match[2]),
'port': int(match[3]),
'passwd': userInfo[2],
'method': userInfo[1],
'remark': ''
}
if content.group(4).find('#') != -1: # ...#REMARK
content = re.search(
r'^\??([\S]*)#([\S]*)$',
content.group(4) # ?...#REMARK (?可选)
)
info['remark'] = baseFunc.urlDecode(
content.group(2)
if match[4].find('#') != -1: # ...#REMARK
match = re.search(
r'^\??([\S]*)#([\S]*)$', match[4] # ?...#REMARK (?可选)
)
info['remark'] = baseFunc.urlDecode(match[2])
else:
content = re.search(
r'^\??([\S]*)$',
content.group(4) # ?... (?可选)
match = re.search(
r'^\??([\S]*)$', match[4] # ?... (?可选)
)
plugin = ''
for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=...
if field.find('=') == -1: # 缺失xxx=...
continue
field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=...
if field.group(1) == 'plugin':
plugin = baseFunc.urlDecode(field.group(2)) # plugin参数
break
if plugin.find(';') == -1: # plugin=... (无参数)
pluginField = {
'type': plugin,
params = baseFunc.paramSplit(match[1]) # /?plugin=...&other1=...&other2=...
pluginField = params['plugin'] if 'plugin' in params else ''
if pluginField.find(';') == -1: # plugin=... (无参数)
pluginObject = {
'type': pluginField,
'param': ''
}
else: # plugin=...;... (带参数)
plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数
pluginField = {
'type': plugin.group(1),
'param': plugin.group(2)
match = re.search(r'^([\S]*?);([\S]*)$', pluginField) # 插件名;插件参数
pluginObject = {
'type': match[1],
'param': match[2]
}
if pluginField['type'] != '': # 带插件情况
info['plugin'] = pluginField
if pluginObject['type'] != '': # 带插件时配置
info['plugin'] = pluginObject
return info
def ssDecode(url: str, compatible: bool = False) -> dict or None:

45
ProxyDecoder/ShadowsocksR.py

@ -15,33 +15,30 @@ def __ssrCommonDecode(url: str) -> dict:
-> server:port:protocol:method:obfs:base64(passwd)/?...
-> obfsparam=...&protoparam=...&remarks=...&group=...
"""
content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64}
content = re.search(
r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r
r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs:
r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?...
baseFunc.base64Decode(content)
match = re.search(r'^ssr://([\S]+)$', url) # ssr://{BASE64}
match = re.search(
r'^([a-zA-Z0-9.:\-_\[\]]*):([0-9]*):' # server:port:
r'([0-9a-zA-Z_.\-]*):([0-9a-zA-Z_.\-]*):([0-9a-zA-Z_.\-]*):' # protocol:method:obfs:
r'([a-zA-Z0-9\-_+\\=]*)(/\?)?([\S]*)?$', # BASE64(passwd)/?...
baseFunc.base64Decode(match[1])
)
info = {
'server': content.group(1),
'port': int(content.group(2)),
'passwd': baseFunc.base64Decode(content.group(6)),
'method': content.group(4),
'protocol': content.group(3),
'obfs': content.group(5),
'server': baseFunc.formatHost(match[1]),
'port': int(match[2]),
'passwd': baseFunc.base64Decode(match[6]),
'method': match[4],
'protocol': match[3],
'obfs': match[5],
}
for field in content.group(8).split('&'): # /?obfsparam=...&protoparam=...&remarks=...&group=...
if field.find('=') == -1: # 缺失xxx=...
continue
field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=...
if field.group(1) == 'protoparam':
info['protocolParam'] = baseFunc.base64Decode(field.group(2))
elif field.group(1) == 'obfsparam':
info['obfsParam'] = baseFunc.base64Decode(field.group(2))
elif field.group(1) == 'remarks':
info['remark'] = baseFunc.base64Decode(field.group(2))
elif field.group(1) == 'group':
info['group'] = baseFunc.base64Decode(field.group(2))
params = baseFunc.paramSplit(match[8]) # /?obfsparam=...&protoparam=...&remarks=...&group=...
if 'protoparam' in params:
info['protocolParam'] = baseFunc.base64Decode(params['protoparam'])
if 'obfsparam' in params:
info['obfsParam'] = baseFunc.base64Decode(params['obfsparam'])
if 'remarks' in params:
info['remark'] = baseFunc.base64Decode(params['remarks'])
if 'group' in params:
info['group'] = baseFunc.base64Decode(params['group'])
return info
def ssrDecode(url: str) -> dict or None:

115
ProxyDecoder/VMess.py

@ -119,6 +119,116 @@ def __vmessV2raynDecode(url: str) -> dict:
info['stream'] = stream
return info
def __vmessCommonDecode(url: str) -> dict:
"""
VMess标准分享链接解码 (only VMessAEAD)
FORMAT: vmess://$(UUID)@server:port?{fields}#$(remark)
type -> tcp / kcp / ws / http / quic / grpc
encryption -> auto / aes-128-gcm / chacha20-poly1305
security -> none / tls
path -> WebSocket / HTTP/2
host -> WebSocket / HTTP/2
headerType -> mKCP / QUIC UDP obfs -> none / srtp / utp / wechat-video / dtls / wireguard
seed -> mKCP seed
quicSecurity -> QUIC method
key -> QUIC key
serviceName -> gRPC Service Name
mode -> gRPC transport mode -> gun / multi / guna
sni -> TLS SNI
alpn -> TLS ALPN
"""
match = re.search(r'^vmess://([\S]+?)(#[\S]*)?$', url) # vmess://...#REMARK
remark = baseFunc.urlDecode(
match[2][1:] if match[2] is not None else ''
)
match = re.search(
r'^([\S]+)@([a-zA-Z0-9.:\-_\[\]]+):([0-9]+)/?([\S]*)$', match[1]
)
info = {
'server': baseFunc.formatHost(match[2]),
'port': int(match[3]),
'id': baseFunc.urlDecode(match[1]),
'remark': remark
}
params = baseFunc.paramSplit(match[4])
if 'encryption' in params:
info['method'] = params['encryption']
stream = {
'type': params['type']
}
"""
?httpObfs
path -> WebSocket / HTTP/2
host -> WebSocket / HTTP/2
"""
if params['type'] == 'tcp':
if 'headerType' in params and params['headerType']:
stream['obfs'] = {}
if 'host' in params:
stream['obfs']['host'] = params['host']
if 'path' in params:
stream['obfs']['path'] = params['path']
elif params['type'] == 'kcp':
if 'headerType' in params:
stream['obfs'] = params['headerType']
if 'seed' in params:
stream['seed'] = params['seed']
elif params['type'] == 'ws':
if 'host' in params:
stream['host'] = params['host']
if 'path' in params:
try:
stream['ed'], stream['path'] = __splitEdParam(params['path'])
except:
stream['path'] = params['path']
elif params['type'] == 'http':
if 'host' in params:
stream['host'] = params['host']
if 'path' in params:
stream['path'] = params['path']
elif params['type'] == 'quic':
if 'headerType' in params:
stream['obfs'] = params['headerType']
if 'quicSecurity' in params:
stream['method'] = params['quicSecurity']
if 'key' in params:
stream['passwd'] = params['key']
elif params['type'] == 'grpc':
if 'serviceName' in params:
stream['service'] = params['serviceName']
if 'mode' in params and params['mode'] == 'multi':
stream['mode'] = 'multi'
else:
raise Exception('Unknown network type')
secure = None
if 'security' in params and params['security'] == 'tls':
secure = {}
if 'sni' in params:
secure['sni'] = params['sni']
if 'alpn' in params:
secure['alpn'] = params['alpn']
stream['secure'] = secure
info['stream'] = stream
return info
def vmessDecode(url: str) -> dict or None:
"""
VMess分享链接解码
@ -137,6 +247,9 @@ def vmessDecode(url: str) -> dict or None:
try:
result = __vmessV2raynDecode(url) # try v2rayN decode
except:
return None
try:
result = __vmessCommonDecode(url) # try VMess common decode
except:
return None
result['type'] = 'vmess'
return result

22
ProxyDecoder/baseFunc.py

@ -1,6 +1,7 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import re
import base64
import urllib.parse
@ -36,3 +37,24 @@ def base64Decode(content: str) -> str or None:
return base64.b64decode(content).decode()
except:
return None
def formatHost(content: str) -> str:
try:
content = content.lower().strip()
if content[:1] == '[' and content[-1:] == ']':
return content[1:-1]
except:
pass
return content
def paramSplit(content: str) -> dict:
if content.startswith('?'):
content = content[1:]
result = {}
for field in content.split('&'):
match = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=...
try:
result[urlDecode(match[1])] = urlDecode(match[2])
except:
pass
return result

11
demo.py

@ -0,0 +1,11 @@
import ProxyDecoder as Decoder
import ProxyFilter as Filter
url = '...'
ret = Decoder.decode(url)
print(ret)
status, ret = Filter.filte(ret, isExtra = True)
print(status)
print(ret)
Loading…
Cancel
Save