Browse Source

update: filter module

dev
dnomd343 2 years ago
parent
commit
03b90c2204
  1. 50
      Decoder/Shadowsocks.py
  2. 7
      Decoder/ShadowsocksR.py
  3. 4
      Filter/Brook.py
  4. 4
      Filter/Hysteria.py
  5. 4
      Filter/Shadowsocks.py
  6. 4
      Filter/ShadowsocksR.py
  7. 4
      Filter/Trojan.py
  8. 4
      Filter/TrojanGo.py
  9. 4
      Filter/VLESS.py
  10. 4
      Filter/VMess.py
  11. 1
      Utils/Common/Format.py
  12. 6
      test.py

50
Decoder/Shadowsocks.py

@ -4,28 +4,26 @@
# SIP002: https://shadowsocks.org/guide/sip002.html
# Plain / Common: https://shadowsocks.org/guide/configs.html#uri-and-qr-code
import copy
from Utils.Logger import logger
from Utils.Common import urlDecode, b64Decode
from Utils.Common import checkScheme, splitTag
ssBasicConfig = {
'type': 'ss',
'info': {}
}
def ssPlain(url: str) -> dict:
"""
FORMAT: ss://method:password@hostname:port[#TAG]
"""
config = copy.deepcopy(ssBasicConfig)
logger.debug('Shadowsocks plain decode -> %s' % url)
config = {
'type': 'ss',
'info': {}
}
info = config['info']
logger.debug('Shadowsocks plain url decode -> %s' % url)
url, config['name'] = splitTag(checkScheme(url, 'ss', 'Shadowsocks plain'), fromRight = True)
userinfo, url = url.rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
logger.debug('Shadowsocks plain decode release -> %s', config)
info['server'], info['port'] = url.rsplit(':', 1)
info['method'], info['passwd'] = userinfo.split(':', 1)
logger.debug('Shadowsocks plain url release -> %s' % config)
return config
@ -34,13 +32,17 @@ def ssCommon(url: str) -> dict:
FORMAT: ss://BASE64-ENCODED-STRING-WITHOUT-PADDING[#TAG]
base64('method:password@hostname:port')
"""
config = copy.deepcopy(ssBasicConfig)
logger.debug('Shadowsocks common decode -> %s' % url)
config = {
'type': 'ss',
'info': {}
}
info = config['info']
logger.debug('Shadowsocks common url decode -> %s' % url)
url, config['name'] = splitTag(checkScheme(url, 'ss', 'Shadowsocks common'))
userinfo, url = b64Decode(url).rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
logger.debug('Shadowsocks common decode release -> %s', config)
info['server'], info['port'] = url.rsplit(':', 1)
info['method'], info['passwd'] = userinfo.split(':', 1)
logger.debug('Shadowsocks common url release -> %s' % config)
return config
@ -49,23 +51,27 @@ def sip002(url: str) -> dict:
FORMAT: ss://userinfo@hostname:port [ "/" ] [ "?" plugin ] [ #tag ]
userinfo => method:password or websafe-base64-encode-utf8(method:password)
"""
config = copy.deepcopy(ssBasicConfig)
logger.debug('SIP002 decode -> %s' % url)
config = {
'type': 'ss',
'info': {}
}
info = config['info']
logger.debug('SIP002 url decode -> %s' % url)
url, config['name'] = splitTag(checkScheme(url, 'ss', 'SIP002'))
userinfo, url = url.rsplit('@', 1)
try:
userinfo = b64Decode(userinfo) # userinfo encode base64 is optional
except:
userinfo = urlDecode(userinfo) # not base64 decode -> url encode format
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
info['method'], info['passwd'] = userinfo.split(':', 1)
url = url.replace('/?plugin=', '?plugin=') # remove `/` character
if '?plugin=' in url: # with sip003 plugin
url, plugin = url.split('?plugin=', 1)
plugin = urlDecode(plugin).split(';', 1)
config['info']['plugin'] = {
info['plugin'] = {
'type': plugin[0],
'param': '' if len(plugin) == 1 else plugin[1] # default as empty string
}
config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
logger.debug('SIP002 decode release -> %s', config)
info['server'], info['port'] = url.rsplit(':', 1)
logger.debug('SIP002 url release -> %s' % config)
return config

7
Decoder/ShadowsocksR.py

@ -6,6 +6,7 @@
from Utils.Logger import logger
from Utils.Common import b64Decode, checkScheme, splitParam
def ssr(url: str) -> dict:
"""
FORMAT: ssr://base64(
@ -18,7 +19,7 @@ def ssr(url: str) -> dict:
'info': {}
}
info = config['info']
logger.debug('ShadowsocksR decode -> %s' % url)
logger.debug('ShadowsocksR url decode -> %s' % url)
params = ''
url = b64Decode(checkScheme(url, 'ssr', 'ShadowsocksR'))
@ -28,10 +29,10 @@ def ssr(url: str) -> dict:
info['passwd'] = b64Decode(info['passwd'])
params = splitParam(params)
logger.debug('ShadowsocksR decode params -> %s' % params)
logger.debug('ShadowsocksR url params -> %s' % params)
info['obfsParam'] = b64Decode(params['obfsparam']) if 'obfsparam' in params else ''
info['protocolParam'] = b64Decode(params['protoparam']) if 'protoparam' in params else ''
config['name'] = b64Decode(params['remarks']) if 'remarks' in params else ''
config['group'] = b64Decode(params['group']) if 'group' in params else ''
logger.debug('ShadowsocksR decode release -> %s', config)
logger.debug('ShadowsocksR url release -> %s' % config)
return config

4
Filter/Brook.py

@ -2,8 +2,8 @@
# -*- coding: utf-8 -*-
import copy
from Utils.Common import isHost, isPort
from Utils.Filter import Filter, rulesFilter
from Utils.Common import isHost, isPort, hostFormat
from Utils.Common import toInt, toStr, toStrTidy, toBool
secureObject = rulesFilter({
@ -72,7 +72,7 @@ wsObject = rulesFilter({
brookObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/Hysteria.py

@ -4,13 +4,13 @@
import copy
from Utils.Constant import hysteriaProtocols
from Utils.Filter import Filter, rulesFilter
from Utils.Common import isHost, isPort, isIpAddr
from Utils.Common import toInt, toStr, toStrTidy, toBool
from Utils.Common import isHost, isPort, isIpAddr, hostFormat
hysteriaObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/Shadowsocks.py

@ -6,12 +6,12 @@ from Filter.Plugin import pluginObject
from Utils.Constant import ssAllMethods
from Utils.Common import isHost, isPort
from Utils.Filter import Filter, rulesFilter
from Utils.Common import toInt, toStr, toStrTidy
from Utils.Common import toInt, toStr, toStrTidy, hostFormat
ssObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/ShadowsocksR.py

@ -4,7 +4,7 @@
import copy
from Utils.Common import isHost, isPort
from Utils.Filter import Filter, rulesFilter
from Utils.Common import toInt, toStr, toStrTidy
from Utils.Common import toInt, toStr, toStrTidy, hostFormat
from Utils.Constant import ssrMethods, ssrProtocols, ssrObfuscations
@ -21,7 +21,7 @@ def ssrObfsFormat(obfs: str) -> str:
ssrObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/Trojan.py

@ -5,12 +5,12 @@ import copy
from Filter import Xray
from Utils.Common import isHost, isPort
from Utils.Filter import Filter, rulesFilter
from Utils.Common import toInt, toStr, toStrTidy
from Utils.Common import toInt, toStr, toStrTidy, hostFormat
trojanObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/TrojanGo.py

@ -3,9 +3,9 @@
import copy
from Filter.Plugin import pluginObject
from Utils.Common import isHost, isPort
from Utils.Constant import trojanGoMethods
from Utils.Filter import Filter, rulesFilter
from Utils.Common import isHost, isPort, hostFormat
from Utils.Common import isIpAddr, toInt, toStr, toStrTidy, toBool
ssObject = rulesFilter({
@ -46,7 +46,7 @@ wsObject = rulesFilter({
trojanGoObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/VLESS.py

@ -4,13 +4,13 @@
import copy
from Filter import Xray
from Utils.Common import isHost, isPort
from Utils.Common import toInt, toStrTidy
from Utils.Filter import Filter, rulesFilter
from Utils.Common import toInt, toStrTidy, hostFormat
vlessObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

4
Filter/VMess.py

@ -5,13 +5,13 @@ import copy
from Filter import V2ray
from Utils.Constant import vmessMethods
from Utils.Common import isHost, isPort
from Utils.Common import toInt, toStrTidy
from Utils.Filter import Filter, rulesFilter
from Utils.Common import toInt, toStrTidy, hostFormat
vmessObject = rulesFilter({
'server': {
'type': str,
'format': toStrTidy,
'format': lambda s: hostFormat(toStrTidy(s)),
'filter': isHost,
'errMsg': 'Invalid server address'
},

1
Utils/Common/Format.py

@ -11,6 +11,7 @@ def v6AddBracket(host: str) -> str: # add bracket for ipv6
def hostFormat(host: str, v6Bracket: bool = False) -> str:
host = host.strip().lower()
try:
if host[:1] == '[' and host[-1:] == ']': # [IPv6] format
host = host[1:-1] # remove extra bracket

6
test.py

@ -8,7 +8,9 @@ import Decoder
from pprint import pprint
from Filter import Filter
ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck')
# ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck')
# ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@fc00::1:34326#ok%2Bfuck')
# ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@[fc00::1]:34326#ok%2Bfuck')
# ret = Decoder.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server')
# ret = Decoder.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1')
# ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
@ -17,7 +19,7 @@ ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck'
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP3Byb3RvcGFyYW09')
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGs')
# ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ')
ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ')
# ret['info']['server'] = '[%s]' % ret['info']['server']

Loading…
Cancel
Save