diff --git a/config.yaml b/config.yaml index a0ca0a6..9876300 100644 --- a/config.yaml +++ b/config.yaml @@ -24,6 +24,7 @@ LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差 #Node max_do_sn: 15 #同一节点最多执行5次指令 +max_llm_sn: 5 #同一节点最多llm的提交次数 #用户初始密码 pw: zfkj_123!@# diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py index d4f145d..3fc2cbe 100644 --- a/mycode/AttackMap.py +++ b/mycode/AttackMap.py @@ -173,11 +173,10 @@ class TreeNode: def __init__(self, name,task_id,status="未完成", vul_type="未发现"): self.task_id = task_id #任务id self.name = name # 节点名称 - #self.node_lock = threading.Lock() #线程锁 self.bwork = True # 当前节点是否工作,默认True --停止/启动 self.status = status # 节点测试状态 -- 由llm返回指令触发更新 #work_status需要跟两个list统一管理:初始0,入instr_queue为1,入instr_node_mq为2,入res_queue为3,入llm_node_mq为4,llm处理完0或1 - self._work_status = 0 #0-无任务,1-待执行测试指令,2-执行指令中,3-待提交Llm,4-提交llm中, 2025-4-6新增,用来动态显示节点的工作细节。 + self._work_status = -1 #0-无任务,1-待执行测试指令,2-执行指令中,3-待提交Llm,4-提交llm中, 2025-4-6新增,用来动态显示节点的工作细节。 #self.work_status_lock = threading.Lock() ---节点不能有锁 self.vul_type = vul_type # 漏洞类型--目前赋值时没拆json self.vul_name = "" @@ -202,12 +201,34 @@ class TreeNode: #用户补充信息 self.cookie = "" self.ext_info = "" + #线程锁-- 2025-5-9 两个list 合并在一起管理,只会有一个List 有值 + self.work_status_lock = threading.Lock() + + def __getstate__(self): + state = self.__dict__.copy() + for attr in ('work_status_lock'): + state.pop(attr, None) + return state + + def __setstate__(self, state): + # 恢复其余字段 + self.__dict__.update(state) + # 重建运行时用的锁 + self.work_status_lock = threading.Lock() #设置用户信息 def set_user_info(self,cookie,ext_info): self.cookie = cookie self.ext_info = ext_info + #添加子节点 + def add_child(self, child_node): + child_node.parent = self + child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 + child_node.step_num = self.step_num + self.children.append(child_node) + + #---------------------messages处理-------------------- def copy_messages(self,p_msg,c_msg): #2025-4-23修改用做给本节点加msg ''' 当前节点添加mesg,约束:p_msg除system只取两层,c_msg:只取最后两个 @@ -216,7 +237,7 @@ class TreeNode: :return: ''' if not p_msg or not c_msg: - print("Messages存储存在问题!需要立即检查逻辑!") + print("Messages存储存在问题!需要检查逻辑!") return tmp_pmsg = copy.deepcopy(p_msg) tmp_cmsg = copy.deepcopy(c_msg) @@ -248,91 +269,143 @@ class TreeNode: for msg in tmp_cmsg[isart:]: self.parent_messages.append(msg) - #添加子节点 - def add_child(self, child_node): - child_node.parent = self - child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 - self.children.append(child_node) + def updatemsg(self,newtype,newcontent,p_msg,c_msg,index=0): #index待处理,目前待提交状态时,只应该有一条待提交数据 + with self.work_status_lock: + if self._work_status == 0: #新增指令 + if not self._llm_quere: + #判断是否要copy-父节点msg + if not self.parent_messages: + self.copy_messages(p_msg,c_msg) + newmsg = {"llm_type": int(newtype), "result": newcontent} + self._llm_quere.append(newmsg) + # 更新节点状态 + self._work_status = 3 # 待提交 + else: + return False,"新增指令,待提交数据不应该有数据" + elif self._work_status == 3: #只允许待提交状态修改msg + if self._llm_quere: + oldmsg = self._llm_quere[0] + oldmsg_llm_type = oldmsg["llm_type"] # llm_type不允许修改 + newmsg = {"llm_type": int(oldmsg_llm_type), "result": newcontent} + self._llm_quere[0] = newmsg + else: + return False,"状态是待提交,不应该没有待提交数据" + else: + return False,"该状态,不运行修改待提交数据" + return True,"" + + def is_instr_empty(self):#待改 --根据work_status判断 + with self.work_status_lock: + if self._instr_queue: + return False + return True + + def is_llm_empty(self):#待改 --根据work_status判断 + with self.work_status_lock: + if self._llm_quere: + return False + return True - #修改节点的执行状态--return bchange + #修改节点的执行状态--return bchange 只能是2-4 def update_work_status(self,work_status): - bsuccess = False - if self._work_status != work_status: - self._work_status = work_status - bsuccess = True + bsuccess = True + with self.work_status_lock: + if self._work_status == 1 and work_status == 2: #只允许从1-》2 + self._work_status = 2 + elif self._work_status == 3 and work_status == 4:#只允许从3-》4 + self._work_status = 4 + elif self._work_status ==4 and work_status == 0: #4->0 + self._work_status = 0 + elif work_status == -1: + self._work_status = 0 + elif work_status == -2: + self._work_status = 2 + elif work_status == -3: + self._work_status = 4 + else: + bsuccess = False return bsuccess def get_work_status(self): - #加锁有没有意义---web端和本身的工作线程会有同步问题,但与持久化相比,暂时忽略 + #加锁有没有意义---web端和本身的工作线程会有同步问题 work_status = self._work_status return work_status - #-------后期扩充逻辑,目前wokr_status的修改交给上层类对象------- - def add_instr(self,instr,p_msg,c_msg): - if not self.parent_messages: #为空时赋值 - self.copy_messages(p_msg,c_msg) - self._instr_queue.append(instr) - - def test_add_instr(self,instr): - self._instr_queue.append(instr) - self._llm_quere = [] - - def get_instr(self): - return self._instr_queue.pop(0) if self._instr_queue else None - - def get_instr_user(self): - return self._instr_queue - - def del_instr(self,instr): - if instr in self._instr_queue: - self._instr_queue.remove(instr) - #指令删除后要判断是否清空指令了 - if not self._instr_queue: - self._work_status = 0 #状态调整为没有带执行指令 - return True,"" + def add_instr(self,instr,p_msg,c_msg): #所有指令一次提交 + if instr: + with self.work_status_lock: + if not self.parent_messages: #为空时赋值 + self.copy_messages(p_msg,c_msg) + if self._work_status in (-1,1,4): + self._instr_queue.append(instr) + self._work_status = 1 #待执行 + return True + else: + print("插入指令时,状态不为-1,1,4!") + return False,"节点的工作状态不是0或4,请检查程序逻辑" else: - return False,"该指令不在队列中!" - + return False,"指令数据为空" + def get_instr(self): + with self.work_status_lock: + if self._work_status == 2: #执行中 + return self._instr_queue.pop(0) if self._instr_queue else None + else: + print("不是执行中,不应该来取指令!") + return None - def add_res(self,str_res): #结构化结果字串 - self._llm_quere.append(str_res) + def del_instr(self,instr): #web端,手动删除指令 + with self.work_status_lock: + if self._work_status == 1: + if instr in self._instr_queue: + self._instr_queue.remove(instr) + #指令删除后要判断是否清空指令了 + if not self._instr_queue: + self._work_status = 0 #状态调整为无待执行任务 + return True,"" + else: + return False,"该指令不在队列中!" + else: + return False,"只有待执行时,允许删除指令" + + def add_res(self,str_res): #llm_queue入库的情况比较多,2,0,4 + if str_res: + with self.work_status_lock: + if self._work_status in (2,0,4): + self._llm_quere.append(str_res) + if self._work_status in (2,0): #提交中,不要改变执行状态 + self._work_status =3 + else: + print("添加llm数据时,状态不是0,2,4中的一种情况") + return False,"添加llm数据时,状态不是0,2,4中的一种情况" + else: + return False,"待提交llm的数据为空" def get_res(self): - return self._llm_quere.pop(0) if self._llm_quere else None - - def get_res_user(self): - return self._llm_quere + with self.work_status_lock: + if self._work_status ==4: #提交中 + return self._llm_quere.pop(0) if self._llm_quere else None + else: + print("不是提交中,不应该来取待提交数据!") + return None - def get_work_status(self): - return self._work_status - - def updatemsg(self,newtype,newcontent,index): - - if self._llm_quere:# - oldmsg_llm_type = self._llm_quere[0]["llm_type"] #llm_type不修改,还未验证 - newmsg = {"llm_type": int(oldmsg_llm_type), "result": newcontent} - self._llm_quere[0] = newmsg - else:#新增消息 - newmsg = {"llm_type": int(newtype), "result": newcontent} - self._llm_quere.append(newmsg) - #更新节点状态 - self._work_status = 3 #待提交 - return True,"" + def clear_res(self): + with self.llm_list_lock: + self._llm_quere.clear() - def is_instr_empty(self): - if self._instr_queue: - return False - return True + #-----------web查看数据----------- + def get_instr_user(self): #读不用锁了 -有错误问题不大 + with self.work_status_lock: + instr_que = self._instr_queue.copy() + return instr_que - def is_llm_empty(self): - if self._llm_quere: - return False - return True + def get_res_user(self): #读不用锁了 -- 有错误问题不大 + with self.work_status_lock: + llm_que = self._llm_quere.copy() + return llm_que def __repr__(self): return f"TreeNode({self.name}, {self.status}, {self.vul_type})" - if __name__ == "__main__": pass \ No newline at end of file diff --git a/mycode/TaskManager.py b/mycode/TaskManager.py index 57e9f17..1839df1 100644 --- a/mycode/TaskManager.py +++ b/mycode/TaskManager.py @@ -31,7 +31,9 @@ class TaskManager: #程序启动后,加载未完成的测试任务 def load_tasks(self): - '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2''' + '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2 + #若不是异常停止,正常情况下,任务都应该是没有待办MQ的 + ''' datas = app_DBM.get_run_tasks() for data in datas: task_id = data[0] @@ -51,15 +53,15 @@ class TaskManager: task.init_task(task_id,attack_tree) #开始任务 ---根据task_status来判断是否需要启动工作线程 if task_status == 1: - if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁 - bsuc,strout = task.start_task() - if bsuc: - self.cur_task_run_num +=1 + if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁 + bsuc,strout = task.start_task() + if bsuc: + self.cur_task_run_num +=1 + else: + task.update_task_status(0) else: - task.update_task_status(0) - else: - self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态") - task.update_task_status(0)#尝试硬恢复 + self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态") + task.update_task_status(0)#尝试硬恢复 # 内存保留task对象 self.tasks[task_id] = task else: @@ -90,7 +92,7 @@ class TaskManager: #获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库 task_id = app_DBM.start_task(target,"",work_type,llm_type,fake_target) if task_id >0: - #2025-4-28调整批量添加任务,默认不启动 + #2025-4-28调整批量添加任务,默认不启动线程start_task task.init_task(task_id) #保留task对象 self.tasks[task_id] = task @@ -256,15 +258,8 @@ class TaskManager: if task: node = task.attack_tree.find_node_by_nodepath(nodepath) if node: - work_status = node.get_work_status() - if work_status == 0 or work_status == 3: - if work_status == 0: - if not node.parent_messages: #如果messages为空--且不会是根节点 - node.copy_messages(node.parent.parent_messages,node.parent.cur_messages) - bsuccess,error = node.updatemsg(newtype,newcontent,0) #取的第一条,也就修改第一条 - return bsuccess,error - else: - return False,"当前节点的工作状态不允许修改MSG!" + bsuccess,error = node.updatemsg(newtype,newcontent,node.parent.parent_messages,node.parent.cur_messages,0) + return bsuccess,error return False,"找不到对应节点!" def del_node_instr(self,task_id,nodepath,instr): diff --git a/mycode/TaskObject.py b/mycode/TaskObject.py index 0f3dfa9..4d89c5a 100644 --- a/mycode/TaskObject.py +++ b/mycode/TaskObject.py @@ -15,6 +15,7 @@ from mycode.WebSocketManager import g_WSM from mycode.CommandVerify import g_CV from mycode.PythonTManager import PythonTManager from mycode.DataFilterManager import DataFilterManager +from myutils.ReadWriteLock import ReadWriteLock import asyncio import queue import time @@ -46,31 +47,72 @@ class TaskObject: self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态 self.local_ip = local_ip self.attack_tree = None #任务节点树 + self.attack_tree_lock = None #把节点的Lock外置 self.safe_rank = safe_rank #安全级别 0-9 #?暂时还没实现更新逻辑 self.is_had_work = False self.is_had_work_lock = threading.Lock() + #读写锁 + self.rwlock = ReadWriteLock() #指令执行相关------- self.max_thread_num = num_threads #指令执行线程数量 self.workth_list = [None] * num_threads #线程句柄list + self.workth_node_list = [None] * num_threads # 线程句柄list self.doing_instr_list= [""] * num_threads self.instr_node_queue = queue.Queue() #待执行指令的节点队列 self.node_num = 0 #在处理Node线程的处理 #llm执行相关-------- self.llm_max_nums = myCongif.get_data("LLM_max_threads") # 控制最大并发指令数量 --- 多线程的话节点树需要加锁 self.llmth_list = [None] * self.llm_max_nums # llm线程list + self.llmth_node_list = [None] * self.llm_max_nums # llm线程--node self.doing_llm_list = [""] * self.llm_max_nums self.llm_node_queue = queue.Queue() #待提交LLM的节点队列 #自检线程----------- self.check_th = None #自检线程句柄 - #单步执行控制------- - self.one_step_num = 0 #单步执行任务次数,默认1 #-----四队列------- self.run_instr_lock = threading.Lock() # 线程锁 self.runing_instr = {} #执行中指令记录 #---------------三个线程------------ #测试指令执行线程 + + #读取待办指令节点 + def get_instr_node(self,index): + self.rwlock.acquire_read() + try: + self.workth_node_list[index] = self.instr_node_queue.get(block=False) + except queue.Empty: + self.workth_node_list[index] = None + finally: + self.rwlock.release_read() + + def put_instr_node(self,node): + #self.instr_node_queue.put(node) + self.rwlock.acquire_read() + try: + self.instr_node_queue.put(node) + finally: + self.rwlock.release_read() + + #读取待办llm节点 + def get_llm_node(self,index): + self.rwlock.acquire_read() + try: + self.llmth_node_list[index] = self.llm_node_queue.get(block=False) + except queue.Empty: + self.llmth_node_list[index] = None + finally: + self.rwlock.release_read() + + def put_llm_node(self,node): + # self.llm_node_queue.put(node) + self.rwlock.acquire_read() + try: + self.llm_node_queue.put(node) + finally: + self.rwlock.release_read() + + def mill_instr_preprocess(self,instructions,str_split): new_instr = [] instrs = instructions.split(str_split) @@ -129,13 +171,14 @@ class TaskObject: bnode_work = False while self.brun: if self.task_status == 1: - try: + self.get_instr_node(th_index) #获取一个待办节点 + if self.workth_node_list[th_index]: + work_node = self.workth_node_list[th_index] llm_type = 1 - work_node = self.instr_node_queue.get(block=False)#正常一个队列中一个节点只会出现一次,进而也就只有一个线程在处理 # 开始执行指令 bnode_work = True results = [] - while True: + while True: #遍历节点待执行指令,执行 instruction = work_node.get_instr() if not instruction: break @@ -155,18 +198,16 @@ class TaskObject: fake_inst,fake_resul = self.DataFilter.filter_result(instr,reslut) oneres = {'执行指令': fake_inst, '结果': fake_resul} results.append(oneres) #结果入队列后,指令就不能回退 - #一条指令执行完成 - self.doing_instr_list[th_index] = "" + + #节点执行完成后置空 + self.doing_instr_list[th_index] = "" #指令都执行结束后,入节点待提交队列 str_res = json.dumps(results, ensure_ascii=False) # 提交llm待处理任务 --更新节点work_status - if work_node.do_sn >= myCongif.get_data("max_do_sn"): # 该节点执行指令已超过10条 - llm_type = 10 self.put_node_reslist(work_node, str_res, llm_type) # 保存记录 g_PKM.WriteData(self.attack_tree,str(self.task_id)) - - except queue.Empty: + else: if bnode_work: bnode_work = False self.no_work_to_do() #判断是否需要把当前任务的无工作状态推送到前端 @@ -187,10 +228,12 @@ class TaskObject: th_DBM.connect() th_index = index bnode_work = False + max_llm_sn = myCongif.get_data("max_llm_sn") while self.brun: if self.task_status == 1: - try: - llm_node = self.llm_node_queue.get(block=False) #获取一个待处理节点 + self.get_llm_node(th_index) + if self.llmth_node_list[th_index]: + llm_node = self.llmth_node_list[th_index] #开始处理 bnode_work = True tmp_commands = [] @@ -208,6 +251,13 @@ class TaskObject: break llm_type = llm_data["llm_type"] str_res = llm_data["result"] + #判断执行次数 + llm_node.llm_sn += 1 + if llm_node.llm_sn == max_llm_sn: #提交次数达到上限后,提示LLM结束该节点任务 + llm_type = 10 + #该节点的剩余任务不执行,若有--暂定 + llm_node.clear_res() + #获取提示词 prompt = self.get_llm_prompt(llm_type,str_res,user_Prompt) fake_prompt = self.DataFilter.filter_prompt(prompt) #目标脱敏 @@ -221,7 +271,6 @@ class TaskObject: continue #丢弃 --若需要再次尝试,把llm_data再入队列 # LLM记录存数据库 if th_DBM.ok: - llm_node.llm_sn += 1 bres = th_DBM.insert_llm(self.task_id, prompt, reasoning_content, content, post_time, llm_node.llm_sn,llm_node.path) if not bres: self.logger.error(f"{llm_node.name}-llm入库失败!") @@ -236,21 +285,21 @@ class TaskObject: bok, new_commands,iadd_node = self.tree_manager(node_cmds, llm_node, commands, th_DBM) # 分析指令入对应节点 if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成 - #tmp_commands.extend(new_commands) - # 测试指令入节点待处理队列 --同时修改节点的work_status - self.put_node_instrlist(new_commands, llm_node, iadd_node) - - self.doing_llm_list[th_index] = "" + tmp_commands.extend(new_commands) + #节点的待提交任务都完成后,统一处理指令 + self.put_node_instrlist(tmp_commands, llm_node) #一个节点完成,节点树持久化---待验证是否有局部更新持久化的方案 g_PKM.WriteData(self.attack_tree,str(self.task_id)) - #推送前端刷新数据 - if self.taskM.web_cur_task == self.task_id: # 如果新增了节点,且该节点树是当前查看的数据,需要通知前端更新数据 + #推送前端刷新数据--执行一个节点就刷新一次 + if self.taskM.web_cur_task == self.task_id: idatatype = 2 strdata = "update accack_tree!" asyncio.run(g_WSM.send_data(idatatype, strdata)) # 先取消当前task,已经通知前端重新获取,这样可以避免后端不必要的数据推送 #self.taskM.web_cur_task = 0 - except queue.Empty: + # 一个节点执行完成后再置空 + self.doing_llm_list[th_index] = "" + else: if bnode_work: bnode_work = False self.no_work_to_do() # 判断是否需要把当前任务的无工作状态推送到前端 @@ -266,30 +315,36 @@ class TaskObject: self.is_had_work = True return bsuccess - def no_work_to_do(self): #任务单步状态控制-- 非工作中--2025-5-7增加了轮次,需要全面验证执行逻辑的完整和正确性 - # 待执行instr-node - instr_node_list = list(self.instr_node_queue.queue) # 待执行指令的node--线程不安全 - llm_node_list = list(self.llm_node_queue.queue) # 待提交llm的node--线程不安全 - if len(instr_node_list) == 0 and len(llm_node_list) == 0: #没有待办节点了 - #遍历在执行 - for str_instr in self.doing_instr_list: - if str_instr != "": - return - for str_llm in self.doing_llm_list: - if str_llm != "": - return + def is_work_doing(self): + '''检查是否还有在执行的事项''' + #获取队列快照 + self.rwlock.acquire_write() + try: + instr_node_list = list(self.instr_node_queue.queue) # 待执行指令的node + llm_node_list = list(self.llm_node_queue.queue) # 待提交llm的node + wth_n_list = self.workth_node_list.copy() + lth_n_list = self.llmth_node_list.copy() + finally: + self.rwlock.release_write() + #只针对这个快照数据进行判断 + #先判断正在执行节点是否有值 --- + for do_node in wth_n_list: + if do_node: + return True + for do_node in lth_n_list: + if do_node: + return True + #再判断待办节点 + if len(instr_node_list) == 0 and len(llm_node_list) == 0: + return False + return True + #是否需要线程锁-待定 + def no_work_to_do(self): #任务单步状态控制-- 非工作中--2025-5-7增加了轮次,需要全面验证执行逻辑的完整和正确性 + #是否还有在执行事项 + bworking = self.is_work_doing() + if not bworking: # 没有在执行任务了 - self.one_step_num -= 1 - if not self.one_step_num: # 执行轮次已结束 - pass - else:#发起新的一轮 - bok = self.put_one_task() - if not bok: - self.one_step_num = 0 - #结束轮次--推送前端 - - #推送是否有工作任务的状态到前端, with self.is_had_work_lock: if self.is_had_work: #如果已经是False那就不需要修改了 @@ -306,6 +361,7 @@ class TaskObject: icount = 0 while self.brun: try: + bworking = False cur_time = get_local_timestr() print(f"----------{self.task_id}-当前时间程序运行情况:{cur_time}") #执行中instr-node @@ -315,6 +371,7 @@ class TaskObject: print(f"Work线程-{index}已处于异常状态,需要重新创建一个工作线程") else: if self.doing_instr_list[index]: + bworking =True print(f"Work线程-{index}-在执行指令:{self.doing_instr_list[index]}") index += 1 @@ -324,8 +381,15 @@ class TaskObject: print(f"LLM线程-{index}已处于异常状态,需要重新创建一个LLM线程") else: if self.doing_llm_list[index]: + bworking = True print(f"LLM线程-{index}-在执行指令:{self.doing_llm_list[index]}") index += 1 + + #判断任务是否完成 + bover = self.is_over() + if bover and not bworking: + self.stop_task() #自己停自己--确保没有阻塞操作 + break #退出循环--结束线程 #处理点修复操作 icount +=1 if icount == 5: @@ -340,12 +404,12 @@ class TaskObject: #------------入两个nodeMQ-禁止直接调用入队列----------- def put_instr_mq(self,node): #这里不做状态的判断,调用前处理 - self.instr_node_queue.put(node) + self.put_instr_node(node) self.update_node_work_status(node,2) #在执行--1.work_status不影响整个任务的执行,错了问题不大,2--attack_tree持久化需要出去lock信息。 def put_llm_mq(self,node): #同instr_mq - self.llm_node_queue.put(node) + self.put_llm_node(node) self.update_node_work_status(node,4) #提交中 async def put_instr_mq_async(self,node): @@ -376,8 +440,11 @@ class TaskObject: node.add_res(one_llm) # 入节点结果队列 self.update_node_work_status(node,3) #待提交llm # 如果是自动执行的模式则入队列交给llm线程处理 - if self.app_work_type == 1: - if self.work_type == 1 and node.bwork: + if node.bwork: #节点是工作状态 + if self.work_type == 1: #自动模式 + self.put_llm_mq(node) #变4 + elif self.work_type == 0 and node.step_num > 0: #人工模式,但是单步步次还没有执行完成 + node.step_num -= 1 self.put_llm_mq(node) #变4 #递归找节点 @@ -399,7 +466,7 @@ class TaskObject: command = command.replace("& &","&&") return command - def put_node_instrlist(self, commands, node,iadd_node): #如果当前节点没有进一般指令返回,需要修改节点执行状态 + def put_node_instrlist(self, commands, node): #如果当前节点没有进一般指令返回,需要修改节点执行状态 if not node: return node_list = [] #有待办指令的节点 @@ -413,12 +480,13 @@ class TaskObject: instruction = re.sub(r'\[.*?\]', "", command, count=1, flags=re.DOTALL) #'''强制约束,不是本节点或者是子节点的指令不处理''' find_node = None - if node_name == node.name: + if node_name == node.name: #指令是当前节点的 find_node = node else: for child_node in node.children: #暂时只找一层 if node_name == child_node.name: - find_node = child_node + if child_node.do_sn == 0: #只有没执行过指令的子节点,才允许添加指令 2025-5-9 新增限制 避免父节点和子节点同时有指令执行,提交llm后,父节点返回子节点指令。 + find_node = child_node break # find_node = self.attack_tree.find_node_by_nodepath_parent(node_path,node,iadd_node,commands) # if not find_node:#对于没有基于节点路径找到对应节点--增加通过节点名称匹配的机制 2025-4-13日添加 @@ -444,59 +512,64 @@ class TaskObject: if node not in node_list: #修改该节点状态为0--无待执行任务 self.update_node_work_status(node,0) - #入instr队列 - if self.app_work_type == 1: - if self.work_type == 1: #是自动执行模式 - for node in node_list: - if node.bwork: - self.put_instr_mq(node) #2-执行中 + #判断是否入instr待办队列---放循环外面是等指令都添加完成后提交待办 + for node in node_list: + if node.bwork: + if self.work_type == 1: #是自动执行模式 + self.put_instr_mq(node) #2-执行中 + elif self.work_type == 0 and node.step_num > 0: #人工模式 且剩余步次大于0 + node.step_num -= 1 + self.put_instr_mq(node) #2-执行中 def put_work_node(self): - '''遍历节点需要处理的任务,提交mq''' + '''遍历节点需要处理的任务,提交mq,load_task-在自动模式下-触发--线程安全''' nodes = self.attack_tree.traverse_bfs() for node in nodes: - if not node.is_instr_empty(): #待执行指令有值 - if not node.is_llm_empty(): - self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") + if not node.bwork: + continue + node_work_status = node.get_work_status() + if node_work_status in (1,2): #待执行指令 + if node.is_instr_empty():#说明数据有问题了,放弃掉 + node.update_work_status(-1) #置0 -1作为额外的条件参数 else: - if node.bwork: - self.put_instr_mq(node) #提交执行 - elif not node.is_llm_empty(): #待提交llm有值 - if not node.is_instr_empty(): - self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") + self.put_instr_node(node) #1,2都提交执行 + node.update_work_status(-2)# 置2 + #llm-list不处理,正常应该为空 + elif node_work_status in (3,4): + if node.is_llm_empty():#数据有问题,放弃掉 + node.update_work_status(-1) else: - if node.bwork: - self.put_llm_mq(node) #提交执行 + self.put_llm_node(node) + node.update_work_status(-3) #置4 + else: + pass - #web端提交单步任务--节点单步 - async def put_one_node(self,node): + #web端提交单步任务--节点单步--start + async def put_one_node(self,node,step_num=1): #提交某个节点的代表任务 if self.task_status ==1 and self.work_type==0 and node.bwork: - # node_status = node.get_work_status() - # if node_status == 2 or node_status == 4: - # return False,"当前节点正在执行任务,请稍后点击单步!" - if not node.is_instr_empty(): #待执行指令有值 - if not node.is_llm_empty(): - self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") - return False,"该节点的待执行任务数据不正确,请联系管理员!" - else: - if node.bwork: - await self.put_instr_mq_async(node) #提交执行 - elif not node.is_llm_empty(): #待提交llm有值 - if not node.is_instr_empty(): - self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") - return False, "该节点的待执行任务数据不正确,请联系管理员!" + iwork_status = node.get_work_status() + if iwork_status in (1,3): + node.step_num = step_num - 1 #单步步次赋值 -1 规则提交时-1,执行结束连续判断再提交 + + if iwork_status == 1: + self.put_instr_mq(node) else: - if node.bwork: - await self.put_llm_mq_async(node) #提交执行 + self.put_llm_mq(node) + return True,"已提交单步任务" else: - await self.update_node_work_status_async(node,0) #只是修补措施,保障状态的一致性 - return False,"当前节点没有待执行任务!" - return True,"已提交单步任务" + error = "" + if iwork_status == 0: + error = "该节点没有待办任务" + elif iwork_status in (2,4): + error = "该节点已经在执行事项中" + else: + self.logger.error("noed的工作状态值存在问题,需要人工介入!!") + return False,error else: return False,"当前的任务或节点状态不允许执行单步,请检查!" - #web端提交任务单步--任务单步 + #web端提交任务单步--任务单步---2025-5-8-增加约束,只允许单步启动时调用 async def put_one_task(self,step_num): if self.task_status == 1 and self.work_type == 0: bsuccess = self.had_work_to_do() @@ -504,12 +577,14 @@ class TaskObject: nodes = self.attack_tree.traverse_bfs() b_putwork = False for node in nodes: - bput,_ = await self.put_one_node(node) + bput,_ = await self.put_one_node(node,step_num) #错误信息有丢失 if bput: b_putwork = True if b_putwork: return True,"已提交单步任务" - else: + else: #所有节点都没有提交任务 + #可以尝试stop下 + self.stop_task() return False,"该任务已经没有待提交任务" else: return False,"当前任务正在执行任务中,不需要提交单步任务!" @@ -519,9 +594,9 @@ class TaskObject: #修改节点的执行状态,并需要基于websocket推送到前端显示 同步线程调用 def update_node_work_status(self,node,work_status): #更新状态 - bchange = node.update_work_status(work_status) + bchange = node.update_work_status(work_status) #1,3会返回Flase #基于websocket推送到前端 - if bchange and work_status != 1: #llm执行完成后会发送单独的指令更新树,所以不发送1更新节点了 + if work_status != 1: #llm执行完成后会发送单独的指令更新树,所以不发送1更新节点了 #判断是否是web端最新获取数据的task if self.taskM.web_cur_task == self.task_id: idatatype = 1 @@ -594,6 +669,7 @@ class TaskObject: bok,strerror = g_CV.verify_node_cmds(node_cmds) if not bok: #节点指令存在问题,则不进行后续处理,提交一个错误反馈任务 # 提交llm待处理任务 + node.step_num += 1 #单步次数还原一个--暂时只针对有效的返回才算一次 self.put_node_reslist(node, strerror, 2) return False,commands,0 #message_调整传递时机后,可以先执行添加节点 @@ -767,7 +843,7 @@ class TaskObject: # 插入一个user消息 # 提交第一个llm任务,开始工作 know_info = f"本测试主机的IP地址为:{self.local_ip}" - self.put_node_reslist(root_node, know_info, 0) # 入待提交list + self.put_node_reslist(root_node, know_info, 0) # 入待提交list,若是人工模式则不入待办MQ # 初始保存个attack_tree文件 g_PKM.WriteData(self.attack_tree, str(self.task_id)) @@ -812,6 +888,8 @@ class TaskObject: :return bool str ''' if self.is_task_stop(): + if self.is_over(): #只是判断了待办list,在执行的会漏判断 + return False, "该任务所有未暂停节点的待办事项都已经结束" #更新状态标识 self.update_task_status(1) self.brun = True #线程正常启动 @@ -835,14 +913,31 @@ class TaskObject: return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!" def stop_task(self): #还未处理 - if self.brun and self.task_status ==1: #不是正常工作状态就不执行停止工作了 + if self.brun and self.task_status == 1: #不是正常工作状态就不执行停止工作了 #停止子进程池 self.PythonM.shutdown_pool() #停止线程 self.brun = False - self.update_task_status(0) + self.update_task_status(0) #置状态0 # 结束任务需要收尾处理#? self.InstrM.init_data() #pass + def is_over(self): + ''' + 判断当前任务是否所有节点都已结束 + :return: False-非暂停节点有代表事项,True-非粘贴节点事项都已完成 + ''' + b_over = True + nodes = self.attack_tree.traverse_bfs() + for node in nodes: + if node.bwork: + work_status = node.get_work_status() + if work_status != 0: + b_over = False # 有一个有任务就不继续进行判断 + break + return b_over + + + if __name__ == "__main__": pass \ No newline at end of file diff --git a/myutils/ReadWriteLock.py b/myutils/ReadWriteLock.py new file mode 100644 index 0000000..a332fe0 --- /dev/null +++ b/myutils/ReadWriteLock.py @@ -0,0 +1,36 @@ +import threading + +class ReadWriteLock: + def __init__(self): + self._rw_lock = threading.Lock() + self._readers = 0 + self._writers_waiting = 0 + self._writer = False + self._cond = threading.Condition(self._rw_lock) + + def acquire_read(self): + with self._cond: + # 如果已有写锁,或有写者在等待,都要等 + while self._writer or self._writers_waiting > 0: + self._cond.wait() + self._readers += 1 + + def release_read(self): + with self._cond: + self._readers -= 1 + if self._readers == 0: + self._cond.notify_all() + + def acquire_write(self): + with self._cond: + self._writers_waiting += 1 + # 等到没人读、没人写 + while self._writer or self._readers > 0: + self._cond.wait() + self._writers_waiting -= 1 + self._writer = True + + def release_write(self): + with self._cond: + self._writer = False + self._cond.notify_all() \ No newline at end of file diff --git a/web/API/task.py b/web/API/task.py index ecb0613..46c2d5f 100644 --- a/web/API/task.py +++ b/web/API/task.py @@ -141,7 +141,7 @@ async def task_one_step(): if not task_id: return jsonify({'error': 'Missing task_id'}), 400 if not step_num: - step_num = 1 #默认一轮 + step_num = 1 #默认一步 bsuccess,error = await g_TaskM.task_one_step(task_id,step_num) return jsonify({"bsuccess":bsuccess,"error":error}) diff --git a/web/main/templates/index.html b/web/main/templates/index.html index db98d9d..42cb859 100644 --- a/web/main/templates/index.html +++ b/web/main/templates/index.html @@ -83,7 +83,7 @@