Browse Source

V0.5.5

1.instr_do_max的限制调整为llm_do_max
2.在执行指令和在提交prompt的清空都放到了当前节点完成后清空;
3.新增了批量步次的功能,步次在单步的时候传递赋值;(但验证)
4.明确了节点完成事项的状态,同暂停状态,但启动时会验证未暂停节点是否有待办事项,没有则不启动。
5.在th_check线程里增加了对任务事项完成状态的判断;
6.node增加了lock,把work_status和节点待处理数据进行整合,都封装在节点内操作-包括节点状态的修改;
7.反序列化对应的增加了移出锁和增加锁的处理(待验证);
8.增加现在,当父节点给子节点增加指令时,只允许针对未执行指令的子节点。
master
张龙 4 weeks ago
parent
commit
d93cf60341
  1. 1
      config.yaml
  2. 211
      mycode/AttackMap.py
  3. 33
      mycode/TaskManager.py
  4. 289
      mycode/TaskObject.py
  5. 36
      myutils/ReadWriteLock.py
  6. 2
      web/API/task.py
  7. 2
      web/main/templates/index.html
  8. 2
      web/main/templates/task_manager.html

1
config.yaml

@ -24,6 +24,7 @@ LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差
#Node #Node
max_do_sn: 15 #同一节点最多执行5次指令 max_do_sn: 15 #同一节点最多执行5次指令
max_llm_sn: 5 #同一节点最多llm的提交次数
#用户初始密码 #用户初始密码
pw: zfkj_123!@# pw: zfkj_123!@#

211
mycode/AttackMap.py

