diff --git a/Basis/Functions.py b/Basis/Functions.py index c841cc0..829cf1d 100644 --- a/Basis/Functions.py +++ b/Basis/Functions.py @@ -4,6 +4,7 @@ import re import time import uuid +import base64 import psutil import random import hashlib @@ -120,6 +121,26 @@ def urlDecode(content: str) -> str: return urllib.parse.unquote(content) +def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True) -> str: + content = base64.b64encode(content.encode(encoding = 'utf-8')).decode(encoding = 'utf-8') + if urlSafe: + content = content.replace('+', '-') + content = content.replace('/', '_') + if not isPadding: + content = content.replace('=', '') + return content + + +def base64Decode(content: str) -> str or None: + try: + content = content.replace('-', '+').replace('_', '/') + if len(content) % 4 in range(2, 4): # remainder -> 2 or 3 + content = content.ljust((len(content) // 4 + 1) * 4, '=') # increase to 4n + return base64.b64decode(content).decode(encoding = 'utf-8') + except: + raise RuntimeError('Invalid base64 encode') + + def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, waitTime: int = 10) -> int: # get available port if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535: raise RuntimeError('Invalid port range') diff --git a/Decoder/Shadowsocks.py b/Decoder/Shadowsocks.py index 28f9d5b..a2ac210 100644 --- a/Decoder/Shadowsocks.py +++ b/Decoder/Shadowsocks.py @@ -4,37 +4,87 @@ # SIP002: https://shadowsocks.org/guide/sip002.html # Plain / Common: https://shadowsocks.org/guide/configs.html#uri-and-qr-code +import copy from Basis.Logger import logging -from Basis.Functions import urlDecode from Basis.Exception import decodeException +from Basis.Functions import base64Decode, urlDecode +ssBasicConfig = { + 'type': 'ss', + 'info': {} +} -def ssBasicConfig() -> dict: # load shadowsocks basic config - return { - 'type': 'ss', - 'info': {} - } + +def checkPrefix(url: str) -> str: # check url prefix and remove it + if not url.startswith('ss://'): + logging.debug('Shadowsocks url should start with `ss://`') + raise decodeException('Shadowsocks prefix error') + return url[5:] + + +def splitTag(url: str, fromLeft: bool = True, spaceRemark: bool = True) -> tuple[str, str]: # split tag after `#` + if '#' not in url: # without tag + return url, '' + if fromLeft: + url, remark = url.split('#', 1) # from left search + else: + url, remark = url.rsplit('#', 1) # from right search + if spaceRemark: # deal with space remark for space + remark = remark.replace('+', ' ') + return url, urlDecode(remark) -def ssPlain(url: str, spaceRemark: bool = True) -> dict: +def ssPlain(url: str) -> dict: """ - FORMAT: ss://method:password@hostname:port#TAG + FORMAT: ss://method:password@hostname:port[#TAG] """ - config = ssBasicConfig() + config = copy.deepcopy(ssBasicConfig) logging.debug('Shadowsocks plain decode -> %s' % url) - if not url.startswith('ss://'): - logging.debug('Shadowsocks url should start with `ss://`') - raise decodeException('Shadowsocks prefix error') - url = url[5:] # remove `ss://` - if '#' in url: - url, remark = url.rsplit('#', 1) # split remark - if spaceRemark: # use `+` instead of space - remark = remark.replace('+', ' ') - config['name'] = urlDecode(remark) - logging.debug('Shadowsocks url remark -> %s' % config['name']) + url, config['name'] = splitTag(checkPrefix(url), False) userinfo, url = url.rsplit('@', 1) config['info']['server'], config['info']['port'] = url.rsplit(':', 1) config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1) - logging.debug('Shadowsocks plain release -> %s', config) + logging.debug('Shadowsocks plain decode release -> %s', config) return config + +def ssCommon(url: str) -> dict: + """ + FORMAT: ss://BASE64-ENCODED-STRING-WITHOUT-PADDING[#TAG] + base64('method:password@hostname:port') + """ + config = copy.deepcopy(ssBasicConfig) + logging.debug('Shadowsocks common decode -> %s' % url) + url, config['name'] = splitTag(checkPrefix(url)) + userinfo, url = base64Decode(url).rsplit('@', 1) + config['info']['server'], config['info']['port'] = url.rsplit(':', 1) + config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1) + logging.debug('Shadowsocks common decode release -> %s', config) + return config + + +def sip002(url: str) -> dict: + """ + FORMAT: ss://userinfo@hostname:port [ "/" ] [ "?" plugin ] [ #tag ] + userinfo => method:password or websafe-base64-encode-utf8(method:password) + """ + config = copy.deepcopy(ssBasicConfig) + logging.debug('Shadowsocks sip002 decode -> %s' % url) + url, config['name'] = splitTag(checkPrefix(url)) + userinfo, url = url.rsplit('@', 1) + try: + userinfo = base64Decode(userinfo) # userinfo encode base64 is optional + except: + userinfo = urlDecode(userinfo) # not base64 decode -> url encode format + config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1) + url = url.replace('/?plugin=', '?plugin=') # remove `/` character + if '?plugin=' in url: + url, plugin = url.split('?plugin=', 1) + plugin = urlDecode(plugin).split(';', 1) + config['info']['plugin'] = { + 'type': plugin[0], + 'param': '' if len(plugin) == 1 else plugin[1] # default as empty string + } + config['info']['server'], config['info']['port'] = url.rsplit(':', 1) + logging.debug('Shadowsocks sip002 decode release -> %s', config) + return config diff --git a/test.py b/test.py index 7bae81a..5eb3f67 100755 --- a/test.py +++ b/test.py @@ -9,6 +9,10 @@ from pprint import pprint from Filter import Filter from Decoder import Shadowsocks -ret = Shadowsocks.ssPlain('ss://aes-128-ctr:password@8.210.148.24:34326') -ret = Filter(ret['type'], ret['info']) +# ret = Shadowsocks.ssPlain('ss://aes-128-ctr:password@8.210.148.24:34326#ok%2Bfuck') +# ret = Shadowsocks.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server') +# 'ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server' +# ret = Shadowsocks.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1') +ret = Shadowsocks.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3') +# ret = Filter(ret['type'], ret['info']) pprint(ret, sort_dicts = False)