Browse Source

update: filter utils

dev
dnomd343 2 years ago
parent
commit
59849533f6
  1. 6
      Decoder/Shadowsocks.py
  2. 74
      Decoder/ShadowsocksR.py
  3. 4
      Decoder/VMess.py
  4. 6
      Filter/Brook.py
  5. 8
      Filter/Hysteria.py
  6. 6
      Filter/Plugin.py
  7. 8
      Filter/Shadowsocks.py
  8. 8
      Filter/ShadowsocksR.py
  9. 6
      Filter/Trojan.py
  10. 8
      Filter/TrojanGo.py
  11. 6
      Filter/V2ray.py
  12. 6
      Filter/VLESS.py
  13. 8
      Filter/VMess.py
  14. 6
      Filter/Xray.py
  15. 3
      Utils/Constant/__init__.py
  16. 10
      Utils/Filter.py
  17. 6
      test.py

6
Decoder/Shadowsocks.py

@ -6,8 +6,8 @@
import copy import copy
from Utils.Logger import logger from Utils.Logger import logger
from Utils.Common import urlDecode, b64Decode
from Utils.Common import checkScheme, splitTag from Utils.Common import checkScheme, splitTag
from Utils.Common import urlDecode, base64Decode
ssBasicConfig = { ssBasicConfig = {
'type': 'ss', 'type': 'ss',
@ -37,7 +37,7 @@ def ssCommon(url: str) -> dict:
config = copy.deepcopy(ssBasicConfig) config = copy.deepcopy(ssBasicConfig)
logger.debug('Shadowsocks common decode -> %s' % url) logger.debug('Shadowsocks common decode -> %s' % url)
url, config['name'] = splitTag(checkScheme(url, 'ss', 'Shadowsocks common')) url, config['name'] = splitTag(checkScheme(url, 'ss', 'Shadowsocks common'))
userinfo, url = base64Decode(url).rsplit('@', 1) userinfo, url = b64Decode(url).rsplit('@', 1)
config['info']['server'], config['info']['port'] = url.rsplit(':', 1) config['info']['server'], config['info']['port'] = url.rsplit(':', 1)
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1) config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)
logger.debug('Shadowsocks common decode release -> %s', config) logger.debug('Shadowsocks common decode release -> %s', config)
@ -54,7 +54,7 @@ def sip002(url: str) -> dict:
url, config['name'] = splitTag(checkScheme(url, 'ss', 'SIP002')) url, config['name'] = splitTag(checkScheme(url, 'ss', 'SIP002'))
userinfo, url = url.rsplit('@', 1) userinfo, url = url.rsplit('@', 1)
try: try:
userinfo = base64Decode(userinfo) # userinfo encode base64 is optional userinfo = b64Decode(userinfo) # userinfo encode base64 is optional
except: except:
userinfo = urlDecode(userinfo) # not base64 decode -> url encode format userinfo = urlDecode(userinfo) # not base64 decode -> url encode format
config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1) config['info']['method'], config['info']['passwd'] = userinfo.split(':', 1)

74
Decoder/ShadowsocksR.py

@ -1,37 +1,37 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# https://github.com/shadowsocksrr/shadowsocksr-csharp/blob/master/shadowsocks-csharp/Model/Server.cs#L357 # https://github.com/shadowsocksrr/shadowsocksr-csharp/blob/master/shadowsocks-csharp/Model/Server.cs#L357
from Utils.Logger import logger from Utils.Logger import logger
from Utils.Common import checkScheme, base64Decode, splitParam from Utils.Common import b64Decode, checkScheme, splitParam
def ssr(url: str) -> dict: def ssr(url: str) -> dict:
""" """
FORMAT: ssr://base64( FORMAT: ssr://base64(
server:port:protocol:method:obfs:base64(passwd)/? server:port:protocol:method:obfs:base64(passwd)/?
obfsparam=base64(...)&protoparam=...(...)&remarks=base64(...)&group=base64(...) obfsparam=base64(...)&protoparam=...(...)&remarks=base64(...)&group=base64(...)
) )
""" """
config = { config = {
'type': 'ssr', 'type': 'ssr',
'info': {} 'info': {}
} }
info = config['info'] info = config['info']
logger.debug('ShadowsocksR decode -> %s' % url) logger.debug('ShadowsocksR decode -> %s' % url)
params = '' params = ''
url = base64Decode(checkScheme(url, 'ssr', 'ShadowsocksR')) url = b64Decode(checkScheme(url, 'ssr', 'ShadowsocksR'))
if '?' in url: if '?' in url:
url, params = url.replace('/?', '?').split('?') # `.../?...` or `...?...` url, params = url.replace('/?', '?').split('?') # `.../?...` or `...?...`
info['server'], info['port'], info['protocol'], info['method'], info['obfs'], info['passwd'] = url.rsplit(':', 5) info['server'], info['port'], info['protocol'], info['method'], info['obfs'], info['passwd'] = url.rsplit(':', 5)
info['passwd'] = base64Decode(info['passwd']) info['passwd'] = b64Decode(info['passwd'])
params = splitParam(params) params = splitParam(params)
logger.debug('ShadowsocksR decode params -> %s' % params) logger.debug('ShadowsocksR decode params -> %s' % params)
info['obfsParam'] = base64Decode(params['obfsparam']) if 'obfsparam' in params else '' info['obfsParam'] = b64Decode(params['obfsparam']) if 'obfsparam' in params else ''
info['protocolParam'] = base64Decode(params['protoparam']) if 'protoparam' in params else '' info['protocolParam'] = b64Decode(params['protoparam']) if 'protoparam' in params else ''
config['name'] = base64Decode(params['remarks']) if 'remarks' in params else '' config['name'] = b64Decode(params['remarks']) if 'remarks' in params else ''
config['group'] = base64Decode(params['group']) if 'group' in params else '' config['group'] = b64Decode(params['group']) if 'group' in params else ''
logger.debug('ShadowsocksR decode release -> %s', config) logger.debug('ShadowsocksR decode release -> %s', config)
return config return config

