|
@ -48,6 +48,31 @@ def isPort(port: int) -> bool: |
|
|
pass |
|
|
pass |
|
|
return False |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
def toInt(raw) -> int: # change to int |
|
|
|
|
|
if isinstance(raw, (int, float)): # int / float -> int |
|
|
|
|
|
return int(raw) |
|
|
|
|
|
elif isinstance(raw, bytes): # bytes -> str |
|
|
|
|
|
raw = str(raw, encoding = 'utf-8') |
|
|
|
|
|
elif not isinstance(raw, str): |
|
|
|
|
|
raise Exception('type not allowed') |
|
|
|
|
|
try: |
|
|
|
|
|
return int(raw) |
|
|
|
|
|
except: |
|
|
|
|
|
raise Exception('not a integer') |
|
|
|
|
|
|
|
|
|
|
|
def toStr(raw) -> str: # change to str |
|
|
|
|
|
if raw is None: |
|
|
|
|
|
return '' |
|
|
|
|
|
elif isinstance(raw, str): |
|
|
|
|
|
return raw |
|
|
|
|
|
elif isinstance(raw, bytes): |
|
|
|
|
|
return str(raw, encoding='utf-8') |
|
|
|
|
|
else: |
|
|
|
|
|
raise Exception('type not allowed') |
|
|
|
|
|
|
|
|
|
|
|
def toStrTidy(raw) -> str: # change to str with trim and lower |
|
|
|
|
|
return toStr(raw).strip().lower() |
|
|
|
|
|
|
|
|
class filterException(Exception): # 检测异常 |
|
|
class filterException(Exception): # 检测异常 |
|
|
def __init__(self, reason): |
|
|
def __init__(self, reason): |
|
|
self.reason = reason |
|
|
self.reason = reason |
|
@ -56,66 +81,138 @@ def __dictCheck(data: dict, objectList: dict, limitRules: dict, keyPrefix: str) |
|
|
result = {} |
|
|
result = {} |
|
|
for key, option in limitRules.items(): # 遍历规则 |
|
|
for key, option in limitRules.items(): # 遍历规则 |
|
|
keyName = key if keyPrefix == '' else keyPrefix + '.' + key |
|
|
keyName = key if keyPrefix == '' else keyPrefix + '.' + key |
|
|
|
|
|
keyName = '`' + keyName + '`' |
|
|
|
|
|
|
|
|
# 检查必选key 补全可选key |
|
|
# 检查必选key 补全可选key |
|
|
if key not in data: |
|
|
if key not in data: |
|
|
if option['optional']: # 必选 |
|
|
if option['optional']: # 必选 |
|
|
raise filterException('Missing `' + keyName + '` option') # 必选值缺失 |
|
|
raise filterException('Missing ' + keyName + ' option') # 必选值缺失 |
|
|
else: # 可选 |
|
|
else: # 可选 |
|
|
data[key] = option['default'] # 补全可选值 |
|
|
data[key] = option['default'] # 补全可选值 |
|
|
|
|
|
|
|
|
if 'format' in option: # 预处理 |
|
|
if 'format' in option: # 预处理数据 |
|
|
|
|
|
try: |
|
|
data[key] = option['format'](data[key]) |
|
|
data[key] = option['format'](data[key]) |
|
|
|
|
|
except Exception as reason: |
|
|
|
|
|
raise filterException('Illegal ' + keyName + ': ' + str(reason)) # 格式化错误 |
|
|
|
|
|
|
|
|
# 检查value类型 |
|
|
# 检查value类型 |
|
|
if 'allowNone' in option and option['allowNone'] and data[key] is None: # 允许为None且值为None |
|
|
allowNone = False |
|
|
|
|
|
if 'allowNone' in option and option['allowNone']: # 允许为None |
|
|
|
|
|
allowNone = True |
|
|
|
|
|
if data[key] is None: # 值为None |
|
|
|
|
|
if not allowNone: # 不允许为None |
|
|
|
|
|
raise filterException('Unexpected None in ' + keyName) |
|
|
result[key] = None |
|
|
result[key] = None |
|
|
else: |
|
|
else: |
|
|
if isinstance(option['type'], str): |
|
|
dataValue = copy.deepcopy(data[key]) |
|
|
result[key] = __dictCheck(data[key], objectList, objectList[option['type']], keyName) # 检查子对象 |
|
|
if isinstance(option['type'], str): # 单子对象 |
|
|
elif isinstance(option['type'], list): |
|
|
result[key] = __dictCheck(dataValue, objectList, objectList[option['type']], keyName) # 检查子对象 |
|
|
temp = None |
|
|
elif isinstance(option['type'], list): # 多子对象 |
|
|
|
|
|
subResult = None |
|
|
errMsg = None |
|
|
errMsg = None |
|
|
for valueType in option['type']: # 遍历子Object |
|
|
for valueType in option['type']: # 遍历子Object |
|
|
try: |
|
|
try: |
|
|
subObject = copy.deepcopy(data[key]) |
|
|
subResult = __dictCheck(dataValue, objectList, objectList[valueType], keyName) # 尝试检查子对象 |
|
|
temp = __dictCheck(subObject, objectList, objectList[valueType], keyName) # 尝试检查子对象 |
|
|
|
|
|
except filterException as reason: |
|
|
except filterException as reason: |
|
|
errMsg = reason # 捕获抛出信息 |
|
|
errMsg = str(reason) # 捕获抛出信息 |
|
|
temp = None |
|
|
temp = None |
|
|
continue |
|
|
|
|
|
except: |
|
|
except: |
|
|
temp = None |
|
|
temp = None |
|
|
continue |
|
|
else: # 子对象匹配成功 |
|
|
break |
|
|
break |
|
|
if temp is None: # 无匹配子级 |
|
|
if subResult is None: # 无匹配子级 |
|
|
if errMsg is not None: # 存在子级异常信息 |
|
|
if errMsg is not None: # 存在子级异常信息 |
|
|
raise filterException(errMsg) |
|
|
raise filterException(errMsg) |
|
|
raise filterException('Error in `' + keyName + '` option') |
|
|
raise filterException('Error in ' + keyName + ' option') |
|
|
result[key] = temp |
|
|
result[key] = subResult |
|
|
elif not isinstance(data[key], option['type']): # 类型不匹配 |
|
|
elif not isinstance(data[key], option['type']): # 类型不匹配 |
|
|
raise filterException('Illegal `' + keyName + '` option') |
|
|
raise filterException('Illegal ' + keyName + ' option') |
|
|
else: |
|
|
else: # 检查无误 |
|
|
result[key] = copy.deepcopy(data[key]) |
|
|
result[key] = dataValue |
|
|
|
|
|
|
|
|
if 'filter' in option and not option['filter'](data[key]): # 格式检查 |
|
|
if result[key] is not None: # allowNone为False |
|
|
|
|
|
if 'filter' in option and not option['filter'](result[key]): # 格式检查 |
|
|
raise filterException(option['errMsg']) |
|
|
raise filterException(option['errMsg']) |
|
|
return result |
|
|
return result |
|
|
|
|
|
|
|
|
def rulesFilter(rawData: dict, rulesList: dict, header: dict) -> tuple[bool, dict or str]: |
|
|
def __ruleCheck(ruleSet: dict) -> None: # 规则集合法性检查 |
|
|
|
|
|
if 'rootObject' not in ruleSet: # 根对象检查 |
|
|
|
|
|
raise Exception('Miss root object') |
|
|
|
|
|
for objName in ruleSet: # 遍历全部对象 |
|
|
|
|
|
if objName[-6:] != 'Object': # 对象名必须以Object结尾 |
|
|
|
|
|
raise Exception('Illegal object name `' + objName + '`') |
|
|
|
|
|
for key, option in ruleSet[objName].items(): |
|
|
|
|
|
keyName = '`' + objName + '.' + key + '`' |
|
|
|
|
|
if 'optional' not in option or not isinstance(option['optional'], bool): # optional检查 |
|
|
|
|
|
raise Exception('Illegal optional in ' + keyName) |
|
|
|
|
|
if not option['optional'] and 'default' not in option: # optional为False时应有default |
|
|
|
|
|
raise Exception('Miss default value in ' + keyName) |
|
|
|
|
|
|
|
|
|
|
|
allowNone = False |
|
|
|
|
|
if 'allowNone' in option and not isinstance(option['allowNone'], bool): |
|
|
|
|
|
raise Exception('Illegal allowNone in ' + keyName) |
|
|
|
|
|
if 'allowNone' in option and option['allowNone']: |
|
|
|
|
|
allowNone = True |
|
|
|
|
|
|
|
|
|
|
|
if 'type' not in option: # type值缺失 |
|
|
|
|
|
raise Exception('Miss type in ' + keyName) |
|
|
|
|
|
if not isinstance(option['type'], (type, str, list)): # type为变量类型 / str / list |
|
|
|
|
|
raise Exception('Illegal type in ' + keyName) |
|
|
|
|
|
if isinstance(option['type'], str) and option['type'] not in ruleSet: # 子Object未定义 |
|
|
|
|
|
raise Exception('Object `' + option['type'] + '` not found in ' + keyName) |
|
|
|
|
|
if isinstance(option['type'], list): |
|
|
|
|
|
for subObjName in option['type']: |
|
|
|
|
|
if not isinstance(subObjName, str): |
|
|
|
|
|
raise Exception('Type list must be str in ' + keyName) |
|
|
|
|
|
if subObjName not in ruleSet: # 子Object未定义 |
|
|
|
|
|
raise Exception('Object `' + subObjName + '` not found in ' + keyName) |
|
|
|
|
|
|
|
|
|
|
|
if 'default' in option: |
|
|
|
|
|
if option['default'] is None: # default值检查 |
|
|
|
|
|
if not allowNone: |
|
|
|
|
|
raise Exception(keyName + ' can\'t be None') |
|
|
|
|
|
else: |
|
|
|
|
|
if isinstance(option['type'], type): # type |
|
|
|
|
|
if not isinstance(option['default'], option['type']): |
|
|
|
|
|
raise Exception('Error default type in ' + keyName) |
|
|
|
|
|
else: # str / list |
|
|
|
|
|
if not isinstance(option['default'], dict): |
|
|
|
|
|
raise Exception('Default type should be dict in ' + keyName) |
|
|
|
|
|
|
|
|
|
|
|
if 'format' in option and not callable(option['format']): # format必须为函数 |
|
|
|
|
|
raise Exception('Format option must be a function in ' + keyName) |
|
|
|
|
|
if isinstance(option['type'], type) and 'format' not in option: # 指定变量类型时需有format函数 |
|
|
|
|
|
raise Exception('Miss format in ' + keyName) |
|
|
|
|
|
|
|
|
|
|
|
if 'filter' in option: |
|
|
|
|
|
if 'errMsg' not in option: # filter与errMsg同时存在 |
|
|
|
|
|
raise Exception('Miss errMsg option in ' + keyName) |
|
|
|
|
|
if not callable(option['filter']): # filter必须为函数 |
|
|
|
|
|
raise Exception('Filter option must be a function in ' + keyName) |
|
|
|
|
|
if not isinstance(option['type'], type): |
|
|
|
|
|
raise Exception('Overage filter option in ' + keyName) |
|
|
|
|
|
if 'errMsg' in option and not isinstance(option['errMsg'], str): # errMsg必须为str |
|
|
|
|
|
raise Exception('Error message must be str in ' + keyName) |
|
|
|
|
|
|
|
|
|
|
|
def ruleFilter(rawData: dict, ruleSet: dict, header: dict) -> tuple[bool, dict or str]: |
|
|
""" |
|
|
""" |
|
|
规则参数 |
|
|
使用规则集检查原始数据 |
|
|
optional -> 必选 |
|
|
|
|
|
default -> optional为False时必选 |
|
|
原始数据错误: |
|
|
type -> 必选 |
|
|
return False, {reason} |
|
|
allowNone -> 可选 |
|
|
|
|
|
format -> 可选 |
|
|
原始数据无误: |
|
|
filter -> 可选 |
|
|
return True, {dist} |
|
|
errMsg -> filter存在时必选 |
|
|
|
|
|
""" |
|
|
""" |
|
|
data = rawData |
|
|
|
|
|
try: |
|
|
try: |
|
|
data = __dictCheck(data, rulesList, rulesList['rootObject'], '') # 开始检查 |
|
|
__ruleCheck(ruleSet) # 检查规则集 |
|
|
|
|
|
except Exception as reason: |
|
|
|
|
|
return False, 'Filter rules -> ' + str(reason) # 规则集有误 |
|
|
|
|
|
|
|
|
|
|
|
data = copy.deepcopy(rawData) |
|
|
|
|
|
try: |
|
|
|
|
|
data = __dictCheck(data, ruleSet, ruleSet['rootObject'], '') # 开始检查 |
|
|
except filterException as reason: # 节点格式错误 |
|
|
except filterException as reason: # 节点格式错误 |
|
|
return False, str(reason) |
|
|
return False, str(reason) |
|
|
except: |
|
|
except: |
|
|