Browse Source

feat: support v2rayN share url

master
Dnomd343 3 years ago
parent
commit
f7cc7c860a
  1. 186
      ProxyDecoder/Shadowsocks.py
  2. 73
      ProxyDecoder/ShadowsocksR.py
  3. 142
      ProxyDecoder/VMess.py
  4. 5
      ProxyDecoder/decoder.py

186
ProxyDecoder/Shadowsocks.py

@ -4,7 +4,7 @@
import re import re
from ProxyDecoder import baseFunc from ProxyDecoder import baseFunc
def __ssPlainDecode(url: str) -> dict or None: def __ssPlainDecode(url: str) -> dict:
""" """
Shadowsocks原始分享链接解码 Shadowsocks原始分享链接解码
@ -13,25 +13,21 @@ def __ssPlainDecode(url: str) -> dict or None:
EXAMPLE: EXAMPLE:
ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE
""" """
try: content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK
content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK info = re.search(
info = re.search( r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$',
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', content.group(1) # method:password@server:port
content.group(1) # method:password@server:port )
) remark = content.group(2)[1:] if content.group(2) is not None else ''
remark = content.group(2)[1:] if content.group(2) is not None else '' return {
remark = remark.replace('+', ' ') # 向后兼容部分客户端 'server': info[3],
return { 'port': int(info[4]),
'server': info[3], 'passwd': info[2],
'port': int(info[4]), 'method': info[1],
'passwd': info[2], 'remark': baseFunc.urlDecode(remark)
'method': info[1], }
'remark': baseFunc.urlDecode(remark)
}
except:
return None
def __ssCommonDecode(url: str) -> dict or None: def __ssCommonDecode(url: str) -> dict:
""" """
Shadowsocks经典分享链接解码 Shadowsocks经典分享链接解码
@ -42,23 +38,20 @@ def __ssCommonDecode(url: str) -> dict or None:
-> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg -> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg
-> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server -> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server
""" """
try: content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK
content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK info = re.search(
info = re.search( r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$',
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', baseFunc.base64Decode(content.group(1)) # method:password@server:port
baseFunc.base64Decode(content.group(1)) # method:password@server:port )
) return {
return { 'server': info[3],
'server': info[3], 'port': int(info[4]),
'port': int(info[4]), 'passwd': info[2],
'passwd': info[2], 'method': info[1],
'method': info[1], 'remark': baseFunc.urlDecode(content.group(2))
'remark': baseFunc.urlDecode(content.group(2)) }
}
except:
return None
def __sip002Decode(url: str) -> dict or None: def __sip002Decode(url: str) -> dict:
""" """
Shadowsocks SIP002分享链接解码 Shadowsocks SIP002分享链接解码
@ -73,63 +66,58 @@ def __sip002Decode(url: str) -> dict or None:
=> ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example => ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example
""" """
try: content = re.search(
r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)'
r'/?([\S]*)$', url # base64@server:port/... (/可选)
)
userInfo = re.search(
r'^([\S]+?):([\S]+)$',
baseFunc.base64Decode(content.group(1)) # method:password
)
info = {
'server': content.group(2),
'port': int(content.group(3)),
'passwd': userInfo.group(2),
'method': userInfo.group(1),
'remark': ''
}
if content.group(4).find('#') != -1: # ...#REMARK
content = re.search( content = re.search(
r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)' r'^\??([\S]*)#([\S]*)$',
r'/?([\S]*)$', url # base64@server:port/... (/可选) content.group(4) # ?...#REMARK (?可选)
)
info['remark'] = baseFunc.urlDecode(
content.group(2)
) )
userInfo = re.search( else:
r'^([\S]+?):([\S]+)$', content = re.search(
baseFunc.base64Decode(content.group(1)) # method:password r'^\??([\S]*)$',
content.group(4) # ?... (?可选)
) )
info = { plugin = ''
'server': content.group(2), for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=...
'port': int(content.group(3)), if field.find('=') == -1: # 缺失xxx=...
'passwd': userInfo.group(2), continue
'method': userInfo.group(1), field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=...
'remark': '' if field.group(1) == 'plugin':
plugin = baseFunc.urlDecode(field.group(2)) # plugin参数
break
if plugin.find(';') == -1: # plugin=... (无参数)
pluginField = {
'type': plugin,
'param': ''
} }
if content.group(4).find('#') != -1: # ...#REMARK else: # plugin=...;... (带参数)
content = re.search( plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数
r'^\??([\S]*)#([\S]*)$', pluginField = {
content.group(4) # ?...#REMARK (?可选) 'type': plugin.group(1),
) 'param': plugin.group(2)
info['remark'] = baseFunc.urlDecode( }
content.group(2) if pluginField['type'] != '': # 带插件情况
) info['plugin'] = pluginField
else: return info
content = re.search(
r'^\??([\S]*)$',
content.group(4) # ?... (?可选)
)
plugin = ''
for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=...
if field.find('=') == -1: # 缺失xxx=...
continue
field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=...
if field.group(1) == 'plugin':
plugin = baseFunc.urlDecode(field.group(2)) # plugin参数
break
if plugin.find(';') == -1: # plugin=... (无参数)
pluginField = {
'type': plugin,
'param': ''
}
else: # plugin=...;... (带参数)
plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数
pluginField = {
'type': plugin.group(1),
'param': plugin.group(2)
}
if pluginField['type'] == '': # 无插件情况
info['plugin'] = None
else:
info['plugin'] = pluginField
return info
except:
return None
def ssDecode(url: str) -> dict or None: def ssDecode(url: str, compatible: bool = False) -> dict or None:
""" """
Shadowsocks分享链接解码 Shadowsocks分享链接解码
@ -142,17 +130,19 @@ def ssDecode(url: str) -> dict or None:
链接不合法: 链接不合法:
return None return None
""" """
if url[0:5] != 'ss://':
return None
try: try:
if url[0:5] != 'ss://':
return None
result = __ssCommonDecode(url) # try shadowsocks common decode result = __ssCommonDecode(url) # try shadowsocks common decode
if result is None:
result = __sip002Decode(url) # try shadowsocks sip002 decode
if result is None:
result = __ssPlainDecode(url) # try shadowsocks plain decode
if result is not None: # 解析成功
result['type'] = 'ss'
return result
except: except:
pass try:
return None result = __sip002Decode(url) # try shadowsocks sip002 decode
except:
try:
result = __ssPlainDecode(url) # try shadowsocks plain decode
except:
return None
if compatible and 'remark' in result: # 向后兼容部分客户端
result['remark'] = result['remark'].replace('+', ' ')
result['type'] = 'ss'
return result

