Browse Source

feat: add ShadowsocksR url decode

dev
dnomd343 2 years ago
parent
commit
80847ff796
  1. 1
      Decoder/Shadowsocks.py
  2. 37
      Decoder/ShadowsocksR.py
  3. 1
      Decoder/__init__.py
  4. 12
      Utils/Common/Coding.py
  5. 2
      Utils/Common/__init__.py
  6. 14
      test.py

1
Decoder/Shadowsocks.py

@ -9,7 +9,6 @@ from Utils.Logger import logger
from Utils.Common import checkScheme, splitTag
from Utils.Common import urlDecode, base64Decode
ssBasicConfig = {
'type': 'ss',
'info': {}

37
Decoder/ShadowsocksR.py

@ -0,0 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# https://github.com/shadowsocksrr/shadowsocksr-csharp/blob/master/shadowsocks-csharp/Model/Server.cs#L357
from Utils.Logger import logger
from Utils.Common import checkScheme, base64Decode, splitParam
def ssr(url: str) -> dict:
"""
FORMAT: ssr://base64(
server:port:protocol:method:obfs:base64(passwd)/?
obfsparam=base64(...)&protoparam=...(...)&remarks=base64(...)&group=base64(...)
)
"""
config = {
'type': 'ssr',
'info': {}
}
info = config['info']
logger.debug('ShadowsocksR decode -> %s' % url)
params = ''
url = base64Decode(checkScheme(url, 'ssr', 'ShadowsocksR'))
if '?' in url:
url, params = url.replace('/?', '?').split('?') # `.../?...` or `...?...`
info['server'], info['port'], info['protocol'], info['method'], info['obfs'], info['passwd'] = url.rsplit(':', 5)
info['passwd'] = base64Decode(info['passwd'])
params = splitParam(params)
logger.debug('ShadowsocksR decode params -> %s' % params)
info['obfsParam'] = base64Decode(params['obfsparam']) if 'obfsparam' in params else ''
info['protocolParam'] = base64Decode(params['protoparam']) if 'protoparam' in params else ''
config['name'] = base64Decode(params['remarks']) if 'remarks' in params else ''
config['group'] = base64Decode(params['group']) if 'group' in params else ''
logger.debug('ShadowsocksR decode release -> %s', config)
return config

1
Decoder/__init__.py

@ -1,4 +1,5 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from Decoder.ShadowsocksR import ssr
from Decoder.Shadowsocks import ssPlain, ssCommon, sip002

12
Utils/Common/Coding.py

@ -35,8 +35,8 @@ def base64Decode(content: str) -> str: # base64 decode
def checkScheme(url: str, scheme: str, name: str) -> str: # check url scheme and remove it
if not url.startswith('%s://' % scheme):
logger.debug('%s url should start with `%s://`', name, scheme)
raise RuntimeError('%s scheme error', name)
logger.warning('%s url should start with `%s://`' % (name, scheme))
raise RuntimeError('%s scheme error' % name)
return url[len(scheme) + 3:]
@ -50,3 +50,11 @@ def splitTag(url: str, fromRight: bool = True, spaceRemark: bool = True) -> tupl
if spaceRemark: # deal with space remark for space
remark = remark.replace('+', ' ')
return url, urlDecode(remark)
def splitParam(params: str) -> dict: # split params
ret = {}
if params != '':
for param in params.split('&'):
ret[param.split('=', 1)[0]] = urlDecode(param.split('=', 1)[1])
return ret

2
Utils/Common/__init__.py

@ -2,6 +2,6 @@
# -*- coding: utf-8 -*-
from Utils.Common.Coding import urlEncode, urlDecode
from Utils.Common.Coding import checkScheme, splitTag
from Utils.Common.Coding import base64Encode, base64Decode
from Utils.Common.Network import isVacantPort, getAvailablePort
from Utils.Common.Coding import checkScheme, splitTag, splitParam

14
test.py

@ -11,10 +11,12 @@ from Filter import Filter
# ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck')
# ret = Decoder.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server')
# ret = Decoder.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1')
ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
ret = {
'type': ret['type'],
'name': ret['name'],
'info': Filter(ret['type'], ret['info'])
}
# ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP29iZnNwYXJhbT1ZMlUzTUdVeE5EY3dOekF1ZFhCa1lYUmxMbTFwWTNKdmMyOW1kQzVqYjIwJnByb3RvcGFyYW09TVRRM01EY3dPa0pGTTIxck9RJnJlbWFya3M9UlZoQlRWQk1SUSZncm91cD1kR1Z6ZEE')
ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGs')
# ret['info']['server'] = '[%s]' % ret['info']['server']
ret['info'] = Filter(ret['type'], ret['info'])
pprint(ret, sort_dicts = False)

Loading…
Cancel
Save