4
Decoder/VMess.py

@ -4,7 +4,7 @@
import json import json
from Utils.Logger import logger from Utils.Logger import logger
from Utils.Exception import decodeException from Utils.Exception import decodeException
from Utils.Common import base64Decode, checkScheme, hostFormat from Utils.Common import b64Decode, checkScheme, hostFormat
def v2rayN(url: str) -> dict: def v2rayN(url: str) -> dict:
""" """
@ -17,7 +17,7 @@ def v2rayN(url: str) -> dict:
} }
info = config['info'] info = config['info']
logger.debug('V2rayN url decode -> %s' % url) logger.debug('V2rayN url decode -> %s' % url)
url = json.loads(base64Decode(checkScheme(url, 'vmess', 'V2rayN'))) url = json.loads(b64Decode(checkScheme(url, 'vmess', 'V2rayN')))
logger.debug('V2rayN json format -> %s' % url) logger.debug('V2rayN json format -> %s' % url)
if int(url['v']) != 2: if int(url['v']) != 2:
logger.warning('V2rayN url with unknown version') logger.warning('V2rayN url with unknown version')

6
Filter/Brook.py

@ -2,9 +2,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import copy import copy
from Basis.Functions import isHost, isPort from Utils.Common import isHost, isPort
from Basis.Filter import Filter, rulesFilter from Utils.Filter import Filter, rulesFilter
from Basis.Functions import toInt, toStr, toStrTidy, toBool from Utils.Common import toInt, toStr, toStrTidy, toBool
secureObject = rulesFilter({ secureObject = rulesFilter({
'verify': { 'verify': {

8
Filter/Hysteria.py

@ -2,10 +2,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import copy import copy
from Basis.Functions import isHost, isPort from Utils.Constant import hysteriaProtocols
from Basis.Filter import Filter, rulesFilter from Utils.Filter import Filter, rulesFilter
from Basis.Constant import hysteriaProtocols from Utils.Common import isHost, isPort, isIpAddr
from Basis.Functions import isIpAddr, toInt, toStr, toStrTidy, toBool from Utils.Common import toInt, toStr, toStrTidy, toBool
hysteriaObject = rulesFilter({ hysteriaObject = rulesFilter({
'server': { 'server': {

6
Filter/Plugin.py

@ -2,9 +2,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import copy import copy
from Basis.Filter import rulesFilter from Utils.Filter import rulesFilter
from Basis.Constant import pluginClients from Utils.Constant import pluginClients
from Basis.Functions import toStr, toStrTidy from Utils.Common import toStr, toStrTidy
pluginAlias = { pluginAlias = {
'obfs-local': {'obfs', 'simple-obfs'}, 'obfs-local': {'obfs', 'simple-obfs'},

8
Filter/Shadowsocks.py

@ -3,10 +3,10 @@
import copy import copy
from Filter.Plugin import pluginObject from Filter.Plugin import pluginObject
from Basis.Constant import ssAllMethods from Utils.Constant import ssAllMethods
from Basis.Functions import isHost, isPort from Utils.Common import isHost, isPort
from Basis.Filter import Filter, rulesFilter from Utils.Filter import Filter, rulesFilter
from Basis.Functions import toInt, toStr, toStrTidy from Utils.Common import toInt, toStr, toStrTidy
ssObject = rulesFilter({ ssObject = rulesFilter({
'server': { 'server': {

8
Filter/ShadowsocksR.py

@ -2,10 +2,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import copy import copy
from Basis.Functions import isHost, isPort from Utils.Common import isHost, isPort
from Basis.Filter import Filter, rulesFilter from Utils.Filter import Filter, rulesFilter
from Basis.Functions import toInt, toStr, toStrTidy from Utils.Common import toInt, toStr, toStrTidy
from Basis.Constant import ssrMethods, ssrProtocols, ssrObfuscations from Utils.Constant import ssrMethods, ssrProtocols, ssrObfuscations
def ssrProtocolFormat(protocol: str) -> str: def ssrProtocolFormat(protocol: str) -> str:

6
Filter/Trojan.py

@ -3,9 +3,9 @@
import copy import copy
from Filter import Xray from Filter import Xray
from Basis.Functions import isHost, isPort from Utils.Common import isHost, isPort
from Basis.Filter import Filter, rulesFilter from Utils.Filter import Filter, rulesFilter
from Basis.Functions import toInt, toStr, toStrTidy from Utils.Common import toInt, toStr, toStrTidy
trojanObject = rulesFilter({ trojanObject = rulesFilter({
'server': { 'server': {

8
Filter/TrojanGo.py

@ -3,10 +3,10 @@
import copy import copy
from Filter.Plugin import pluginObject from Filter.Plugin import pluginObject
from Basis.Constant import trojanGoMethods from Utils.Common import isHost, isPort
from Basis.Functions import isHost, isPort from Utils.Constant import trojanGoMethods
from Basis.Filter import Filter, rulesFilter from Utils.Filter import Filter, rulesFilter
from Basis.Functions import isIpAddr, toInt, toStr, toStrTidy, toBool from Utils.Common import isIpAddr, toInt, toStr, toStrTidy, toBool
ssObject = rulesFilter({ ssObject = rulesFilter({
'method': { 'method': {

6
Filter/V2ray.py

@ -1,9 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from Basis.Filter import rulesFilter from Utils.Filter import rulesFilter
from Basis.Constant import quicMethods, udpObfuscations from Utils.Constant import quicMethods, udpObfuscations
from Basis.Functions import isIpAddr, toInt, toStr, toStrTidy, toBool from Utils.Common import isIpAddr, toInt, toStr, toStrTidy, toBool
tlsObject = rulesFilter({ tlsObject = rulesFilter({
'sni': { 'sni': {

6
Filter/VLESS.py

@ -3,9 +3,9 @@
import copy import copy
from Filter import Xray from Filter import Xray
from Basis.Functions import isHost, isPort from Utils.Common import isHost, isPort
from Basis.Filter import Filter, rulesFilter from Utils.Common import toInt, toStrTidy
from Basis.Functions import toInt, toStrTidy from Utils.Filter import Filter, rulesFilter
vlessObject = rulesFilter({ vlessObject = rulesFilter({
'server': { 'server': {

8
Filter/VMess.py

@ -3,10 +3,10 @@
import copy import copy
from Filter import V2ray from Filter import V2ray
from Basis.Constant import vmessMethods from Utils.Constant import vmessMethods
from Basis.Functions import isHost, isPort from Utils.Common import isHost, isPort
from Basis.Filter import Filter, rulesFilter from Utils.Common import toInt, toStrTidy
from Basis.Functions import toInt, toStrTidy from Utils.Filter import Filter, rulesFilter
vmessObject = rulesFilter({ vmessObject = rulesFilter({
'server': { 'server': {

6
Filter/Xray.py

@ -3,9 +3,9 @@
import copy import copy
from Filter import V2ray from Filter import V2ray
from Basis.Filter import rulesFilter from Utils.Filter import rulesFilter
from Basis.Constant import xtlsFlows from Utils.Constant import xtlsFlows
from Basis.Functions import toStrTidy, toBool from Utils.Common import toStrTidy, toBool
def xtlsFlowFormat(flow: str) -> str: def xtlsFlowFormat(flow: str) -> str:

3
Utils/Constant/__init__.py

@ -7,8 +7,7 @@ from Utils.Constant.Others import *
from Utils.Constant.Default import * from Utils.Constant.Default import *
from Utils.Constant.Shadowsocks import * from Utils.Constant.Shadowsocks import *
# Create WorkDir
try: try:
os.makedirs(WorkDir) # just like `mkdir -p ...` os.makedirs(WorkDir) # create work folder (`mkdir -p ...`)
except: except:
pass # folder exist or target is another thing pass # folder exist or target is another thing

10
Basis/Filter.py → Utils/Filter.py

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import copy import copy
from Utils.Logger import logger
from Utils.Exception import filterException from Utils.Exception import filterException
filterObject = { filterObject = {
@ -72,6 +73,7 @@ def Filter(raw: dict, rules: dict) -> dict:
data = {} data = {}
raw = copy.deepcopy(raw) raw = copy.deepcopy(raw)
rules = copy.deepcopy(rules) rules = copy.deepcopy(rules)
for key, rule in rules.items(): for key, rule in rules.items():
# pretreatment process (raw --[copy / default value]--> data) # pretreatment process (raw --[copy / default value]--> data)
if key not in raw: # key not exist if key not in raw: # key not exist
@ -80,19 +82,19 @@ def Filter(raw: dict, rules: dict) -> dict:
data[key] = rule['default'] # set default value data[key] = rule['default'] # set default value
else: # key exist else: # key exist
data[key] = raw[key] data[key] = raw[key]
# format process (data --[format]--> data) # format process (data --[format]--> data)
# TODO: some 'none' value should be format as None
if data[key] is None: # key content is None if data[key] is None: # key content is None
if not rule['allowNone']: # key is not allow None if not rule['allowNone']: # key is not allow None
raise filterException('Field `%s` shouldn\'t be None' % key) raise filterException('Field `%s` shouldn\'t be None' % key)
continue # skip following process continue # skip following process
try: try:
data[key] = rule['format'](data[key]) # run format data[key] = rule['format'](data[key]) # run format function
# TODO: format result maybe None, check allowNone again (allow -> skip following process)
except: except:
raise filterException(rule['errMsg']) # format error raise filterException(rule['errMsg']) # format error
# filter process (data --[type check (& filter check)]--> pass / non-pass) # filter process (data --[type check (& filter check)]--> pass / non-pass)
if type(rule['type']) == type: # str / int / bool / ... if type(rule['type']) == type: # str / int / bool / ... -> list
rule['type'] = [rule['type']] # str -> [str] / int -> [int] / ... rule['type'] = [rule['type']] # str -> [str] / int -> [int] / ...
if type(rule['type']) == list: # [str, int, bool, ...] if type(rule['type']) == list: # [str, int, bool, ...]
if data[key] == any and any in rule['type']: # special case -> skip type filter if data[key] == any and any in rule['type']: # special case -> skip type filter

6
test.py

@ -1,14 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from Basis import Constant from Utils import Constant
Constant.LogLevel = 'DEBUG' Constant.LogLevel = 'DEBUG'
import Decoder import Decoder
from pprint import pprint from pprint import pprint
from Filter import Filter from Filter import Filter
# ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck') ret = Decoder.ssPlain('ss://aes-128-ctr:pa@ss#word@8.210.148.24:34326#ok%2Bfuck')
# ret = Decoder.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server') # ret = Decoder.ssCommon('ss://YmYtY2ZiOnRlc3QvIUAjOkAxOTIuMTY4LjEwMC4xOjg4ODg#example-server')
# ret = Decoder.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1') # ret = Decoder.sip002('ss://YWVzLTEyOC1nY206dGVzdA@192.168.100.1:8888#Example1')
# ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3') # ret = Decoder.sip002('ss://2022-blake3-aes-256-gcm:YctPZ6U7xPPcU%2Bgp3u%2B0tx%2FtRizJN9K8y%2BuKlW2qjlI%3D@192.168.100.1:8888/?plugin=v2ray-plugin%3Bserver#Example3')
@ -17,7 +17,7 @@ from Filter import Filter
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP3Byb3RvcGFyYW09') # ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGsvP3Byb3RvcGFyYW09')
# ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGs') # ret = Decoder.ssr('ssr://ZmU4MDo6MTo2MDA0OmF1dGhfYWVzMTI4X21kNTphZXMtMjU2LWNmYjp0bHMxLjJfdGlja2V0X2F1dGg6Y0dGemMzZGs')
ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ') # ret = Decoder.v2rayN('vmess://eyJhZGQiOiJmbnlkdXpheW92dnNxaTMyZ2kucTc1NDMudG9wIiwicHMiOiJ2MXzpppnmuK8wM3zljp_nlJ984piF4piF4piFICgyKSIsInNjeSI6ImF1dG8iLCJ0eXBlIjoiaHR0cCIsInNuaSI6IiIsInBhdGgiOiIvIiwicG9ydCI6MzgzMzcsInYiOjIsImhvc3QiOiJ4aGY0eHE3ZDVjYjRnc3kzeW1teWR3b2kuc2luYS5jbiIsInRscyI6IiIsImlkIjoiOTAxY2UyNTUtOTM2OS1kMWUyLTk1ODQtZGE1YTdqZjA1NDdrIiwibmV0IjoidGNwIiwiYWlkIjowfQ')
# ret['info']['server'] = '[%s]' % ret['info']['server'] # ret['info']['server'] = '[%s]' % ret['info']['server']

Loading…
Cancel
Save