|
|
@ -5,6 +5,7 @@ import re |
|
|
|
import IPy |
|
|
|
import copy |
|
|
|
|
|
|
|
|
|
|
|
def isHost(host: str) -> bool: |
|
|
|
""" |
|
|
|
判断host是否合法 |
|
|
@ -31,6 +32,7 @@ def isHost(host: str) -> bool: |
|
|
|
except: # 异常错误 |
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
def isPort(port: int) -> bool: |
|
|
|
""" |
|
|
|
判断端口是否合法 |
|
|
@ -48,6 +50,7 @@ def isPort(port: int) -> bool: |
|
|
|
pass |
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
def toInt(raw) -> int: # change to int |
|
|
|
if isinstance(raw, (int, float)): # int / float -> int |
|
|
|
return int(raw) |
|
|
@ -60,6 +63,7 @@ def toInt(raw) -> int: # change to int |
|
|
|
except: |
|
|
|
raise Exception('not a integer') |
|
|
|
|
|
|
|
|
|
|
|
def toStr(raw) -> str: # change to str |
|
|
|
if raw is None: |
|
|
|
return '' |
|
|
@ -70,42 +74,70 @@ def toStr(raw) -> str: # change to str |
|
|
|
else: |
|
|
|
raise Exception('type not allowed') |
|
|
|
|
|
|
|
|
|
|
|
def toBool(raw) -> bool: # change to bool |
|
|
|
if isinstance(raw, bool): |
|
|
|
return raw |
|
|
|
if isinstance(raw, int): |
|
|
|
raw = str(raw) |
|
|
|
elif isinstance(raw, bytes): |
|
|
|
raw = str(raw, encoding='utf-8') |
|
|
|
elif not isinstance(raw, str): |
|
|
|
raise Exception('type not allowed') |
|
|
|
raw = raw.strip().lower() |
|
|
|
if raw == 'true': |
|
|
|
return True |
|
|
|
elif raw == 'false': |
|
|
|
return False |
|
|
|
else: |
|
|
|
try: |
|
|
|
raw = int(raw) |
|
|
|
return raw != 0 |
|
|
|
except: |
|
|
|
raise Exception('not a boolean') |
|
|
|
|
|
|
|
|
|
|
|
def toStrTidy(raw) -> str: # change to str with trim and lower |
|
|
|
return toStr(raw).strip().lower() |
|
|
|
|
|
|
|
|
|
|
|
class filterException(Exception): # 检测异常 |
|
|
|
def __init__(self, reason): |
|
|
|
self.reason = reason |
|
|
|
|
|
|
|
|
|
|
|
def __dictCheck(data: dict, objectList: dict, limitRules: dict, keyPrefix: str) -> dict: # 递归检查dict |
|
|
|
result = {} |
|
|
|
for key, option in limitRules.items(): # 遍历规则 |
|
|
|
keyName = key if keyPrefix == '' else keyPrefix + '.' + key |
|
|
|
keyName = '`' + keyName + '`' |
|
|
|
|
|
|
|
# 检查必选key 补全可选key |
|
|
|
if key not in data: |
|
|
|
if option['optional']: # 必选 |
|
|
|
raise filterException('Missing ' + keyName + ' option') # 必选值缺失 |
|
|
|
raise filterException('Missing `' + keyName + '` option') # 必选值缺失 |
|
|
|
else: # 可选 |
|
|
|
data[key] = option['default'] # 补全可选值 |
|
|
|
|
|
|
|
allowNone = False |
|
|
|
if 'allowNone' in option and option['allowNone']: # 允许为None |
|
|
|
allowNone = True |
|
|
|
|
|
|
|
if not (data[key] is None and allowNone): # 忽略允许None且为None的情况 |
|
|
|
if 'format' in option: # 预处理数据 |
|
|
|
try: |
|
|
|
data[key] = option['format'](data[key]) |
|
|
|
except Exception as reason: |
|
|
|
raise filterException('Illegal ' + keyName + ': ' + str(reason)) # 格式化错误 |
|
|
|
raise filterException('Illegal `' + keyName + '`: ' + str(reason)) # 格式化错误 |
|
|
|
|
|
|
|
# 检查value类型 |
|
|
|
allowNone = False |
|
|
|
if 'allowNone' in option and option['allowNone']: # 允许为None |
|
|
|
allowNone = True |
|
|
|
if data[key] is None: # 值为None |
|
|
|
if not allowNone: # 不允许为None |
|
|
|
raise filterException('Unexpected None in ' + keyName) |
|
|
|
raise filterException('Unexpected None in `' + keyName + '`') |
|
|
|
result[key] = None |
|
|
|
else: |
|
|
|
dataValue = copy.deepcopy(data[key]) |
|
|
|
if isinstance(option['type'], (str, list)) and not isinstance(dataValue, dict): # 子对象下必为dict |
|
|
|
raise filterException('Illegal `' + keyName + '`: should be dictionary') |
|
|
|
if isinstance(option['type'], str): # 单子对象 |
|
|
|
result[key] = __dictCheck(dataValue, objectList, objectList[option['type']], keyName) # 检查子对象 |
|
|
|
elif isinstance(option['type'], list): # 多子对象 |
|
|
@ -116,26 +148,36 @@ def __dictCheck(data: dict, objectList: dict, limitRules: dict, keyPrefix: str) |
|
|
|
subResult = __dictCheck(dataValue, objectList, objectList[valueType], keyName) # 尝试检查子对象 |
|
|
|
except filterException as reason: |
|
|
|
errMsg = str(reason) # 捕获抛出信息 |
|
|
|
temp = None |
|
|
|
except: |
|
|
|
temp = None |
|
|
|
continue |
|
|
|
else: # 子对象匹配成功 |
|
|
|
break |
|
|
|
if subResult is None: # 无匹配子级 |
|
|
|
if errMsg is not None: # 存在子级异常信息 |
|
|
|
raise filterException(errMsg) |
|
|
|
raise filterException('Error in ' + keyName + ' option') |
|
|
|
raise filterException('Error in `' + keyName + '` option') |
|
|
|
result[key] = subResult |
|
|
|
elif not isinstance(data[key], option['type']): # 类型不匹配 |
|
|
|
raise filterException('Illegal ' + keyName + ' option') |
|
|
|
raise filterException('Illegal `' + keyName + '` option') |
|
|
|
else: # 检查无误 |
|
|
|
result[key] = dataValue |
|
|
|
|
|
|
|
if result[key] is not None: # allowNone为False |
|
|
|
if 'filter' in option and not option['filter'](result[key]): # 格式检查 |
|
|
|
if 'filter' in option: |
|
|
|
errFlag = False |
|
|
|
try: |
|
|
|
if not option['filter'](result[key]): # 格式检查 |
|
|
|
errFlag = True |
|
|
|
except: |
|
|
|
raise filterException('Filter error in `' + keyName + '`') |
|
|
|
else: |
|
|
|
if errFlag: |
|
|
|
if 'indexKey' in option and option['indexKey']: |
|
|
|
raise Exception('Filter index key') |
|
|
|
raise filterException(option['errMsg']) |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
def __ruleCheck(ruleSet: dict) -> None: # 规则集合法性检查 |
|
|
|
if 'rootObject' not in ruleSet: # 根对象检查 |
|
|
|
raise Exception('Miss root object') |
|
|
@ -195,6 +237,10 @@ def __ruleCheck(ruleSet: dict) -> None: # 规则集合法性检查 |
|
|
|
if 'errMsg' in option and not isinstance(option['errMsg'], str): # errMsg必须为str |
|
|
|
raise Exception('Error message must be str in ' + keyName) |
|
|
|
|
|
|
|
if 'indexKey' in option and not isinstance(option['indexKey'], bool): # indexKey必为bool |
|
|
|
raise Exception('Illegal indexKey in ' + keyName) |
|
|
|
|
|
|
|
|
|
|
|
def ruleFilter(rawData: dict, ruleSet: dict, header: dict) -> tuple[bool, dict or str]: |
|
|
|
""" |
|
|
|
使用规则集检查原始数据 |
|
|
|