Browse Source

feat: shadowsocks common url and sip002 decode

dev
dnomd343 2 years ago
parent
commit
90e61ceb25
  1. 21
      Basis/Functions.py
  2. 90
      Decoder/Shadowsocks.py
  3. 8
      test.py

21
Basis/Functions.py

@ -4,6 +4,7 @@
import re
import time
import uuid
import base64
import psutil
import random
import hashlib
@ -120,6 +121,26 @@ def urlDecode(content: str) -> str:
return urllib.parse.unquote(content)
def base64Encode(content: str, urlSafe: bool = False, isPadding: bool = True) -> str:
content = base64.b64encode(content.encode(encoding = 'utf-8')).decode(encoding = 'utf-8')
if urlSafe:
content = content.replace('+', '-')
content = content.replace('/', '_')
if not isPadding:
content = content.replace('=', '')
return content
def base64Decode(content: str) -> str or None:
try:
content = content.replace('-', '+').replace('_', '/')
if len(content) % 4 in range(2, 4): # remainder -> 2 or 3
content = content.ljust((len(content) // 4 + 1) * 4, '=') # increase to 4n
return base64.b64decode(content).decode(encoding = 'utf-8')
except:
raise RuntimeError('Invalid base64 encode')
def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, waitTime: int = 10) -> int: # get available port
if rangeStart > rangeEnd or rangeStart < 1 or rangeEnd > 65535:
raise RuntimeError('Invalid port range')

90
Decoder/Shadowsocks.py

@ -4,37 +4,87 @@
# SIP002: https://shadowsocks.org/guide/sip002.html
# Plain / Common: https://shadowsocks.org/guide/configs.html#uri-and-qr-code
import copy
from Basis.Logger import logging
from Basis.Functions import urlDecode
from Basis.Exception import decodeException
from Basis.Functions import base64Decode, urlDecode
ssBasicConfig = {
'type': 'ss',
'info': {}
}
def ssBasicConfig() -> dict: # load shadowsocks basic config
return {
'type': 'ss',
'info': {}
}
def checkPrefix(url: str) -> str: # check url prefix and remove it
if not url.startswith('ss://'):
logging.debug('Shadowsocks url should start with `ss://`')
raise decodeException('Shadowsocks prefix error')
return url[5:]
def splitTag(url: str, fromLeft: bool = True, spaceRemark: bool = True) -> tuple[str, str]: # split tag after `#`
if '#' not in url: # without tag
return url, ''
if fromLeft:
url, remark = url.split('#', 1) # from left search
else:
url, remark = url.rsplit('#', 1) # from right search
if spaceRemark: # deal with space remark for space
remark = remark.replace('+', ' ')
return url, urlDecode(remark)
def ssPlain(url: str, spaceRemark: bool = True) -> dict:
def ssPlain(url: str) -> dict:
"""
FORMAT: ss://method:password@hostname:port#TAG
FORMAT: ss://method:password@hostname:port[#TAG]
"""
config = ssBasicConfig()
config = copy.deepcopy(ssBasicConfig)
logging.debug('Shadowsocks plain decode -> %s' % url)
if not url.startswith('ss://'):
logging.debug('Shadowsocks url should start with `ss://`')
raise decodeException('Shadowsocks prefix error')
url = url[5:] # remove `ss://`
if '#' in url:
url, remark = url.rsplit('#', 1) # split remark
if spaceRemark: # use `+` instead of space
remark = remark.replace('+', ' ')
config['name'] = urlDecode(remark)
logging.debug('Shadowsocks url remark -> %s' % config['name'])
url, config['name'] = splitTag(checkPrefix(url), False)
userinfo, url = url.rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
logging.debug('Shadowsocks plain release -> %s', config)
logging.debug('Shadowsocks plain decode release -> %s', config)
return config
def ssCommon(url: str) -> dict:
"""
FORMAT: ss://BASE64-ENCODED-STRING-WITHOUT-PADDING[#TAG]
base64('method:password@hostname:port')
"""
config = copy.deepcopy(ssBasicConfig)
logging.debug('Shadowsocks common decode -> %s' % url)
url, config['name'] = splitTag(checkPrefix(url))
userinfo, url = base64Decode(url).rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
logging.debug('Shadowsocks common decode release -> %s', config)
return config
def sip002(url: str) -> dict:
"""
FORMAT: ss://userinfo@hostname:port [ "/" ] [ "?" plugin ] [ #tag ]
userinfo => method:password or websafe-base64-encode-utf8(method:password)
"""
config = copy.deepcopy(ssBasicConfig)
logging.debug('Shadowsocks sip002 decode -> %s' % url)
url, config['name'] = splitTag(checkPrefix(url))
userinfo, url = url.rsplit('@', 1)
try:
userinfo = base64Decode(userinfo) # userinfo encode base64 is optional
except:
userinfo = urlDecode(userinfo) # not base64 decode -> url encode format
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
url = url.replace('/?plugin=', '?plugin=') # remove `/` character
if '?plugin=' in url:
url, plugin = url.split('?plugin=', 1)
plugin = urlDecode(plugin).split(';', 1)
config['info']['plugin'] = {
'type': plugin[0],
'param': '' if len(plugin) == 1 else plugin[1] # default as empty string
}
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
logging.debug('Shadowsocks sip002 decode release -> %s', config)
return config

8
test.py

@ -9,6 +9,10 @@ from pprint import pprint
from Filter import Filter
from Decoder import Shadowsocks
ret = Shadowsocks.ssPlain('ss://aes-128-ctr:password@8.210.148.24:34326')
ret = Filter(ret['type'], ret['info'])
# ret = Shadowsocks.ssPlain('ss://aes-128-ctr:password@8.210.148.24:34326#ok%2Bfuck')
# ret = Shadowsocks.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server')
# 'ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server'
# ret = Shadowsocks.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1')
ret = Shadowsocks.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
# ret = Filter(ret['type'], ret['info'])
pprint(ret, sort_dicts = False)

Loading…
Cancel
Save