@ -173,11 +173,10 @@ class TreeNode:
def __init__(self, name,task_id,status="未完成", vul_type="未发现"): def __init__(self, name,task_id,status="未完成", vul_type="未发现"):
self.task_id = task_id #任务id self.task_id = task_id #任务id
self.name = name # 节点名称 self.name = name # 节点名称
#self.node_lock = threading.Lock() #线程锁
self.bwork = True # 当前节点是否工作,默认True --停止/启动 self.bwork = True # 当前节点是否工作,默认True --停止/启动
self.status = status # 节点测试状态 -- 由llm返回指令触发更新 self.status = status # 节点测试状态 -- 由llm返回指令触发更新
#work_status需要跟两个list统一管理:初始0,入instr_queue为1,入instr_node_mq为2,入res_queue为3,入llm_node_mq为4,llm处理完0或1 #work_status需要跟两个list统一管理:初始0,入instr_queue为1,入instr_node_mq为2,入res_queue为3,入llm_node_mq为4,llm处理完0或1
self._work_status = 0 #0-无任务,1-待执行测试指令,2-执行指令中,3-待提交Llm,4-提交llm中, 2025-4-6新增,用来动态显示节点的工作细节。 self._work_status = -1 #0-无任务,1-待执行测试指令,2-执行指令中,3-待提交Llm,4-提交llm中, 2025-4-6新增,用来动态显示节点的工作细节。
#self.work_status_lock = threading.Lock() ---节点不能有锁 #self.work_status_lock = threading.Lock() ---节点不能有锁
self.vul_type = vul_type # 漏洞类型--目前赋值时没拆json self.vul_type = vul_type # 漏洞类型--目前赋值时没拆json
self.vul_name = "" self.vul_name = ""
@ -202,12 +201,34 @@ class TreeNode:
#用户补充信息 #用户补充信息
self.cookie = "" self.cookie = ""
self.ext_info = "" self.ext_info = ""
#线程锁-- 2025-5-9 两个list 合并在一起管理,只会有一个List 有值
self.work_status_lock = threading.Lock()
def __getstate__(self):
state = self.__dict__.copy()
for attr in ('work_status_lock'):
state.pop(attr, None)
return state
def __setstate__(self, state):
# 恢复其余字段
self.__dict__.update(state)
# 重建运行时用的锁
self.work_status_lock = threading.Lock()
#设置用户信息 #设置用户信息
def set_user_info(self,cookie,ext_info): def set_user_info(self,cookie,ext_info):
self.cookie = cookie self.cookie = cookie
self.ext_info = ext_info self.ext_info = ext_info
#添加子节点
def add_child(self, child_node):
child_node.parent = self
child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值
child_node.step_num = self.step_num
self.children.append(child_node)
#---------------------messages处理--------------------
def copy_messages(self,p_msg,c_msg): #2025-4-23修改用做给本节点加msg def copy_messages(self,p_msg,c_msg): #2025-4-23修改用做给本节点加msg
''' '''
当前节点添加mesg,约束p_msg除system只取两层c_msg:只取最后两个 当前节点添加mesg,约束p_msg除system只取两层c_msg:只取最后两个
@ -216,7 +237,7 @@ class TreeNode:
:return: :return:
''' '''
if not p_msg or not c_msg: if not p_msg or not c_msg:
print("Messages存储存在问题!需要立即检查逻辑!") print("Messages存储存在问题!需要检查逻辑!")
return return
tmp_pmsg = copy.deepcopy(p_msg) tmp_pmsg = copy.deepcopy(p_msg)
tmp_cmsg = copy.deepcopy(c_msg) tmp_cmsg = copy.deepcopy(c_msg)
@ -248,91 +269,143 @@ class TreeNode:
for msg in tmp_cmsg[isart:]: for msg in tmp_cmsg[isart:]:
self.parent_messages.append(msg) self.parent_messages.append(msg)
#添加子节点 def updatemsg(self,newtype,newcontent,p_msg,c_msg,index=0): #index待处理,目前待提交状态时,只应该有一条待提交数据
def add_child(self, child_node): with self.work_status_lock:
child_node.parent = self if self._work_status == 0: #新增指令
child_node.path = self.path + f"->{child_node.name}" #子节点的路径赋值 if not self._llm_quere:
self.children.append(child_node) #判断是否要copy-父节点msg
if not self.parent_messages:
self.copy_messages(p_msg,c_msg)
newmsg = {"llm_type": int(newtype), "result": newcontent}
self._llm_quere.append(newmsg)
# 更新节点状态
self._work_status = 3 # 待提交
else:
return False,"新增指令,待提交数据不应该有数据"
elif self._work_status == 3: #只允许待提交状态修改msg
if self._llm_quere:
oldmsg = self._llm_quere[0]
oldmsg_llm_type = oldmsg["llm_type"] # llm_type不允许修改
newmsg = {"llm_type": int(oldmsg_llm_type), "result": newcontent}
self._llm_quere[0] = newmsg
else:
return False,"状态是待提交,不应该没有待提交数据"
else:
return False,"该状态,不运行修改待提交数据"
return True,""
def is_instr_empty(self):#待改 --根据work_status判断
with self.work_status_lock:
if self._instr_queue:
return False
return True
def is_llm_empty(self):#待改 --根据work_status判断
with self.work_status_lock:
if self._llm_quere:
return False
return True
#修改节点的执行状态--return bchange #修改节点的执行状态--return bchange 只能是2-4
def update_work_status(self,work_status): def update_work_status(self,work_status):
bsuccess = False bsuccess = True
if self._work_status != work_status: with self.work_status_lock:
self._work_status = work_status if self._work_status == 1 and work_status == 2: #只允许从1-》2
bsuccess = True self._work_status = 2
elif self._work_status == 3 and work_status == 4:#只允许从3-》4
self._work_status = 4
elif self._work_status ==4 and work_status == 0: #4->0
self._work_status = 0
elif work_status == -1:
self._work_status = 0
elif work_status == -2:
self._work_status = 2
elif work_status == -3:
self._work_status = 4
else:
bsuccess = False
return bsuccess return bsuccess
def get_work_status(self): def get_work_status(self):
#加锁有没有意义---web端和本身的工作线程会有同步问题,但与持久化相比,暂时忽略 #加锁有没有意义---web端和本身的工作线程会有同步问题
work_status = self._work_status work_status = self._work_status
return work_status return work_status
#-------后期扩充逻辑,目前wokr_status的修改交给上层类对象------- def add_instr(self,instr,p_msg,c_msg): #所有指令一次提交
def add_instr(self,instr,p_msg,c_msg): if instr:
if not self.parent_messages: #为空时赋值 with self.work_status_lock:
self.copy_messages(p_msg,c_msg) if not self.parent_messages: #为空时赋值
self._instr_queue.append(instr) self.copy_messages(p_msg,c_msg)
if self._work_status in (-1,1,4):
def test_add_instr(self,instr): self._instr_queue.append(instr)
self._instr_queue.append(instr) self._work_status = 1 #待执行
self._llm_quere = [] return True
else:
def get_instr(self): print("插入指令时,状态不为-1,1,4!")
return self._instr_queue.pop(0) if self._instr_queue else None return False,"节点的工作状态不是0或4,请检查程序逻辑"
def get_instr_user(self):
return self._instr_queue
def del_instr(self,instr):
if instr in self._instr_queue:
self._instr_queue.remove(instr)
#指令删除后要判断是否清空指令了
if not self._instr_queue:
self._work_status = 0 #状态调整为没有带执行指令
return True,""
else: else:
return False,"该指令不在队列中!" return False,"指令数据为空"
def get_instr(self):
with self.work_status_lock:
if self._work_status == 2: #执行中
return self._instr_queue.pop(0) if self._instr_queue else None
else:
print("不是执行中,不应该来取指令!")
return None
def add_res(self,str_res): #结构化结果字串 def del_instr(self,instr): #web端,手动删除指令
self._llm_quere.append(str_res) with self.work_status_lock:
if self._work_status == 1:
if instr in self._instr_queue:
self._instr_queue.remove(instr)
#指令删除后要判断是否清空指令了
if not self._instr_queue:
self._work_status = 0 #状态调整为无待执行任务
return True,""
else:
return False,"该指令不在队列中!"
else:
return False,"只有待执行时,允许删除指令"
def add_res(self,str_res): #llm_queue入库的情况比较多,2,0,4
if str_res:
with self.work_status_lock:
if self._work_status in (2,0,4):
self._llm_quere.append(str_res)
if self._work_status in (2,0): #提交中,不要改变执行状态
self._work_status =3
else:
print("添加llm数据时,状态不是0,2,4中的一种情况")
return False,"添加llm数据时,状态不是0,2,4中的一种情况"
else:
return False,"待提交llm的数据为空"
def get_res(self): def get_res(self):
return self._llm_quere.pop(0) if self._llm_quere else None with self.work_status_lock:
if self._work_status ==4: #提交中
def get_res_user(self): return self._llm_quere.pop(0) if self._llm_quere else None
return self._llm_quere else:
print("不是提交中,不应该来取待提交数据!")
return None
def get_work_status(self): def clear_res(self):
return self._work_status with self.llm_list_lock:
self._llm_quere.clear()
def updatemsg(self,newtype,newcontent,index):
if self._llm_quere:#
oldmsg_llm_type = self._llm_quere[0]["llm_type"] #llm_type不修改,还未验证
newmsg = {"llm_type": int(oldmsg_llm_type), "result": newcontent}
self._llm_quere[0] = newmsg
else:#新增消息
newmsg = {"llm_type": int(newtype), "result": newcontent}
self._llm_quere.append(newmsg)
#更新节点状态
self._work_status = 3 #待提交
return True,""
def is_instr_empty(self): #-----------web查看数据-----------
if self._instr_queue: def get_instr_user(self): #读不用锁了 -有错误问题不大
return False with self.work_status_lock:
return True instr_que = self._instr_queue.copy()
return instr_que
def is_llm_empty(self): def get_res_user(self): #读不用锁了 -- 有错误问题不大
if self._llm_quere: with self.work_status_lock:
return False llm_que = self._llm_quere.copy()
return True return llm_que
def __repr__(self): def __repr__(self):
return f"TreeNode({self.name}, {self.status}, {self.vul_type})" return f"TreeNode({self.name}, {self.status}, {self.vul_type})"
if __name__ == "__main__": if __name__ == "__main__":
pass pass