73
ProxyDecoder/ShadowsocksR.py

@ -4,7 +4,7 @@
import re import re
from ProxyDecoder import baseFunc from ProxyDecoder import baseFunc
def __ssrCommonDecode(url: str) -> dict or None: def __ssrCommonDecode(url: str) -> dict:
""" """
ShadowsocksR经典分享链接解码 ShadowsocksR经典分享链接解码
@ -15,37 +15,34 @@ def __ssrCommonDecode(url: str) -> dict or None:
-> server:port:protocol:method:obfs:base64(passwd)/?... -> server:port:protocol:method:obfs:base64(passwd)/?...
-> obfsparam=...&protoparam=...&remarks=...&group=... -> obfsparam=...&protoparam=...&remarks=...&group=...
""" """
try: content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64}
content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64} content = re.search(
content = re.search( r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r
r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs:
r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs: r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?...
r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?... baseFunc.base64Decode(content)
baseFunc.base64Decode(content) )
) info = {
info = { 'server': content.group(1),
'server': content.group(1), 'port': int(content.group(2)),
'port': int(content.group(2)), 'passwd': baseFunc.base64Decode(content.group(6)),
'passwd': baseFunc.base64Decode(content.group(6)), 'method': content.group(4),
'method': content.group(4), 'protocol': content.group(3),
'protocol': content.group(3), 'obfs': content.group(5),
'obfs': content.group(5), }
} for field in content.group(8).split('&'): # /?obfsparam=...&protoparam=...&remarks=...&group=...
for field in content.group(8).split('&'): # /?obfsparam=...&protoparam=...&remarks=...&group=... if field.find('=') == -1: # 缺失xxx=...
if field.find('=') == -1: # 缺失xxx=... continue
continue field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=...
field = re.search(r'^([\S]*?)=([\S]*)$', field) # xxx=... if field.group(1) == 'protoparam':
if field.group(1) == 'protoparam': info['protocolParam'] = baseFunc.base64Decode(field.group(2))
info['protocolParam'] = baseFunc.base64Decode(field.group(2)) elif field.group(1) == 'obfsparam':
elif field.group(1) == 'obfsparam': info['obfsParam'] = baseFunc.base64Decode(field.group(2))
info['obfsParam'] = baseFunc.base64Decode(field.group(2)) elif field.group(1) == 'remarks':
elif field.group(1) == 'remarks': info['remark'] = baseFunc.base64Decode(field.group(2))
info['remark'] = baseFunc.base64Decode(field.group(2)) elif field.group(1) == 'group':
elif field.group(1) == 'group': info['group'] = baseFunc.base64Decode(field.group(2))
info['group'] = baseFunc.base64Decode(field.group(2)) return info
return info
except:
return None
def ssrDecode(url: str) -> dict or None: def ssrDecode(url: str) -> dict or None:
""" """
@ -60,13 +57,11 @@ def ssrDecode(url: str) -> dict or None:
链接不合法: 链接不合法:
return None return None
""" """
if url[0:6] != 'ssr://':
return None
try: try:
if url[0:6] != 'ssr://':
return None
result = __ssrCommonDecode(url) # try common decode result = __ssrCommonDecode(url) # try common decode
if result is not None: # 解析成功
result['type'] = 'ssr'
return result
except: except:
pass return None
return None result['type'] = 'ssr'
return result

