Browse Source

refactor: new ProxyDecoder

master
Dnomd343 3 years ago
parent
commit
0ab438485f
  1. 108
      ProxyDecoder/Shadowsocks.py
  2. 29
      ProxyDecoder/ShadowsocksR.py
  3. 2
      ProxyDecoder/__init__.py
  4. 14
      ProxyDecoder/baseFunc.py
  5. 7
      ProxyDecoder/decoder.py

108
ProxyDecoder/Shadowsocks.py

@ -4,32 +4,35 @@
import re import re
from ProxyDecoder import baseFunc from ProxyDecoder import baseFunc
def __ssPlainDecode(url: str): def __ssPlainDecode(url: str) -> dict or None:
''' """
Shadowsocks原始分享链接解码 Shadowsocks原始分享链接解码
FORMAT: ss://method:password@hostname:port#TAG FORMAT: ss://method:password@hostname:port#TAG
EXAMPLE: EXAMPLE:
ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE ss://bf-cfb:test@192.168.100.1:8888#EXAMPLE
''' """
content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK try:
info = re.search( content = re.search(r'^ss://([\s\S]+?)(#[\s\S]*)?$', url) # ...#REMARK
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', info = re.search(
content.group(1) # method:password@server:port r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$',
) content.group(1) # method:password@server:port
remark = content.group(2)[1:] if content.group(2) != None else '' )
remark = remark.replace('+', ' ') # 向后兼容部分客户端 remark = content.group(2)[1:] if content.group(2) is not None else ''
return { remark = remark.replace('+', ' ') # 向后兼容部分客户端
'server': info[3], return {
'port': int(info[4]), 'server': info[3],
'password': info[2], 'port': int(info[4]),
'method': info[1], 'passwd': info[2],
'remark': baseFunc.urlDecode(remark) 'method': info[1],
} 'remark': baseFunc.urlDecode(remark)
}
except:
return None
def __ssCommonDecode(url: str): def __ssCommonDecode(url: str) -> dict or None:
''' """
Shadowsocks经典分享链接解码 Shadowsocks经典分享链接解码
FORMAT: ss://BASE64-ENCODED-STRING-WITHOUT-PADDING#TAG FORMAT: ss://BASE64-ENCODED-STRING-WITHOUT-PADDING#TAG
@ -38,9 +41,9 @@ def __ssCommonDecode(url: str):
base64('bf-cfb:test/!@#:@192.168.100.1:8888') base64('bf-cfb:test/!@#:@192.168.100.1:8888')
-> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg -> YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg
-> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server -> ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server
''' """
content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK
try: try:
content = re.search(r'^ss://([a-zA-Z0-9_=+\\-]+)#?([\S]*)?$', url) # base64#REMARK
info = re.search( info = re.search(
r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$', r'^([\S]+?):([\S]+)@([a-zA-Z0-9.:_-]+):([0-9]+)$',
baseFunc.base64Decode(content.group(1)) # method:password@server:port baseFunc.base64Decode(content.group(1)) # method:password@server:port
@ -48,15 +51,15 @@ def __ssCommonDecode(url: str):
return { return {
'server': info[3], 'server': info[3],
'port': int(info[4]), 'port': int(info[4]),
'password': info[2], 'passwd': info[2],
'method': info[1], 'method': info[1],
'remark': baseFunc.urlDecode(content.group(2)) 'remark': baseFunc.urlDecode(content.group(2))
} }
except: except:
return None return None
def __sip002Decode(url: str): def __sip002Decode(url: str) -> dict or None:
''' """
Shadowsocks SIP002分享链接解码 Shadowsocks SIP002分享链接解码
FORMAT: FORMAT:
@ -69,12 +72,12 @@ def __sip002Decode(url: str):
obfs-local;obfs=http -> obfs-local%3Bobfs%3Dhttp obfs-local;obfs=http -> obfs-local%3Bobfs%3Dhttp
=> ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example => ss://cmM0LW1kNTpwYXNzd2Q@192.168.100.1:8888/?plugin=obfs-local%3Bobfs%3Dhttp#Example
''' """
content = re.search(
r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)'
r'/?([\S]*)$', url # base64@server:port/... (/可选)
)
try: try:
content = re.search(
r'^ss://([a-zA-Z0-9_=+\\-]+)@([a-zA-Z0-9.:_-]+):([0-9]+)'
r'/?([\S]*)$', url # base64@server:port/... (/可选)
)
userInfo = re.search( userInfo = re.search(
r'^([\S]+?):([\S]+)$', r'^([\S]+?):([\S]+)$',
baseFunc.base64Decode(content.group(1)) # method:password baseFunc.base64Decode(content.group(1)) # method:password
@ -82,20 +85,24 @@ def __sip002Decode(url: str):
info = { info = {
'server': content.group(2), 'server': content.group(2),
'port': int(content.group(3)), 'port': int(content.group(3)),
'password': userInfo.group(2), 'passwd': userInfo.group(2),
'method': userInfo.group(1) 'method': userInfo.group(1),
'remark': ''
} }
if content.group(4).find('#') != -1: # ...#REMARK if content.group(4).find('#') != -1: # ...#REMARK
content = re.search( content = re.search(
r'^\??([\S]*)#([\S]*)$', r'^\??([\S]*)#([\S]*)$',
content.group(4) # ?...#REMARK (?可选) content.group(4) # ?...#REMARK (?可选)
) )
remark = content.group(2) info['remark'] = baseFunc.urlDecode(
content.group(2)
)
else: else:
content = re.search( content = re.search(
r'^\??([\S]*)$', r'^\??([\S]*)$',
content.group(4) # ?... (?可选) content.group(4) # ?... (?可选)
) )
plugin = ''
for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=... for field in content.group(1).split('&'): # /?plugin=...&other1=...&other2=...
if field.find('=') == -1: # 缺失xxx=... if field.find('=') == -1: # 缺失xxx=...
continue continue
@ -103,44 +110,45 @@ def __sip002Decode(url: str):
if field.group(1) == 'plugin': if field.group(1) == 'plugin':
plugin = baseFunc.urlDecode(field.group(2)) # plugin参数 plugin = baseFunc.urlDecode(field.group(2)) # plugin参数
break break
plugin = plugin if 'plugin' in dir() else '' # 无plugin时为空
if plugin.find(';') == -1: # plugin=... (无参数) if plugin.find(';') == -1: # plugin=... (无参数)
info['plugin'] = plugin info['plugin'] = {
info['pluginParam'] = '' 'type': plugin,
'param': ''
}
else: # plugin=...;... (带参数) else: # plugin=...;... (带参数)
plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数 plugin = re.search(r'^([\S]*?);([\S]*)$', plugin) # 插件名;插件参数
info['plugin'] = plugin.group(1) info['plugin'] = {
info['pluginParam'] = plugin.group(2) 'type': plugin.group(1),
info['remark'] = baseFunc.urlDecode( 'param': plugin.group(2)
remark if 'remark' in dir() else '' # 无remark时为空 }
)
return info return info
except: except:
return None return None
def ssDecode(url: str): def ssDecode(url: str) -> dict or None:
''' """
Shadowsocks 分享链接解码 Shadowsocks分享链接解码
链接合法: 链接合法:
return {...} return {
'type': 'ss',
...
}
链接不合法: 链接不合法:
return None return None
''' """
try: try:
if url[0:5] != 'ss://': if url[0:5] != 'ss://':
return None return None
result = __ssCommonDecode(url) # try shadowsocks common decode result = __ssCommonDecode(url) # try shadowsocks common decode
if result == None: if result is None:
result = __sip002Decode(url) # try shadowsocks sip002 decode result = __sip002Decode(url) # try shadowsocks sip002 decode
if result == None: if result is None:
result = __ssPlainDecode(url) # try shadowsocks plain decode result = __ssPlainDecode(url) # try shadowsocks plain decode
if result != None: # 解析成功 if result is not None: # 解析成功
result['type'] = 'ss' result['type'] = 'ss'
return result return result
else: # 解析失败
return None
except: except:
pass pass
return None return None

29
ProxyDecoder/ShadowsocksR.py

@ -4,8 +4,8 @@
import re import re
from ProxyDecoder import baseFunc from ProxyDecoder import baseFunc
def __ssrCommonDecode(url: str): def __ssrCommonDecode(url: str) -> dict or None:
''' """
ShadowsocksR经典分享链接解码 ShadowsocksR经典分享链接解码
FORMAT: ssr://BASE64-ENCODED-STRING-WITHOUT-PADDING FORMAT: ssr://BASE64-ENCODED-STRING-WITHOUT-PADDING
@ -14,19 +14,19 @@ def __ssrCommonDecode(url: str):
ssr://{base64} ssr://{base64}
-> server:port:protocol:method:obfs:base64(passwd)/?... -> server:port:protocol:method:obfs:base64(passwd)/?...
-> obfsparam=...&protoparam=...&remarks=...&group=... -> obfsparam=...&protoparam=...&remarks=...&group=...
''' """
try: try:
content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64} content = re.search(r'^ssr://([\S]+)$', url).group(1) # ssr://{base64}
content = re.search( content = re.search(
r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r r'^([a-zA-Z0-9.:_-]*):([0-9]*):' # server:p/r
r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs: r'([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):([0-9a-zA-Z_.-]*):' # protocol:method:obfs:
r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$' # base(passwd)/?... r'([0-9a-zA-Z_=+\\-]*)(/\?)?([\S]*)?$', # base(passwd)/?...
, baseFunc.base64Decode(content) baseFunc.base64Decode(content)
) )
info = { info = {
'server': content.group(1), 'server': content.group(1),
'port': int(content.group(2)), 'port': int(content.group(2)),
'password': baseFunc.base64Decode(content.group(6)), 'passwd': baseFunc.base64Decode(content.group(6)),
'method': content.group(4), 'method': content.group(4),
'protocol': content.group(3), 'protocol': content.group(3),
'obfs': content.group(5), 'obfs': content.group(5),
@ -47,25 +47,26 @@ def __ssrCommonDecode(url: str):
except: except:
return None return None
def ssrDecode(url: str): def ssrDecode(url: str) -> dict or None:
''' """
ShadowsocksR 分享链接解码 ShadowsocksR分享链接解码
链接合法: 链接合法:
return {...} return {
'type': 'ssr',
...
}
链接不合法: 链接不合法:
return None return None
''' """
try: try:
if url[0:6] != 'ssr://': if url[0:6] != 'ssr://':
return None return None
result = __ssrCommonDecode(url) # try common decode result = __ssrCommonDecode(url) # try common decode
if result != None: # 解析成功 if result is not None: # 解析成功
result['type'] = 'ssr' result['type'] = 'ssr'
return result return result
else: # 解析失败
return None
except: except:
pass pass
return None return None

2
ProxyDecoder/__init__.py

@ -3,4 +3,4 @@
from ProxyDecoder.decoder import * from ProxyDecoder.decoder import *
__all__ = [ 'decode' ] __all__ = ['decode']

14
ProxyDecoder/baseFunc.py

@ -4,19 +4,19 @@
import base64 import base64
import urllib.parse import urllib.parse
def urlEncode(content: str): def urlEncode(content: str) -> str or None:
try: try:
return urllib.parse.urlencode(content) return urllib.parse.urlencode(content)
except: except:
return None return None
def urlDecode(content: str): def urlDecode(content: str) -> str or None:
try: try:
return urllib.parse.unquote(content) return urllib.parse.unquote(content)
except: except:
return None return None
def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True): def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True) -> str or None:
try: try:
content = base64.b64encode(content.encode()).decode() content = base64.b64encode(content.encode()).decode()
if urlSafe: if urlSafe:
@ -28,11 +28,11 @@ def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True):
except: except:
return None return None
def base64Decode(content: str): def base64Decode(content: str) -> str or None:
content = content.replace('-', '+').replace('_', '/')
if len(content) % 4 in range(2, 4): # remainder -> 2 or 3
content = content.ljust((len(content) // 4 + 1) * 4, '=') # increase to 4n
try: try:
content = content.replace('-', '+').replace('_', '/')
if len(content) % 4 in range(2, 4): # remainder -> 2 or 3
content = content.ljust((len(content) // 4 + 1) * 4, '=') # increase to 4n
return base64.b64decode(content).decode() return base64.b64decode(content).decode()
except: except:
return None return None

7
ProxyDecoder/decoder.py

@ -5,8 +5,8 @@ import re
from ProxyDecoder import Shadowsocks from ProxyDecoder import Shadowsocks
from ProxyDecoder import ShadowsocksR from ProxyDecoder import ShadowsocksR
def decode(url): def decode(url: str) -> dict or None:
''' """
代理分享链接解码 代理分享链接解码
链接无效: 链接无效:
@ -18,8 +18,7 @@ def decode(url):
'...': '...', '...': '...',
... ...
} }
"""
'''
try: try:
scheme = re.search(r'^([\S]+?)://([\s\S]+)$', url).group(1) scheme = re.search(r'^([\S]+?)://([\s\S]+)$', url).group(1)
if scheme == 'ss': if scheme == 'ss':

Loading…
Cancel
Save