Browse Source

update: shadowsocks url decode

dev
dnomd343 2 years ago
parent
commit
31e407ba22
  1. 2
      Basis/Filter.py
  2. 4
      Basis/Functions.py
  3. 33
      Decoder/Shadowsocks.py
  4. 4
      Decoder/__init__.py
  5. 2
      Filter/__init__.py
  6. 20
      Utils/Common/Coding.py
  7. 10
      Utils/Common/Network.py
  8. 1
      Utils/Common/__init__.py
  9. 18
      test.py

2
Basis/Filter.py

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
import copy
from Basis.Exception import filterException
from Utils.Exception import filterException
filterObject = {
'optional': {

4
Basis/Functions.py

@ -7,7 +7,7 @@ import uuid
import random
import hashlib
from IPy import IP
from Basis.Logger import logging
from Utils.Logger import logger
def isIpAddr(ipAddr: str) -> bool:
@ -66,7 +66,7 @@ def genFlag(length: int = 12) -> str: # generate random task flag
flag += chr(tmp + 87) # a ~ f
else:
flag += str(tmp) # 0 ~ 9
logging.debug('Generate new flag -> ' + flag)
logger.debug('Generate new flag -> ' + flag)
return flag

33
Decoder/Shadowsocks.py

@ -6,32 +6,15 @@
import copy
from Utils.Logger import logger
from Basis.Exception import decodeException
from Utils.Common import checkScheme, splitTag
from Utils.Common import urlDecode, base64Decode
ssBasicConfig = {
'type': 'ss',
'info': {}
}
def checkPrefix(url: str) -> str: # check url prefix and remove it
if not url.startswith('ss://'):
logger.debug('Shadowsocks url should start with `ss://`')
raise decodeException('Shadowsocks prefix error')
return url[5:]
def splitTag(url: str, fromLeft: bool = True, spaceRemark: bool = True) -> tuple[str, str]: # split tag after `#`
if '#' not in url: # without tag
return url, ''
if fromLeft:
url, remark = url.split('#', 1) # from left search
else:
url, remark = url.rsplit('#', 1) # from right search
if spaceRemark: # deal with space remark for space
remark = remark.replace('+', ' ')
return url, urlDecode(remark)
def ssPlain(url: str) -> dict:
"""
@ -39,7 +22,7 @@ def ssPlain(url: str) -> dict:
"""
config = copy.deepcopy(ssBasicConfig)
logger.debug('Shadowsocks plain decode -> %s' % url)
url, config['name'] = splitTag(checkPrefix(url), False)
url, config['name'] = splitTag(checkScheme(url, 'ss', 'Shadowsocks plain'), fromRight = True)
userinfo, url = url.rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
@ -54,7 +37,7 @@ def ssCommon(url: str) -> dict:
"""
config = copy.deepcopy(ssBasicConfig)
logger.debug('Shadowsocks common decode -> %s' % url)
url, config['name'] = splitTag(checkPrefix(url))
url, config['name'] = splitTag(checkScheme(url, 'ss', 'Shadowsocks common'))
userinfo, url = base64Decode(url).rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
@ -68,8 +51,8 @@ def sip002(url: str) -> dict:
userinfo => method:password or websafe-base64-encode-utf8(method:password)
"""
config = copy.deepcopy(ssBasicConfig)
logger.debug('Shadowsocks sip002 decode -> %s' % url)
url, config['name'] = splitTag(checkPrefix(url))
logger.debug('SIP002 decode -> %s' % url)
url, config['name'] = splitTag(checkScheme(url, 'ss', 'SIP002'))
userinfo, url = url.rsplit('@', 1)
try:
userinfo = base64Decode(userinfo) # userinfo encode base64 is optional
@ -77,7 +60,7 @@ def sip002(url: str) -> dict:
userinfo = urlDecode(userinfo) # not base64 decode -> url encode format
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
url = url.replace('/?plugin=', '?plugin=') # remove `/` character
if '?plugin=' in url:
if '?plugin=' in url: # with sip003 plugin
url, plugin = url.split('?plugin=', 1)
plugin = urlDecode(plugin).split(';', 1)
config['info']['plugin'] = {
@ -85,5 +68,5 @@ def sip002(url: str) -> dict:
'param': '' if len(plugin) == 1 else plugin[1] # default as empty string
}
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
logger.debug('Shadowsocks sip002 decode release -> %s', config)
logger.debug('SIP002 decode release -> %s', config)
return config

4
Decoder/__init__.py

@ -0,0 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from Decoder.Shadowsocks import ssPlain, ssCommon, sip002

2
Filter/__init__.py

@ -9,7 +9,7 @@ from Filter.Shadowsocks import ssFilter
from Filter.ShadowsocksR import ssrFilter
from Filter.TrojanGo import trojanGoFilter
from Filter.Hysteria import hysteriaFilter
from Basis.Exception import filterException
from Utils.Exception import filterException
filterEntry = {
'ss': ssFilter,

20
Utils/Common/Coding.py

@ -3,6 +3,7 @@
import base64
import urllib.parse
from Utils.Logger import logger
def urlEncode(content: str) -> str: # url encode (RFC3986)
return urllib.parse.quote(content, encoding = 'utf-8')
@ -30,3 +31,22 @@ def base64Decode(content: str) -> str: # base64 decode
return base64.b64decode(content).decode(encoding = 'utf-8')
except:
raise RuntimeError('Invalid base64 encode')
def checkScheme(url: str, scheme: str, name: str) -> str: # check url scheme and remove it
if not url.startswith('%s://' % scheme):
logger.debug('%s url should start with `%s://`', name, scheme)
raise RuntimeError('%s scheme error', name)
return url[len(scheme) + 3:]
def splitTag(url: str, fromRight: bool = True, spaceRemark: bool = True) -> tuple[str, str]: # split tag after `#`
if '#' not in url: # without tag
return url, ''
if not fromRight:
url, remark = url.split('#', 1) # from left search
else:
url, remark = url.rsplit('#', 1) # from right search
if spaceRemark: # deal with space remark for space
remark = remark.replace('+', ' ')
return url, urlDecode(remark)

10
Utils/Common/Network.py

@ -4,7 +4,7 @@
import time
import psutil
import random
from Basis.Logger import logging
from Utils.Logger import logger
def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, msWait: int = 10) -> int: # found a available port
@ -13,7 +13,7 @@ def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, msWait: int
while True:
port = random.randint(rangeStart, rangeEnd) # choose randomly
if isVacantPort(port):
logging.debug('Found new available port -> %i' % port)
logger.debug('Found new available port -> %i' % port)
return port
time.sleep(msWait / 1000) # ms -> s (default 10ms)
@ -21,9 +21,9 @@ def getAvailablePort(rangeStart: int = 1024, rangeEnd: int = 65535, msWait: int
def isVacantPort(port: int) -> bool: # whether the port is occupied
for connection in networkStatus(): # scan every connections
if connection['local']['port'] == port: # port occupied (ipv4-tcp / ipv4-udp / ipv6-tcp / ipv6-udp)
logging.debug('Check port %i -> occupied' % port)
logger.debug('Check port %i -> occupied' % port)
return False
logging.debug('Check port %i -> available' % port) # vacant port
logger.debug('Check port %i -> available' % port) # vacant port
return True
@ -49,5 +49,5 @@ def networkStatus() -> list: # get all network connections
'status': connection.status,
'pid': connection.pid, # process id
})
logging.debug('Network status -> found %i connections' % len(result))
logger.debug('Network status -> found %i connections' % len(result))
return result

1
Utils/Common/__init__.py

@ -2,5 +2,6 @@
# -*- coding: utf-8 -*-
from Utils.Common.Coding import urlEncode, urlDecode
from Utils.Common.Coding import checkScheme, splitTag
from Utils.Common.Coding import base64Encode, base64Decode
from Utils.Common.Network import isVacantPort, getAvailablePort

18
test.py

@ -3,16 +3,18 @@
from Basis import Constant
Constant.LogLevel = 'DEBUG'
from Basis.Logger import logging
import Decoder
from pprint import pprint
from Filter import Filter
from Decoder import Shadowsocks
# ret = Shadowsocks.ssPlain('ss://aes-128-ctr:password@8.210.148.24:34326#ok%2Bfuck')
# ret = Shadowsocks.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server')
# 'ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server'
# ret = Shadowsocks.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1')
ret = Shadowsocks.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
# ret = Filter(ret['type'], ret['info'])
# ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck')
# ret = Decoder.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server')
# ret = Decoder.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1')
ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
ret = {
'type': ret['type'],
'name': ret['name'],
'info': Filter(ret['type'], ret['info'])
}
pprint(ret, sort_dicts = False)

Loading…
Cancel
Save