33
mycode/TaskManager.py

@ -31,7 +31,9 @@ class TaskManager:
#程序启动后,加载未完成的测试任务 #程序启动后,加载未完成的测试任务
def load_tasks(self): def load_tasks(self):
'''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2''' '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2
#若不是异常停止,正常情况下,任务都应该是没有待办MQ的
'''
datas = app_DBM.get_run_tasks() datas = app_DBM.get_run_tasks()
for data in datas: for data in datas:
task_id = data[0] task_id = data[0]
@ -51,15 +53,15 @@ class TaskManager:
task.init_task(task_id,attack_tree) task.init_task(task_id,attack_tree)
#开始任务 ---根据task_status来判断是否需要启动工作线程 #开始任务 ---根据task_status来判断是否需要启动工作线程
if task_status == 1: if task_status == 1:
if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁 if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁
bsuc,strout = task.start_task() bsuc,strout = task.start_task()
if bsuc: if bsuc:
self.cur_task_run_num +=1 self.cur_task_run_num +=1
else:
task.update_task_status(0)
else: else:
task.update_task_status(0) self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态")
else: task.update_task_status(0)#尝试硬恢复
self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态")
task.update_task_status(0)#尝试硬恢复
# 内存保留task对象 # 内存保留task对象
self.tasks[task_id] = task self.tasks[task_id] = task
else: else:
@ -90,7 +92,7 @@ class TaskManager:
#获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库 #获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库
task_id = app_DBM.start_task(target,"",work_type,llm_type,fake_target) task_id = app_DBM.start_task(target,"",work_type,llm_type,fake_target)
if task_id >0: if task_id >0:
#2025-4-28调整批量添加任务,默认不启动 #2025-4-28调整批量添加任务,默认不启动线程start_task
task.init_task(task_id) task.init_task(task_id)
#保留task对象 #保留task对象
self.tasks[task_id] = task self.tasks[task_id] = task
@ -256,15 +258,8 @@ class TaskManager:
if task: if task:
node = task.attack_tree.find_node_by_nodepath(nodepath) node = task.attack_tree.find_node_by_nodepath(nodepath)
if node: if node:
work_status = node.get_work_status() bsuccess,error = node.updatemsg(newtype,newcontent,node.parent.parent_messages,node.parent.cur_messages,0)
if work_status == 0 or work_status == 3: return bsuccess,error
if work_status == 0:
if not node.parent_messages: #如果messages为空--且不会是根节点
node.copy_messages(node.parent.parent_messages,node.parent.cur_messages)
bsuccess,error = node.updatemsg(newtype,newcontent,0) #取的第一条,也就修改第一条
return bsuccess,error
else:
return False,"当前节点的工作状态不允许修改MSG!"
return False,"找不到对应节点!" return False,"找不到对应节点!"
def del_node_instr(self,task_id,nodepath,instr): def del_node_instr(self,task_id,nodepath,instr):

289
mycode/TaskObject.py

@ -15,6 +15,7 @@ from mycode.WebSocketManager import g_WSM
from mycode.CommandVerify import g_CV from mycode.CommandVerify import g_CV
from mycode.PythonTManager import PythonTManager from mycode.PythonTManager import PythonTManager
from mycode.DataFilterManager import DataFilterManager from mycode.DataFilterManager import DataFilterManager
from myutils.ReadWriteLock import ReadWriteLock
import asyncio import asyncio
import queue import queue
import time import time
@ -46,31 +47,72 @@ class TaskObject:
self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态 self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态
self.local_ip = local_ip self.local_ip = local_ip
self.attack_tree = None #任务节点树 self.attack_tree = None #任务节点树
self.attack_tree_lock = None #把节点的Lock外置
self.safe_rank = safe_rank #安全级别 0-9 #?暂时还没实现更新逻辑 self.safe_rank = safe_rank #安全级别 0-9 #?暂时还没实现更新逻辑
self.is_had_work = False self.is_had_work = False
self.is_had_work_lock = threading.Lock() self.is_had_work_lock = threading.Lock()
#读写锁
self.rwlock = ReadWriteLock()
#指令执行相关------- #指令执行相关-------
self.max_thread_num = num_threads #指令执行线程数量 self.max_thread_num = num_threads #指令执行线程数量
self.workth_list = [None] * num_threads #线程句柄list self.workth_list = [None] * num_threads #线程句柄list
self.workth_node_list = [None] * num_threads # 线程句柄list
self.doing_instr_list= [""] * num_threads self.doing_instr_list= [""] * num_threads
self.instr_node_queue = queue.Queue() #待执行指令的节点队列 self.instr_node_queue = queue.Queue() #待执行指令的节点队列
self.node_num = 0 #在处理Node线程的处理 self.node_num = 0 #在处理Node线程的处理
#llm执行相关-------- #llm执行相关--------
self.llm_max_nums = myCongif.get_data("LLM_max_threads") # 控制最大并发指令数量 --- 多线程的话节点树需要加锁 self.llm_max_nums = myCongif.get_data("LLM_max_threads") # 控制最大并发指令数量 --- 多线程的话节点树需要加锁
self.llmth_list = [None] * self.llm_max_nums # llm线程list self.llmth_list = [None] * self.llm_max_nums # llm线程list
self.llmth_node_list = [None] * self.llm_max_nums # llm线程--node
self.doing_llm_list = [""] * self.llm_max_nums self.doing_llm_list = [""] * self.llm_max_nums
self.llm_node_queue = queue.Queue() #待提交LLM的节点队列 self.llm_node_queue = queue.Queue() #待提交LLM的节点队列
#自检线程----------- #自检线程-----------
self.check_th = None #自检线程句柄 self.check_th = None #自检线程句柄
#单步执行控制-------
self.one_step_num = 0 #单步执行任务次数,默认1
#-----四队列------- #-----四队列-------
self.run_instr_lock = threading.Lock() # 线程锁 self.run_instr_lock = threading.Lock() # 线程锁
self.runing_instr = {} #执行中指令记录 self.runing_instr = {} #执行中指令记录
#---------------三个线程------------ #---------------三个线程------------
#测试指令执行线程 #测试指令执行线程
#读取待办指令节点
def get_instr_node(self,index):
self.rwlock.acquire_read()
try:
self.workth_node_list[index] = self.instr_node_queue.get(block=False)
except queue.Empty:
self.workth_node_list[index] = None
finally:
self.rwlock.release_read()
def put_instr_node(self,node):
#self.instr_node_queue.put(node)
self.rwlock.acquire_read()
try:
self.instr_node_queue.put(node)
finally:
self.rwlock.release_read()
#读取待办llm节点
def get_llm_node(self,index):
self.rwlock.acquire_read()
try:
self.llmth_node_list[index] = self.llm_node_queue.get(block=False)
except queue.Empty:
self.llmth_node_list[index] = None
finally:
self.rwlock.release_read()
def put_llm_node(self,node):
# self.llm_node_queue.put(node)
self.rwlock.acquire_read()
try:
self.llm_node_queue.put(node)
finally:
self.rwlock.release_read()
def mill_instr_preprocess(self,instructions,str_split): def mill_instr_preprocess(self,instructions,str_split):
new_instr = [] new_instr = []
instrs = instructions.split(str_split) instrs = instructions.split(str_split)
@ -129,13 +171,14 @@ class TaskObject:
bnode_work = False bnode_work = False
while self.brun: while self.brun:
if self.task_status == 1: if self.task_status == 1:
try: self.get_instr_node(th_index) #获取一个待办节点
if self.workth_node_list[th_index]:
work_node = self.workth_node_list[th_index]
llm_type = 1 llm_type = 1
work_node = self.instr_node_queue.get(block=False)#正常一个队列中一个节点只会出现一次,进而也就只有一个线程在处理
# 开始执行指令 # 开始执行指令
bnode_work = True bnode_work = True
results = [] results = []
while True: while True: #遍历节点待执行指令,执行
instruction = work_node.get_instr() instruction = work_node.get_instr()
if not instruction: if not instruction:
break break
@ -155,18 +198,16 @@ class TaskObject:
fake_inst,fake_resul = self.DataFilter.filter_result(instr,reslut) fake_inst,fake_resul = self.DataFilter.filter_result(instr,reslut)
oneres = {'执行指令': fake_inst, '结果': fake_resul} oneres = {'执行指令': fake_inst, '结果': fake_resul}
results.append(oneres) #结果入队列后,指令就不能回退 results.append(oneres) #结果入队列后,指令就不能回退
#一条指令执行完成
self.doing_instr_list[th_index] = "" #节点执行完成后置空
self.doing_instr_list[th_index] = ""
#指令都执行结束后,入节点待提交队列 #指令都执行结束后,入节点待提交队列
str_res = json.dumps(results, ensure_ascii=False) str_res = json.dumps(results, ensure_ascii=False)
# 提交llm待处理任务 --更新节点work_status # 提交llm待处理任务 --更新节点work_status
if work_node.do_sn >= myCongif.get_data("max_do_sn"): # 该节点执行指令已超过10条
llm_type = 10
self.put_node_reslist(work_node, str_res, llm_type) self.put_node_reslist(work_node, str_res, llm_type)
# 保存记录 # 保存记录
g_PKM.WriteData(self.attack_tree,str(self.task_id)) g_PKM.WriteData(self.attack_tree,str(self.task_id))
else:
except queue.Empty:
if bnode_work: if bnode_work:
bnode_work = False bnode_work = False
self.no_work_to_do() #判断是否需要把当前任务的无工作状态推送到前端 self.no_work_to_do() #判断是否需要把当前任务的无工作状态推送到前端
@ -187,10 +228,12 @@ class TaskObject:
th_DBM.connect() th_DBM.connect()
th_index = index th_index = index
bnode_work = False bnode_work = False
max_llm_sn = myCongif.get_data("max_llm_sn")
while self.brun: while self.brun:
if self.task_status == 1: if self.task_status == 1:
try: self.get_llm_node(th_index)
llm_node = self.llm_node_queue.get(block=False) #获取一个待处理节点 if self.llmth_node_list[th_index]:
llm_node = self.llmth_node_list[th_index]
#开始处理 #开始处理
bnode_work = True bnode_work = True
tmp_commands = [] tmp_commands = []
@ -208,6 +251,13 @@ class TaskObject:
break break
llm_type = llm_data["llm_type"] llm_type = llm_data["llm_type"]
str_res = llm_data["result"] str_res = llm_data["result"]
#判断执行次数
llm_node.llm_sn += 1
if llm_node.llm_sn == max_llm_sn: #提交次数达到上限后,提示LLM结束该节点任务
llm_type = 10
#该节点的剩余任务不执行,若有--暂定
llm_node.clear_res()
#获取提示词 #获取提示词
prompt = self.get_llm_prompt(llm_type,str_res,user_Prompt) prompt = self.get_llm_prompt(llm_type,str_res,user_Prompt)
fake_prompt = self.DataFilter.filter_prompt(prompt) #目标脱敏 fake_prompt = self.DataFilter.filter_prompt(prompt) #目标脱敏
@ -221,7 +271,6 @@ class TaskObject:
continue #丢弃 --若需要再次尝试,把llm_data再入队列 continue #丢弃 --若需要再次尝试,把llm_data再入队列
# LLM记录存数据库 # LLM记录存数据库
if th_DBM.ok: if th_DBM.ok:
llm_node.llm_sn += 1
bres = th_DBM.insert_llm(self.task_id, prompt, reasoning_content, content, post_time, llm_node.llm_sn,llm_node.path) bres = th_DBM.insert_llm(self.task_id, prompt, reasoning_content, content, post_time, llm_node.llm_sn,llm_node.path)
if not bres: if not bres:
self.logger.error(f"{llm_node.name}-llm入库失败!") self.logger.error(f"{llm_node.name}-llm入库失败!")
@ -236,21 +285,21 @@ class TaskObject:
bok, new_commands,iadd_node = self.tree_manager(node_cmds, llm_node, commands, th_DBM) bok, new_commands,iadd_node = self.tree_manager(node_cmds, llm_node, commands, th_DBM)
# 分析指令入对应节点 # 分析指令入对应节点
if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成 if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成
#tmp_commands.extend(new_commands) tmp_commands.extend(new_commands)
# 测试指令入节点待处理队列 --同时修改节点的work_status #节点的待提交任务都完成后,统一处理指令
self.put_node_instrlist(new_commands, llm_node, iadd_node) self.put_node_instrlist(tmp_commands, llm_node)
self.doing_llm_list[th_index] = ""
#一个节点完成,节点树持久化---待验证是否有局部更新持久化的方案 #一个节点完成,节点树持久化---待验证是否有局部更新持久化的方案
g_PKM.WriteData(self.attack_tree,str(self.task_id)) g_PKM.WriteData(self.attack_tree,str(self.task_id))
#推送前端刷新数据 #推送前端刷新数据--执行一个节点就刷新一次
if self.taskM.web_cur_task == self.task_id: # 如果新增了节点,且该节点树是当前查看的数据,需要通知前端更新数据 if self.taskM.web_cur_task == self.task_id:
idatatype = 2 idatatype = 2
strdata = "update accack_tree!" strdata = "update accack_tree!"
asyncio.run(g_WSM.send_data(idatatype, strdata)) asyncio.run(g_WSM.send_data(idatatype, strdata))
# 先取消当前task,已经通知前端重新获取,这样可以避免后端不必要的数据推送 # 先取消当前task,已经通知前端重新获取,这样可以避免后端不必要的数据推送
#self.taskM.web_cur_task = 0 #self.taskM.web_cur_task = 0
except queue.Empty: # 一个节点执行完成后再置空
self.doing_llm_list[th_index] = ""
else:
if bnode_work: if bnode_work:
bnode_work = False bnode_work = False
self.no_work_to_do() # 判断是否需要把当前任务的无工作状态推送到前端 self.no_work_to_do() # 判断是否需要把当前任务的无工作状态推送到前端
@ -266,30 +315,36 @@ class TaskObject:
self.is_had_work = True self.is_had_work = True
return bsuccess return bsuccess
def no_work_to_do(self): #任务单步状态控制-- 非工作中--2025-5-7增加了轮次,需要全面验证执行逻辑的完整和正确性 def is_work_doing(self):
# 待执行instr-node '''检查是否还有在执行的事项'''
instr_node_list = list(self.instr_node_queue.queue) # 待执行指令的node--线程不安全 #获取队列快照
llm_node_list = list(self.llm_node_queue.queue) # 待提交llm的node--线程不安全 self.rwlock.acquire_write()
if len(instr_node_list) == 0 and len(llm_node_list) == 0: #没有待办节点了 try:
#遍历在执行 instr_node_list = list(self.instr_node_queue.queue) # 待执行指令的node
for str_instr in self.doing_instr_list: llm_node_list = list(self.llm_node_queue.queue) # 待提交llm的node
if str_instr != "": wth_n_list = self.workth_node_list.copy()
return lth_n_list = self.llmth_node_list.copy()
for str_llm in self.doing_llm_list: finally:
if str_llm != "": self.rwlock.release_write()
return #只针对这个快照数据进行判断
#先判断正在执行节点是否有值 ---
for do_node in wth_n_list:
if do_node:
return True
for do_node in lth_n_list:
if do_node:
return True
#再判断待办节点
if len(instr_node_list) == 0 and len(llm_node_list) == 0:
return False
return True
#是否需要线程锁-待定
def no_work_to_do(self): #任务单步状态控制-- 非工作中--2025-5-7增加了轮次,需要全面验证执行逻辑的完整和正确性
#是否还有在执行事项
bworking = self.is_work_doing()
if not bworking:
# 没有在执行任务了 # 没有在执行任务了
self.one_step_num -= 1
if not self.one_step_num: # 执行轮次已结束
pass
else:#发起新的一轮
bok = self.put_one_task()
if not bok:
self.one_step_num = 0
#结束轮次--推送前端
#推送是否有工作任务的状态到前端, #推送是否有工作任务的状态到前端,
with self.is_had_work_lock: with self.is_had_work_lock:
if self.is_had_work: #如果已经是False那就不需要修改了 if self.is_had_work: #如果已经是False那就不需要修改了
@ -306,6 +361,7 @@ class TaskObject:
icount = 0 icount = 0
while self.brun: while self.brun:
try: try:
bworking = False
cur_time = get_local_timestr() cur_time = get_local_timestr()
print(f"----------{self.task_id}-当前时间程序运行情况:{cur_time}") print(f"----------{self.task_id}-当前时间程序运行情况:{cur_time}")
#执行中instr-node #执行中instr-node
@ -315,6 +371,7 @@ class TaskObject:
print(f"Work线程-{index}已处于异常状态,需要重新创建一个工作线程") print(f"Work线程-{index}已处于异常状态,需要重新创建一个工作线程")
else: else:
if self.doing_instr_list[index]: if self.doing_instr_list[index]:
bworking =True
print(f"Work线程-{index}-在执行指令:{self.doing_instr_list[index]}") print(f"Work线程-{index}-在执行指令:{self.doing_instr_list[index]}")
index += 1 index += 1
@ -324,8 +381,15 @@ class TaskObject:
print(f"LLM线程-{index}已处于异常状态,需要重新创建一个LLM线程") print(f"LLM线程-{index}已处于异常状态,需要重新创建一个LLM线程")
else: else:
if self.doing_llm_list[index]: if self.doing_llm_list[index]:
bworking = True
print(f"LLM线程-{index}-在执行指令:{self.doing_llm_list[index]}") print(f"LLM线程-{index}-在执行指令:{self.doing_llm_list[index]}")
index += 1 index += 1
#判断任务是否完成
bover = self.is_over()
if bover and not bworking:
self.stop_task() #自己停自己--确保没有阻塞操作
break #退出循环--结束线程
#处理点修复操作 #处理点修复操作
icount +=1 icount +=1
if icount == 5: if icount == 5:
@ -340,12 +404,12 @@ class TaskObject:
#------------入两个nodeMQ-禁止直接调用入队列----------- #------------入两个nodeMQ-禁止直接调用入队列-----------
def put_instr_mq(self,node): def put_instr_mq(self,node):
#这里不做状态的判断,调用前处理 #这里不做状态的判断,调用前处理
self.instr_node_queue.put(node) self.put_instr_node(node)
self.update_node_work_status(node,2) #在执行--1.work_status不影响整个任务的执行,错了问题不大,2--attack_tree持久化需要出去lock信息。 self.update_node_work_status(node,2) #在执行--1.work_status不影响整个任务的执行,错了问题不大,2--attack_tree持久化需要出去lock信息。
def put_llm_mq(self,node): def put_llm_mq(self,node):
#同instr_mq #同instr_mq
self.llm_node_queue.put(node) self.put_llm_node(node)
self.update_node_work_status(node,4) #提交中 self.update_node_work_status(node,4) #提交中
async def put_instr_mq_async(self,node): async def put_instr_mq_async(self,node):
@ -376,8 +440,11 @@ class TaskObject:
node.add_res(one_llm) # 入节点结果队列 node.add_res(one_llm) # 入节点结果队列
self.update_node_work_status(node,3) #待提交llm self.update_node_work_status(node,3) #待提交llm
# 如果是自动执行的模式则入队列交给llm线程处理 # 如果是自动执行的模式则入队列交给llm线程处理
if self.app_work_type == 1: if node.bwork: #节点是工作状态
if self.work_type == 1 and node.bwork: if self.work_type == 1: #自动模式
self.put_llm_mq(node) #变4
elif self.work_type == 0 and node.step_num > 0: #人工模式,但是单步步次还没有执行完成
node.step_num -= 1
self.put_llm_mq(node) #变4 self.put_llm_mq(node) #变4
#递归找节点 #递归找节点
@ -399,7 +466,7 @@ class TaskObject:
command = command.replace("& &","&&") command = command.replace("& &","&&")
return command return command
def put_node_instrlist(self, commands, node,iadd_node): #如果当前节点没有进一般指令返回,需要修改节点执行状态 def put_node_instrlist(self, commands, node): #如果当前节点没有进一般指令返回,需要修改节点执行状态
if not node: if not node:
return return
node_list = [] #有待办指令的节点 node_list = [] #有待办指令的节点
@ -413,12 +480,13 @@ class TaskObject:
instruction = re.sub(r'\[.*?\]', "", command, count=1, flags=re.DOTALL) instruction = re.sub(r'\[.*?\]', "", command, count=1, flags=re.DOTALL)
#'''强制约束,不是本节点或者是子节点的指令不处理''' #'''强制约束,不是本节点或者是子节点的指令不处理'''
find_node = None find_node = None
if node_name == node.name: if node_name == node.name: #指令是当前节点的
find_node = node find_node = node
else: else:
for child_node in node.children: #暂时只找一层 for child_node in node.children: #暂时只找一层
if node_name == child_node.name: if node_name == child_node.name:
find_node = child_node if child_node.do_sn == 0: #只有没执行过指令的子节点,才允许添加指令 2025-5-9 新增限制 避免父节点和子节点同时有指令执行,提交llm后,父节点返回子节点指令。
find_node = child_node
break break
# find_node = self.attack_tree.find_node_by_nodepath_parent(node_path,node,iadd_node,commands) # find_node = self.attack_tree.find_node_by_nodepath_parent(node_path,node,iadd_node,commands)
# if not find_node:#对于没有基于节点路径找到对应节点--增加通过节点名称匹配的机制 2025-4-13日添加 # if not find_node:#对于没有基于节点路径找到对应节点--增加通过节点名称匹配的机制 2025-4-13日添加
@ -444,59 +512,64 @@ class TaskObject:
if node not in node_list: if node not in node_list:
#修改该节点状态为0--无待执行任务 #修改该节点状态为0--无待执行任务
self.update_node_work_status(node,0) self.update_node_work_status(node,0)
#入instr队列 #判断是否入instr待办队列---放循环外面是等指令都添加完成后提交待办
if self.app_work_type == 1: for node in node_list:
if self.work_type == 1: #是自动执行模式 if node.bwork:
for node in node_list: if self.work_type == 1: #是自动执行模式
if node.bwork: self.put_instr_mq(node) #2-执行中
self.put_instr_mq(node) #2-执行中 elif self.work_type == 0 and node.step_num > 0: #人工模式 且剩余步次大于0
node.step_num -= 1
self.put_instr_mq(node) #2-执行中
def put_work_node(self): def put_work_node(self):
'''遍历节点需要处理的任务,提交mq''' '''遍历节点需要处理的任务,提交mq,load_task-在自动模式下-触发--线程安全'''
nodes = self.attack_tree.traverse_bfs() nodes = self.attack_tree.traverse_bfs()
for node in nodes: for node in nodes:
if not node.is_instr_empty(): #待执行指令有值 if not node.bwork:
if not node.is_llm_empty(): continue
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") node_work_status = node.get_work_status()
if node_work_status in (1,2): #待执行指令
if node.is_instr_empty():#说明数据有问题了,放弃掉
node.update_work_status(-1) #置0 -1作为额外的条件参数
else: else:
if node.bwork: self.put_instr_node(node) #1,2都提交执行
self.put_instr_mq(node) #提交执行 node.update_work_status(-2)# 置2
elif not node.is_llm_empty(): #待提交llm有值 #llm-list不处理,正常应该为空
if not node.is_instr_empty(): elif node_work_status in (3,4):
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") if node.is_llm_empty():#数据有问题,放弃掉
node.update_work_status(-1)
else: else:
if node.bwork: self.put_llm_node(node)
self.put_llm_mq(node) #提交执行 node.update_work_status(-3) #置4
else:
pass
#web端提交单步任务--节点单步 #web端提交单步任务--节点单步--start
async def put_one_node(self,node): async def put_one_node(self,node,step_num=1):
#提交某个节点的代表任务 #提交某个节点的代表任务
if self.task_status ==1 and self.work_type==0 and node.bwork: if self.task_status ==1 and self.work_type==0 and node.bwork:
# node_status = node.get_work_status() iwork_status = node.get_work_status()
# if node_status == 2 or node_status == 4: if iwork_status in (1,3):
# return False,"当前节点正在执行任务,请稍后点击单步!" node.step_num = step_num - 1 #单步步次赋值 -1 规则提交时-1,执行结束连续判断再提交
if not node.is_instr_empty(): #待执行指令有值
if not node.is_llm_empty(): if iwork_status == 1:
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!") self.put_instr_mq(node)
return False,"该节点的待执行任务数据不正确,请联系管理员!"
else:
if node.bwork:
await self.put_instr_mq_async(node) #提交执行
elif not node.is_llm_empty(): #待提交llm有值
if not node.is_instr_empty():
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!")
return False, "该节点的待执行任务数据不正确,请联系管理员!"
else: else:
if node.bwork: self.put_llm_mq(node)
await self.put_llm_mq_async(node) #提交执行 return True,"已提交单步任务"
else: else:
await self.update_node_work_status_async(node,0) #只是修补措施,保障状态的一致性 error = ""
return False,"当前节点没有待执行任务!" if iwork_status == 0:
return True,"已提交单步任务" error = "该节点没有待办任务"
elif iwork_status in (2,4):
error = "该节点已经在执行事项中"
else:
self.logger.error("noed的工作状态值存在问题,需要人工介入!!")
return False,error
else: else:
return False,"当前的任务或节点状态不允许执行单步,请检查!" return False,"当前的任务或节点状态不允许执行单步,请检查!"
#web端提交任务单步--任务单步 #web端提交任务单步--任务单步---2025-5-8-增加约束,只允许单步启动时调用
async def put_one_task(self,step_num): async def put_one_task(self,step_num):
if self.task_status == 1 and self.work_type == 0: if self.task_status == 1 and self.work_type == 0:
bsuccess = self.had_work_to_do() bsuccess = self.had_work_to_do()
@ -504,12 +577,14 @@ class TaskObject:
nodes = self.attack_tree.traverse_bfs() nodes = self.attack_tree.traverse_bfs()
b_putwork = False b_putwork = False
for node in nodes: for node in nodes:
bput,_ = await self.put_one_node(node) bput,_ = await self.put_one_node(node,step_num) #错误信息有丢失
if bput: if bput:
b_putwork = True b_putwork = True
if b_putwork: if b_putwork:
return True,"已提交单步任务" return True,"已提交单步任务"
else: else: #所有节点都没有提交任务
#可以尝试stop下
self.stop_task()
return False,"该任务已经没有待提交任务" return False,"该任务已经没有待提交任务"
else: else:
return False,"当前任务正在执行任务中,不需要提交单步任务!" return False,"当前任务正在执行任务中,不需要提交单步任务!"
@ -519,9 +594,9 @@ class TaskObject:
#修改节点的执行状态,并需要基于websocket推送到前端显示 同步线程调用 #修改节点的执行状态,并需要基于websocket推送到前端显示 同步线程调用
def update_node_work_status(self,node,work_status): def update_node_work_status(self,node,work_status):
#更新状态 #更新状态
bchange = node.update_work_status(work_status) bchange = node.update_work_status(work_status) #1,3会返回Flase
#基于websocket推送到前端 #基于websocket推送到前端
if bchange and work_status != 1: #llm执行完成后会发送单独的指令更新树,所以不发送1更新节点了 if work_status != 1: #llm执行完成后会发送单独的指令更新树,所以不发送1更新节点了
#判断是否是web端最新获取数据的task #判断是否是web端最新获取数据的task
if self.taskM.web_cur_task == self.task_id: if self.taskM.web_cur_task == self.task_id:
idatatype = 1 idatatype = 1
@ -594,6 +669,7 @@ class TaskObject:
bok,strerror = g_CV.verify_node_cmds(node_cmds) bok,strerror = g_CV.verify_node_cmds(node_cmds)
if not bok: #节点指令存在问题,则不进行后续处理,提交一个错误反馈任务 if not bok: #节点指令存在问题,则不进行后续处理,提交一个错误反馈任务
# 提交llm待处理任务 # 提交llm待处理任务
node.step_num += 1 #单步次数还原一个--暂时只针对有效的返回才算一次
self.put_node_reslist(node, strerror, 2) self.put_node_reslist(node, strerror, 2)
return False,commands,0 return False,commands,0
#message_调整传递时机后,可以先执行添加节点 #message_调整传递时机后,可以先执行添加节点
@ -767,7 +843,7 @@ class TaskObject:
# 插入一个user消息 # 插入一个user消息
# 提交第一个llm任务,开始工作 # 提交第一个llm任务,开始工作
know_info = f"本测试主机的IP地址为:{self.local_ip}" know_info = f"本测试主机的IP地址为:{self.local_ip}"
self.put_node_reslist(root_node, know_info, 0) # 入待提交list self.put_node_reslist(root_node, know_info, 0) # 入待提交list,若是人工模式则不入待办MQ
# 初始保存个attack_tree文件 # 初始保存个attack_tree文件
g_PKM.WriteData(self.attack_tree, str(self.task_id)) g_PKM.WriteData(self.attack_tree, str(self.task_id))
@ -812,6 +888,8 @@ class TaskObject:
:return bool str :return bool str
''' '''
if self.is_task_stop(): if self.is_task_stop():
if self.is_over(): #只是判断了待办list,在执行的会漏判断
return False, "该任务所有未暂停节点的待办事项都已经结束"
#更新状态标识 #更新状态标识
self.update_task_status(1) self.update_task_status(1)
self.brun = True #线程正常启动 self.brun = True #线程正常启动
@ -835,14 +913,31 @@ class TaskObject:
return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!" return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!"
def stop_task(self): #还未处理 def stop_task(self): #还未处理
if self.brun and self.task_status ==1: #不是正常工作状态就不执行停止工作了 if self.brun and self.task_status == 1: #不是正常工作状态就不执行停止工作了
#停止子进程池 #停止子进程池
self.PythonM.shutdown_pool() self.PythonM.shutdown_pool()
#停止线程 #停止线程
self.brun = False self.brun = False
self.update_task_status(0) self.update_task_status(0) #置状态0
# 结束任务需要收尾处理#? # 结束任务需要收尾处理#?
self.InstrM.init_data() #pass self.InstrM.init_data() #pass
def is_over(self):
'''
判断当前任务是否所有节点都已结束
:return: False-非暂停节点有代表事项True-非粘贴节点事项都已完成
'''
b_over = True
nodes = self.attack_tree.traverse_bfs()
for node in nodes:
if node.bwork:
work_status = node.get_work_status()
if work_status != 0:
b_over = False # 有一个有任务就不继续进行判断
break
return b_over
if __name__ == "__main__": if __name__ == "__main__":
pass pass

36
myutils/ReadWriteLock.py

@ -0,0 +1,36 @@
import threading
class ReadWriteLock:
def __init__(self):
self._rw_lock = threading.Lock()
self._readers = 0
self._writers_waiting = 0
self._writer = False
self._cond = threading.Condition(self._rw_lock)
def acquire_read(self):
with self._cond:
# 如果已有写锁,或有写者在等待,都要等
while self._writer or self._writers_waiting > 0:
self._cond.wait()
self._readers += 1
def release_read(self):
with self._cond:
self._readers -= 1
if self._readers == 0:
self._cond.notify_all()
def acquire_write(self):
with self._cond:
self._writers_waiting += 1
# 等到没人读、没人写
while self._writer or self._readers > 0:
self._cond.wait()
self._writers_waiting -= 1
self._writer = True
def release_write(self):
with self._cond:
self._writer = False
self._cond.notify_all()

2
web/API/task.py

@ -141,7 +141,7 @@ async def task_one_step():
if not task_id: if not task_id:
return jsonify({'error': 'Missing task_id'}), 400 return jsonify({'error': 'Missing task_id'}), 400
if not step_num: if not step_num:
step_num = 1 #默认一 step_num = 1 #默认一
bsuccess,error = await g_TaskM.task_one_step(task_id,step_num) bsuccess,error = await g_TaskM.task_one_step(task_id,step_num)
return jsonify({"bsuccess":bsuccess,"error":error}) return jsonify({"bsuccess":bsuccess,"error":error})

2
web/main/templates/index.html

@ -83,7 +83,7 @@
<textarea class="form-control" id="usage" rows="9"> <textarea class="form-control" id="usage" rows="9">
1.测试模式分为两种:自动执行和人工确认(单步模式),模式的切换只允许在暂停情况下调整; 1.测试模式分为两种:自动执行和人工确认(单步模式),模式的切换只允许在暂停情况下调整;
2.暂停不停止正在执行指令,指令执行后会根据当前参数的设定执行下一步工作; 2.暂停不停止正在执行指令,指令执行后会根据当前参数的设定执行下一步工作;
3.单步的作用是将节点中:待执行的指令进行执行,待提交LLM的数据提交LLM; 3.单步的作用是将节点中:待执行的指令进行执行,待提交LLM的数据提交LLM,执行指令和提交LLM都各算一步
4.顶部的单步是针对整个任务的单步执行,若节点执行状态不一致,会存在某些节点执行测试指令,某些节点提交llm任务的情况,节点树区域的控制是针对该节点的控制; 4.顶部的单步是针对整个任务的单步执行,若节点执行状态不一致,会存在某些节点执行测试指令,某些节点提交llm任务的情况,节点树区域的控制是针对该节点的控制;
5.由于LLM的不一致性,会存在无执行任务,但没有标记完成的任务节点,可作为已完成论; 5.由于LLM的不一致性,会存在无执行任务,但没有标记完成的任务节点,可作为已完成论;
6.在单步模式下,若某指令执行的结果错误,可以在查看MSG功能里,修改待提交的执行结果,来保障测试的顺利推进; 6.在单步模式下,若某指令执行的结果错误,可以在查看MSG功能里,修改待提交的执行结果,来保障测试的顺利推进;

2
web/main/templates/task_manager.html

@ -213,7 +213,7 @@
<!-- 按钮 (联动测试状态示例: 执行中->暂停, 暂停中->启动,) --> <!-- 按钮 (联动测试状态示例: 执行中->暂停, 暂停中->启动,) -->
<button class="btn btn-primary btn-block m-2" id="actionButton">启动</button> <button class="btn btn-primary btn-block m-2" id="actionButton">启动</button>
<div class="m-2" style="margin-bottom: 5px"> <div class="m-2" style="margin-bottom: 5px">
<label class="fw-bold" style="font-size:0.9rem">单步次:</label> <label class="fw-bold" style="font-size:0.9rem">单步次:</label>
<select class="form-select" id="modelSelect" style="font-size:0.9rem"> <select class="form-select" id="modelSelect" style="font-size:0.9rem">
<option value="1">1</option> <option value="1">1</option>
<option value="2">2</option> <option value="2">2</option>

Loading…
Cancel
Save