You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
146 lines
6.8 KiB
146 lines
6.8 KiB
#对llm返回的指令进行校验
|
|
import re
|
|
class CommandVerify:
|
|
def __init__(self):
|
|
pass
|
|
|
|
#验证节点指令的结构完整性--主要是判断JSON元素是否完整
|
|
def verify_node_cmds(self,json_datas):
|
|
'''
|
|
- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\"};
|
|
- 未生成指令节点列表:{\"action\": \"no_instruction\", \"nodes\": \"节点1,节点2\"};
|
|
- 漏洞验证成功:{\"action\": \"find_vul\", \"node\": \"节点\",\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}};
|
|
- 完成测试:{\"action\": \"end_work\", \"node\": \"节点\"};
|
|
'''
|
|
strerror = ""
|
|
for json_data in json_datas:
|
|
if "action" not in json_data:
|
|
strerror = {"节点指令错误":f"{json_data}缺少action节点,不符合格式要求!"}
|
|
break
|
|
action = json_data["action"]
|
|
if action == "dash" or action == "python":
|
|
required_keys = {"path", "content"}
|
|
elif action == "add_node":
|
|
required_keys = {"parent", "nodes"}
|
|
elif action == "end_work":
|
|
required_keys = {"node"}
|
|
elif action =="find_vul":
|
|
required_keys = {"node","vulnerability","name","risk","info"}
|
|
elif action == "asset":
|
|
if "IPS" not in json_data or "URL" not in json_data:
|
|
strerror = {"JSON结果格式错误": f"{json_data}不符合格式要求,缺少节点!"}
|
|
break
|
|
ips_json = json_data["IPS"]
|
|
url_json = json_data["URL"]
|
|
#URL信息检查
|
|
if url_json:
|
|
required_keys = {"Domain", "Subdomains", "Registrant", "Email","Registrar","Creation_date", "Expiration_date"}
|
|
is_valid, missing = self.validate_json_keys(url_json, required_keys)
|
|
if not is_valid:
|
|
strerror = {"JSON结果格式错误": f"{url_json}缺少主键{missing}"}
|
|
break
|
|
#IP信息检查
|
|
#[\"IP\":\"192.168.1.100\",\"IPtype\":\"IPv4/IPv6\",\"Ports\":[端口信息json]]}
|
|
#{\"Port\":\"端口号\",\"Service\":\"服务名称\",\"Version\":\"版本号\",\"Protocol\":\"TCP/UDP\",\"Status\":\"open/closed/filtered\"};
|
|
for ip_json in ips_json:
|
|
required_keys = {"IP","IPtype","Ports"}
|
|
is_valid, missing = self.validate_json_keys(ip_json, required_keys)
|
|
if not is_valid:
|
|
strerror = {"JSON结果格式错误": f"{ip_json}缺少主键{missing}"}
|
|
break
|
|
#端口数据检查
|
|
ports_json = ip_json["Ports"]
|
|
for port in ports_json:
|
|
required_keys = {"Port","Service","Version","Protocol","Status"}
|
|
is_valid,missing = self.validate_json_keys(port,required_keys)
|
|
if not is_valid:
|
|
strerror = {"JSON结果格式错误": f"{port}缺少主键{missing}"}
|
|
break
|
|
continue
|
|
else:
|
|
strerror = {"节点指令错误": f"{json_data}含有不可识别的action值!"}
|
|
break
|
|
|
|
is_valid, missing = self.validate_json_keys(json_data, required_keys)
|
|
if not is_valid:
|
|
strerror = {"JSON结果格式错误": f"{json_data}缺少主键{missing}"}
|
|
break
|
|
if not strerror:
|
|
return True,strerror
|
|
return False,strerror
|
|
|
|
#验证JSON关键字是否缺失
|
|
def validate_json_keys(self,json_obj, required_keys):
|
|
"""
|
|
校验JSON对象是否包含所有必需的键。
|
|
|
|
Args:
|
|
json_obj (dict): 要校验的JSON对象。
|
|
required_keys (set): 必需键的集合。
|
|
|
|
Returns:
|
|
tuple: (bool, list) - 表示是否所有键都存在的布尔值,以及缺失键的列表(如果都存在则为空)。
|
|
"""
|
|
if not isinstance(json_obj, dict):
|
|
return False, ["JSON对象必须是字典类型"]
|
|
|
|
json_keys = set(json_obj.keys())
|
|
missing_keys = required_keys - json_keys
|
|
|
|
if missing_keys:
|
|
return False, list(missing_keys)
|
|
else:
|
|
return True, []
|
|
|
|
# 验证节点数据的合规性
|
|
def verify_node_data(self,node_cmds):
|
|
add_nodes = []
|
|
no_instr_nodes = []
|
|
for node_cmd in node_cmds:
|
|
do_type = node_cmd["action"]
|
|
if do_type == "add_node":
|
|
nodes = node_cmd["nodes"].split(",")
|
|
add_nodes.extend(nodes)
|
|
elif do_type == "no_instruction":
|
|
nodes = node_cmd["nodes"].split(",")
|
|
no_instr_nodes.extend(nodes)
|
|
else:# 其他类型暂时不验证
|
|
pass
|
|
#核对指令是否有缺失
|
|
had_inst_nodes = self._difference_a_simple(add_nodes,no_instr_nodes) #在新增节点,但不在没有指令列表,就是应该要有指令的节点数据
|
|
no_add_nodes = self._difference_a_simple(no_instr_nodes,add_nodes) #在未新增指令的节点,但不在新增节点,就是没有add的节点,需要新增
|
|
return had_inst_nodes,no_add_nodes
|
|
|
|
#--------------辅助函数-----------------
|
|
def get_path_from_command(self,command):
|
|
pass
|
|
|
|
def _difference_a(self,list_a: list, list_b: list) -> list:
|
|
"""获取 list_a 中存在但 list_b 中不存在的元素(去重版)"""
|
|
set_b = set(list_b)
|
|
return [x for x in list_a if x not in set_b]
|
|
|
|
def _difference_b(self,list_a: list, list_b: list) -> list:
|
|
"""获取 list_b 中存在但 list_a 中不存在的元素(去重版)"""
|
|
set_a = set(list_a)
|
|
return [x for x in list_b if x not in set_a]
|
|
|
|
def _difference_a_keep_duplicates(self,list_a: list, list_b: list) -> list:
|
|
"""获取 list_a 中存在但 list_b 中不存在的元素(保留所有重复项和顺序)"""
|
|
set_b = set(list_b)
|
|
return [x for x in list_a if x not in set_b]
|
|
|
|
def _difference_b_keep_duplicates(self,list_a: list, list_b: list) -> list:
|
|
"""获取 list_b 中存在但 list_a 中不存在的元素(保留所有重复项和顺序)"""
|
|
set_a = set(list_a)
|
|
return [x for x in list_b if x not in set_a]
|
|
|
|
def _difference_a_simple(self,list_a: list, list_b: list) -> list:
|
|
"""集合差集:list_a - list_b"""
|
|
return list(set(list_a) - set(list_b))
|
|
|
|
def _difference_b_simple(self,list_a: list, list_b: list) -> list:
|
|
"""集合差集:list_b - list_a"""
|
|
return list(set(list_b) - set(list_a))
|
|
|
|
g_CV = CommandVerify()
|