142
ProxyDecoder/VMess.py

@ -0,0 +1,142 @@
#!/usr/bin/python
# -*- coding:utf-8 -*-
import re
import json
from ProxyDecoder import baseFunc
def __splitEdParam(path: str) -> tuple[int or None, str]: # 分离early-data参数
if path.find('?') == -1:
return None, path
content = re.search(r'^([\s\S]*?)\?([\s\S]*)$', path)
ed = None
params = []
for field in content[2].split('&'): # ?param_a=...&param_b=...
if not field.startswith('ed='):
params.append(field)
continue
ed = int(field[3:]) # ed=...
if ed is None: # ed param not found
return None, path
if not params: # param -> []
return ed, content[1]
return ed, content[1] + '?' + '&'.join(params)
def __vmessV2raynDecode(url: str) -> dict:
"""
v2rayN / v2rayNG分享链接解码
FORMAT: vmess://BASE64-ENCODED-JSON-STRING
{
"v": "2",
"ps": "...",
"add": "...",
"port": "...",
"id": "...",
"aid": "...",
"scy": "...",
"net": "...",
"type": "...",
"host": "...",
"path": "...",
"tls": "...",
"sni": "...",
"alpn": "..."
}
"""
content = json.loads(
baseFunc.base64Decode(
re.search(r'^vmess://([\S]+)$', url).group(1) # vmess://{base64}
)
)
if int(content['v']) != 2: # version => 2
raise Exception('Unknown version field')
info = {
'server': content['add'],
'port': int(content['port']),
'id': content['id'],
'aid': int(content['aid']),
}
if 'ps' in content: # ps -> remark
info['remark'] = content['ps']
if 'scy' in content: # scy -> method
info['method'] = content['scy']
stream = {
'type': content['net'] # net -> stream.type
}
if content['net'] == 'tcp':
if 'http' in content and content['type'] == 'http': # type -> none / http
stream['obfs'] = {
'host': content['host'],
'path': content['path']
}
elif content['net'] == 'kcp':
if 'type' in content:
stream['obfs'] = content['type']
if 'path' in content:
stream['seed'] = content['path'] # path -> seed
elif content['net'] == 'ws':
if 'host' in content:
stream['host'] = content['host']
if 'path' in content:
try:
stream['ed'], stream['path'] = __splitEdParam(content['path'])
except:
stream['path'] = content['path']
elif content['net'] == 'h2':
if 'host' in content:
stream['host'] = content['host']
if 'path' in content:
stream['path'] = content['path']
elif content['net'] == 'quic':
if 'type' in content:
stream['obfs'] = content['type']
if 'host' in content:
stream['method'] = content['host']
if 'path' in content:
stream['passwd'] = content['path']
elif content['net'] == 'grpc':
if 'type' in content and content['type'] == 'multi':
stream['mode'] = 'multi'
if 'path' in content:
stream['service'] = content['path']
else:
raise Exception('Unknown network type')
secure = None
if 'tls' in content and content['tls'] == 'tls': # enable TLS
secure = {}
if 'sni' in content:
secure['sni'] = content['sni'] # sni option
if 'alpn' in content:
if content['alpn'] == '':
secure['alpn'] = None # ignore alpn option
else:
secure['alpn'] = content['alpn'] # h2 | http/1.1 | h2,http/1.1
stream['secure'] = secure
info['stream'] = stream
return info
def vmessDecode(url: str) -> dict or None:
"""
VMess分享链接解码
链接合法:
return {
'type': 'vmess',
...
}
链接不合法:
return None
"""
if url[0:8] != 'vmess://':
return None
try:
result = __vmessV2raynDecode(url) # try v2rayN decode
except:
return None
result['type'] = 'vmess'
return result

5
ProxyDecoder/decoder.py

@ -4,6 +4,7 @@
import re import re
from ProxyDecoder import Shadowsocks from ProxyDecoder import Shadowsocks
from ProxyDecoder import ShadowsocksR from ProxyDecoder import ShadowsocksR
from ProxyDecoder import VMess
def decode(url: str) -> dict or None: def decode(url: str) -> dict or None:
""" """
@ -22,9 +23,11 @@ def decode(url: str) -> dict or None:
try: try:
scheme = re.search(r'^([\S]+?)://([\s\S]+)$', url).group(1) scheme = re.search(r'^([\S]+?)://([\s\S]+)$', url).group(1)
if scheme == 'ss': if scheme == 'ss':
return Shadowsocks.ssDecode(url) return Shadowsocks.ssDecode(url, compatible = True)
elif scheme == 'ssr': elif scheme == 'ssr':
return ShadowsocksR.ssrDecode(url) return ShadowsocksR.ssrDecode(url)
elif scheme == 'vmess':
return VMess.vmessDecode(url)
except: except:
pass pass
return None return None

Loading…
Cancel
Save