You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

828 lines
42 KiB

'''
渗透测试任务管理类 一次任务的闭合性要检查2025-3-10 一次任务后要清理LLM和InstrM的数据
'''
#from LLMManager import LLMManager # 同理修正其他导入
from mycode.ControlCenter import ControlCenter #控制中心替代LLM--控制中心要实现一定的基础逻辑和渗透测试树的维护。
from mycode.InstructionManager import g_instrM
from mycode.DBManager import DBManager,app_DBM
from mycode.LLMManager import LLMManager
from mycode.AttackMap import AttackTree,TreeNode
from myutils.MyTime import get_local_timestr
from myutils.MyLogger_logger import LogHandler
from myutils.PickleManager import g_PKM
from myutils.ConfigManager import myCongif
from mycode.WebSocketManager import g_WSM
from mycode.CommandVerify import g_CV
from mycode.PythonTManager import PythonTManager
from mycode.DataFilterManager import DataFilterManager
import asyncio
import queue
import time
import os
import re
import threading
import json
import textwrap
class TaskObject:
def __init__(self,test_target,cookie_info,work_type,llm_type,num_threads,local_ip,fake_target,taskM,safe_rank=0):
#功能类相关
self.taskM = taskM
self.logger = LogHandler().get_logger("TaskObject")
self.InstrM = g_instrM # 类对象渗透,要约束只读取信息,且不允许有类全局对象--持续检查
self.PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
self.DataFilter = DataFilterManager(test_target,fake_target)
self.CCM = ControlCenter() #一个任务一个CCM
self.LLM = LLMManager(llm_type) # LLM对象调整为一个任务一个对象,这样可以为不同的任务选择不同的LLM
#全局变量
self.app_work_type = myCongif.get_data("App_Work_type") #app工作为0时,只允许单步模式工作,是附加规则,不影响正常逻辑处理
self.brun = False #任务的停止可以用该变量来控制
self.sleep_time = myCongif.get_data("sleep_time")
self.target = test_target
self.cookie = cookie_info
self.work_type = work_type #工作模式 0-人工,1-自动
self.task_id = None
self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态
self.local_ip = local_ip
self.attack_tree = None #任务节点树
self.safe_rank = safe_rank #安全级别 0-9 #?暂时还没实现更新逻辑
self.is_had_work = False
self.is_had_work_lock = threading.Lock()
#指令执行相关-------
self.max_thread_num = num_threads #指令执行线程数量
self.workth_list = [None] * num_threads #线程句柄list
self.doing_instr_list= [""] * num_threads
self.instr_node_queue = queue.Queue() #待执行指令的节点队列
self.node_num = 0 #在处理Node线程的处理
#llm执行相关--------
self.llm_max_nums = myCongif.get_data("LLM_max_threads") # 控制最大并发指令数量 --- 多线程的话节点树需要加锁
self.llmth_list = [None] * self.llm_max_nums # llm线程list
self.doing_llm_list = [""] * self.llm_max_nums
self.llm_node_queue = queue.Queue() #待提交LLM的节点队列
#自检线程--------
self.check_th = None #自检线程句柄
#-----四队列-----
self.run_instr_lock = threading.Lock() # 线程锁
self.runing_instr = {} #执行中指令记录
#---------------三个线程------------
#测试指令执行线程
def mill_instr_preprocess(self,instructions,str_split):
new_instr = []
instrs = instructions.split(str_split)
index = 0
for instr in instrs:
if instr.strip().startswith("curl"):
if " --max-time " not in instr:
out_time = g_instrM.get_tool_out_time("curl")
instr = instr.strip() + f" --max-time {str(out_time)}"
#instr = instr.strip() + " --max-time 10"
new_instr.append(instr)
index += 1
new_star_instr = f"{str_split}".join(new_instr)
print(new_star_instr)
return new_star_instr
def smart_truncate(self,s: str,max_length: int = 5000,ellipsis: str = "...") -> str:
"""截取字符串前 max_length 个字符,并添加截断标记(确保总长度不超过限制)"""
if not s:
return s
#一些无效信息的删除
s = s.replace("pydev debugger: bytes arguments were passed to a new process creation function. Breakpoints may not work correctly.","")
# 计算保留长度(考虑截断标记占位)
truncated = s[:max_length - len(ellipsis)] if len(s) > max_length else s
return truncated + ellipsis if len(s) > max_length else s
def do_instruction(self,instruction):
instruction = textwrap.dedent(instruction.strip())
# 对多shell指令的情况进行处理--也有风险
if "python-code" not in instruction:
if "&&" in instruction:
instruction = self.mill_instr_preprocess(instruction, "&&")
elif "||" in instruction:
instruction = self.mill_instr_preprocess(instruction, "||")
start_time = get_local_timestr() # 指令执行开始时间
# 2025-4-27要重新定义bool值作用,初步想法True-作为指令已处理(执行或不执行),False-作为异常,指令还没有处理-可以回Q
if instruction.startswith("python-code"): # python代码--超过子进程数会阻塞等待,但不开始超时记时
bsuccess, instr, reslut, source_result, ext_params = self.PythonM.execute_instruction(instruction)
else: # shell指令
bsuccess, instr, reslut, source_result, ext_params = self.InstrM.execute_instruction(instruction)
end_time = get_local_timestr() # 指令执行结束时间
# 只取结果的5000长度
reslut = self.smart_truncate(reslut, 4000)
source_result = self.smart_truncate(source_result)
return start_time,end_time,bsuccess,instr,reslut,source_result,ext_params
def do_worker_th(self,index):
#线程的dbm需要一个线程一个
th_DBM = DBManager()
th_DBM.connect()
th_index = index
bnode_work = False
while self.brun:
if self.task_status == 1:
try:
llm_type = 1
work_node = self.instr_node_queue.get(block=False)#正常一个队列中一个节点只会出现一次,进而也就只有一个线程在处理
# 开始执行指令
bnode_work = True
results = []
while True:
instruction = work_node.get_instr()
if not instruction:
break
self.doing_instr_list[th_index] = instruction
start_time, end_time, bsuccess, instr, reslut, source_result, ext_params = self.do_instruction(instruction)
# 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#?
if th_DBM.ok:
work_node.do_sn += 1
th_DBM.insetr_result(self.task_id, instr, reslut, work_node.do_sn, start_time, end_time,
source_result,
ext_params, work_node.path)
else:
self.logger.error("数据库连接失败!!")
# 暂存结果
fake_inst,fake_resul = self.DataFilter.filter_result(instr,reslut)
oneres = {'执行指令': fake_inst, '结果': fake_resul}
results.append(oneres) #结果入队列后,指令就不能回退
#一条指令执行完成
self.doing_instr_list[th_index] = ""
#指令都执行结束后,入节点待提交队列
str_res = json.dumps(results, ensure_ascii=False)
# 提交llm待处理任务 --更新节点work_status
if work_node.do_sn >= myCongif.get_data("max_do_sn"): # 该节点执行指令已超过10条
llm_type = 10
self.put_node_reslist(work_node, str_res, llm_type)
# 保存记录
g_PKM.WriteData(self.attack_tree,str(self.task_id))
except queue.Empty:
if bnode_work:
bnode_work = False
self.no_work_to_do() #判断是否需要把当前任务的无工作状态推送到前端
time.sleep(self.sleep_time)
else:#不是工作状态则休眠
time.sleep(self.sleep_time)
#llm请求提交线程
def th_llm_worker(self,index):
'''
几个规则--TM的work线程同
1.线程获取一个节点后,其他线程不能再获取这个节点(遇到被执行的节点,直接放弃执行)--- 加了没办法保存中间结果进行测试
2.llm返回的指令,只可能是该节点,或者是子节点的,不符合这条规则的都不处理,避免llm处理混乱。
:return:
'''
# 线程的dbm需要一个线程一个
th_DBM = DBManager()
th_DBM.connect()
th_index = index
bnode_work = False
while self.brun:
if self.task_status == 1:
try:
llm_node = self.llm_node_queue.get(block=False) #获取一个待处理节点
#开始处理
bnode_work = True
tmp_commands = []
# {llm_node.status} --- 暂时固化为未完成
user_Prompt = f'''
当前分支路径:{llm_node.path}
当前节点信息:
- 节点名称:{llm_node.name}
- 节点状态:未完成
- 漏洞类型:{llm_node.vul_type}
'''
while True:
llm_data = llm_node.get_res()
if llm_data is None:
break
llm_type = llm_data["llm_type"]
str_res = llm_data["result"]
#获取提示词
prompt = self.get_llm_prompt(llm_type,str_res,user_Prompt)
fake_prompt = self.DataFilter.filter_prompt(prompt) #目标脱敏
self.doing_llm_list[th_index] = prompt
# 提交llm请求返回数据--并对返回数据进行处理,节点指令直接执行,测试指令根据工作模式执行
post_time = get_local_timestr()
bsuccess,node_cmds, commands,reasoning_content, content = self.LLM.get_llm_instruction(fake_prompt,llm_node,self.DataFilter) # message要更新 --llm_node只使用messages,都是脱敏后的数据
if not bsuccess:
self.logger.error(f"模型接口调用出错:{content}")
continue #丢弃 --若需要再次尝试,把llm_data再入队列
# LLM记录存数据库
if th_DBM.ok:
llm_node.llm_sn += 1
bres = th_DBM.insert_llm(self.task_id, prompt, reasoning_content, content, post_time, llm_node.llm_sn,llm_node.path)
if not bres:
self.logger.error(f"{llm_node.name}-llm入库失败!")
else:
self.logger.error("数据库连接失败!")
'''
对于LLM返回的错误处理机制
1.验证节点是否都有测试指令返回
2.LLM的回复开始反复时(有点难判断)
'''
# 更新tree
bok, new_commands,iadd_node = self.tree_manager(node_cmds, llm_node, commands, th_DBM)
# 分析指令入对应节点
if bok: # 节点指令若存在错误,测试指令都不处理,需要LLM重新生成
#tmp_commands.extend(new_commands)
# 测试指令入节点待处理队列 --同时修改节点的work_status
self.put_node_instrlist(new_commands, llm_node, iadd_node)
self.doing_llm_list[th_index] = ""
#一个节点完成,节点树持久化---待验证是否有局部更新持久化的方案
g_PKM.WriteData(self.attack_tree,str(self.task_id))
#推送前端刷新数据
if self.taskM.web_cur_task == self.task_id: # 如果新增了节点,且该节点树是当前查看的数据,需要通知前端更新数据
idatatype = 2
strdata = "update accack_tree!"
asyncio.run(g_WSM.send_data(idatatype, strdata))
# 先取消当前task,已经通知前端重新获取,这样可以避免后端不必要的数据推送
#self.taskM.web_cur_task = 0
except queue.Empty:
if bnode_work:
bnode_work = False
self.no_work_to_do() # 判断是否需要把当前任务的无工作状态推送到前端
time.sleep(self.sleep_time)
else:
time.sleep(self.sleep_time)
def had_work_to_do(self): #任务单步状态控制 -- 置工作中 --约束:只有任务单步会调用
bsuccess = False
with self.is_had_work_lock:
if not self.is_had_work:
bsuccess = True # 这不return 是怕不释放锁
self.is_had_work = True
return bsuccess
def no_work_to_do(self): #任务单步状态控制-- 非工作中
# 待执行instr-node
instr_node_list = list(self.instr_node_queue.queue) # 待执行指令的node--线程不安全
llm_node_list = list(self.llm_node_queue.queue) # 待提交llm的node--线程不安全
if len(instr_node_list) == 0 and len(llm_node_list) == 0: #没有待办节点了
for str_instr in self.doing_instr_list:
if str_instr != "":
return
for str_llm in self.doing_llm_list:
if str_llm != "":
return
#没有在执行任务了
#推送是否有工作任务的状态到前端,
with self.is_had_work_lock:
if self.is_had_work: #如果已经是False那就不需要修改了
#推送到前端
if self.task_id == self.taskM.web_cur_task:
#把是否有任务在执行的状态推送到前端
idatatype = 3
strdata = "单步任务执行完成!"
asyncio.run(g_WSM.send_data(idatatype, strdata))
self.is_had_work = False
#自检线程 --1.输出执行状态。2.需要自检和修复
def th_check(self):
icount = 0
while self.brun:
try:
cur_time = get_local_timestr()
print(f"----------{self.task_id}-当前时间程序运行情况:{cur_time}")
#执行中instr-node
index = 0
for w_th in self.workth_list:
if not w_th.is_alive():#线程
print(f"Work线程-{index}已处于异常状态,需要重新创建一个工作线程")
else:
if self.doing_instr_list[index]:
print(f"Work线程-{index}-在执行指令:{self.doing_instr_list[index]}")
index += 1
index = 0
for l_th in self.llmth_list:
if not l_th.is_alive():
print(f"LLM线程-{index}已处于异常状态,需要重新创建一个LLM线程")
else:
if self.doing_llm_list[index]:
print(f"LLM线程-{index}-在执行指令:{self.doing_llm_list[index]}")
index += 1
#处理点修复操作
icount +=1
if icount == 5:
pass
#休眠60
time.sleep(60)
except Exception as e:
print(f"*********自检线程异常退出:{str(e)}")
break
#------------入两个nodeMQ-禁止直接调用入队列-----------
def put_instr_mq(self,node):
#这里不做状态的判断,调用前处理
self.instr_node_queue.put(node)
self.update_node_work_status(node,2) #在执行--1.work_status不影响整个任务的执行,错了问题不大,2--attack_tree持久化需要出去lock信息。
def put_llm_mq(self,node):
#同instr_mq
self.llm_node_queue.put(node)
self.update_node_work_status(node,4) #提交中
async def put_instr_mq_async(self,node):
#这里不做状态的判断,调用前处理
self.instr_node_queue.put(node)
await self.update_node_work_status_async(node,2) #在执行--1.work_status不影响整个任务的执行,错了问题不大,2--attack_tree持久化需要出去lock信息。
async def put_llm_mq_async(self,node):
#同instr_mq
self.llm_node_queue.put(node)
await self.update_node_work_status_async(node,4) #提交中
async def update_node_work_status_async(self,node,work_status):
#更新状态
bchange = node.update_work_status(work_status)
#基于websocket推送到前端
if bchange:
#判断是否是web端最新获取数据的task
if self.taskM.web_cur_task == self.task_id:
idatatype = 1
strdata = {"node_path":node.path,"node_workstatus":work_status}
await g_WSM.send_data(idatatype,strdata)
#------------入Node的两个list--禁止直接调用入list-------
def put_node_reslist(self, node, str_res, llm_type):
# 推送llm提交任务到节点的待处理任务中,并根据工作模式判断是否如llm_node_quere
one_llm = {'llm_type': llm_type, 'result': str_res}
node.add_res(one_llm) # 入节点结果队列
self.update_node_work_status(node,3) #待提交llm
# 如果是自动执行的模式则入队列交给llm线程处理
if self.app_work_type == 1:
if self.work_type == 1 and node.bwork:
self.put_llm_mq(node) #变4
#递归找节点
def find_node_by_child_node_name(self,cur_node,node_name):
find_node = None
if cur_node.children:
for child_node in cur_node.children:
if child_node.name == node_name:
find_node = child_node
break
else:
find_node = self.find_node_by_child_node_name(child_node,node_name)
if find_node:
break
return find_node
def replace_error_instr(self,command):
command = command.replace("| |","||")
command = command.replace("& &","&&")
return command
def put_node_instrlist(self, commands, node,iadd_node): #如果当前节点没有进一般指令返回,需要修改节点执行状态
if not node:
return
node_list = [] #有待办指令的节点
for command in commands:
command = self.replace_error_instr(command)
# 使用正则匹配方括号中的node_path(非贪婪模式)
match = re.search(r'\[(.*?)\]', command)
if match:
node_path = match.group(1)
node_name = node_path.split("->")[-1]
instruction = re.sub(r'\[.*?\]', "", command, count=1, flags=re.DOTALL)
#'''强制约束,不是本节点或者是子节点的指令不处理'''
find_node = None
if node_name == node.name:
find_node = node
else:
for child_node in node.children: #暂时只找一层
if node_name == child_node.name:
find_node = child_node
break
# find_node = self.attack_tree.find_node_by_nodepath_parent(node_path,node,iadd_node,commands)
# if not find_node:#对于没有基于节点路径找到对应节点--增加通过节点名称匹配的机制 2025-4-13日添加
# find_node = self.find_node_by_child_node_name(node, node_name) # 递归找子节点
if find_node:
find_node.add_instr(instruction,node.parent_messages,node.cur_messages) #2025-4-23调整为第一添加指令时传递Msg
#DS-llm存在返回指令还会修改节点状态为已完成的问题,需要修正
find_node.status = "未完成"
if find_node not in node_list:
node_list.append(find_node)
self.update_node_work_status(find_node,1) #待执行指令
else:#如果还没找到就暂时放弃
self.logger.error(f"没有找到指令对应的节点:{node_path},当前节点{node.path}")#丢弃该指令
else:
self.logger.error(f"得到的指令格式不符合规范:{command}")#丢弃该指令---
#这里对于丢弃指令,有几种方案:
# 1.直接丢弃不处理,但需要考虑会不会产生节点缺失指令的问题,需要有机制验证节点;------ 需要有个独立线程,节点要加锁--首选待改进方案
# 2.入当前节点的res_queue,但有可能当前节点没有其他指令,不会触发提交,另外就算提交,会不会产生预设范围外的返回,不确定;
# 3.独立队列处理
#判断当前节点是否有指令
if node not in node_list:
#修改该节点状态为0--无待执行任务
self.update_node_work_status(node,0)
#入instr队列
if self.app_work_type == 1:
if self.work_type == 1: #是自动执行模式
for node in node_list:
if node.bwork:
self.put_instr_mq(node) #2-执行中
def put_work_node(self):
'''遍历节点需要处理的任务,提交mq'''
nodes = self.attack_tree.traverse_bfs()
for node in nodes:
if not node.is_instr_empty(): #待执行指令有值
if not node.is_llm_empty():
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!")
else:
if node.bwork:
self.put_instr_mq(node) #提交执行
elif not node.is_llm_empty(): #待提交llm有值
if not node.is_instr_empty():
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!")
else:
if node.bwork:
self.put_llm_mq(node) #提交执行
#web端提交单步任务--节点单步
async def put_one_node(self,node):
#提交某个节点的代表任务
if self.task_status ==1 and self.work_type==0 and node.bwork:
# node_status = node.get_work_status()
# if node_status == 2 or node_status == 4:
# return False,"当前节点正在执行任务,请稍后点击单步!"
if not node.is_instr_empty(): #待执行指令有值
if not node.is_llm_empty():
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!")
return False,"该节点的待执行任务数据不正确,请联系管理员!"
else:
if node.bwork:
await self.put_instr_mq_async(node) #提交执行
elif not node.is_llm_empty(): #待提交llm有值
if not node.is_instr_empty():
self.logger.error(f"{node.path}即存在待执行指令,还存在待提交的llm,需要人工介入!!")
return False, "该节点的待执行任务数据不正确,请联系管理员!"
else:
if node.bwork:
await self.put_llm_mq_async(node) #提交执行
else:
await self.update_node_work_status_async(node,0) #只是修补措施,保障状态的一致性
return False,"当前节点没有待执行任务!"
return True,"已提交单步任务"
else:
return False,"当前的任务或节点状态不允许执行单步,请检查!"
#web端提交任务单步--任务单步
async def put_one_task(self):
if self.task_status == 1 and self.work_type == 0:
bsuccess = self.had_work_to_do()
if bsuccess:
nodes = self.attack_tree.traverse_bfs()
for node in nodes:
_,_ = await self.put_one_node(node)
return True,"已提交单步任务"
else:
return False,"当前任务正在执行任务中,不需要提交单步任务!"
else:
return False,"当前的任务状态不允许执行单步,请检查!"
#修改节点的执行状态,并需要基于websocket推送到前端显示 同步线程调用
def update_node_work_status(self,node,work_status):
#更新状态
bchange = node.update_work_status(work_status)
#基于websocket推送到前端
if bchange and work_status != 1: #llm执行完成后会发送单独的指令更新树,所以不发送1更新节点了
#判断是否是web端最新获取数据的task
if self.taskM.web_cur_task == self.task_id:
idatatype = 1
strdata = {"node_path":node.path,"node_workstatus":work_status}
asyncio.run(g_WSM.send_data(idatatype,strdata))
#获取本次的提交提示词
def get_llm_prompt(self,llm_type,str_res,user_Prompt):
if llm_type == 0:
ext_Prompt = f'''
初始信息:{str_res}
任务:请开始对该目标的渗透测试工作。
'''
elif llm_type == 1: # 提交指令执行结果 --- 正常提交
# 构造本次提交的prompt
ext_Prompt = f'''
上一步结果:{str_res}
任务:请生成下一步指令。
'''
elif llm_type == 2: # llm返回的指令存在问题,需要再次请求返回
ext_Prompt = f'''
反馈类型:节点指令格式错误
错误信息:{str_res}
任务:请按格式要求重新生成该节点上一次返回中生成的所有指令。
'''
elif llm_type ==3: #对节点没有指令的,请求指令
ext_Prompt = f'''
任务:{str_res}
'''
elif llm_type == 5:
ext_Prompt = f'''
反馈类型:测试指令格式错误
错误信息:{str_res}
任务:请根据格式要求,重新生成该测试指令。
'''
elif llm_type == 10:
max_do_sn = myCongif.get_data("max_do_sn")
ext_Prompt = f'''
上一步结果:{str_res}
任务:当前节点已执行超过{max_do_sn}次测试指令,若无新发现请结束该节点的测试,若有新发现请生成子节点,在子节点推进下一步测试。
'''
else:
self.logger.debug("意外的类型参数")
return ""
user_Prompt = user_Prompt + ext_Prompt
return user_Prompt
#添加子节点
def add_children_node(self,parent_node,children_names,cur_message=None,status="未完成"):
existing_names = {node.name for node in parent_node.children} # 现有子节点名称集合
unique_names = list(set(children_names)) # 去重
for child_name in unique_names:
if child_name not in existing_names:
# 添加节点
new_node = TreeNode(child_name, parent_node.task_id, status)
parent_node.add_child(new_node)
#existing_names.add(child_name) # 更新集合 -- 已经去重过了,不需要在添加到比对
#处理节点指令
def tree_manager(self,node_cmds,node,commands,DBM):
'''更新渗透测试树
node_cmds是json-list
2025-03-22添加commands参数,用于处理LLM对同一个节点返回了测试指令,但还返回了no_instruction节点指令
'''
if not node_cmds: # or len(node_cmds)==0: 正常not判断就可以有没有节点指令
return True,commands,0
#对节点指令进行校验
bok,strerror = g_CV.verify_node_cmds(node_cmds)
if not bok: #节点指令存在问题,则不进行后续处理,提交一个错误反馈任务
# 提交llm待处理任务
self.put_node_reslist(node, strerror, 2)
return False,commands,0
#message_调整传递时机后,可以先执行添加节点
# #对节点数据进行初步验证
# ad_instr_nodes, no_add_nodes = g_CV.verify_node_data(node_cmds)
# if no_add_nodes:#如果有没有添加的节点,默认在当前节点下添加 -- 一般不会有,还没遇到
# self.add_children_node(node,no_add_nodes,node)
# #ad_instr_nodes --- 还没处理
residue_cmd_no_add = []
add_node_names = []
for node_json in node_cmds:
action = node_json["action"]
if action == "add_node": # 新增节点
parent_node_name = node_json["parent"]
# status = "未完成" #2025-4-11修改MGS-节点指令格式,取消了status
add_node_names = node_json["nodes"].replace('', ',').split(',')
# 新增节点原则上应该都是当前节点增加子节点
if node.name == parent_node_name or parent_node_name.endswith(node.name): # 2233ai,节点名称字段会返回整个路径
# 添加当前节点的子节点 -- 这是标准情况
self.add_children_node(node, add_node_names)
elif node.parent.name == parent_node_name or parent_node_name.endswith(node.parent.name): # 添加当前节点的平级节点
# 是添加当前节点的平级节点(当前节点的父节点下添加子节点) --使用2233ai-o3时遇到的情况
self.add_children_node(node.parent, add_node_names)
else:
badd = False
for child_node in node.children: # 给子节点添加子节点
if parent_node_name == child_node.name or parent_node_name.endswith(child_node.name):
badd = True
self.add_children_node(child_node, add_node_names)
break
if not badd:
self.logger.error(f"添加子节点失败!父节点不是当前节点,不是当前节点的父节点,不是当前节点的子节点,需要介入!!{node_json}---当前节点为:{node.path}") # 丢弃该节点
else: # 未处理的节点指令添加到list
residue_cmd_no_add.append(node_json)
#处理on_instruction
residue_node_cmds = []
no_instr_nodes = []
for node_cmd in residue_cmd_no_add:
action = node_cmd["action"]
if action == "no_instruction":
node_names = node_cmd["nodes"].replace('',',').split(',')
for node_name in node_names:
# 先判断是否在测试指令中,若在则不提交llm任务,只能接受在一次返回中同一节点有多条测试指令,不允许分次返回
bcommand = False
for com in commands:
if node_name in com:
bcommand = True
break
if bcommand: # 如果存在测试指令,则不把该节点放入补充信息llm任务---尝试不对比是否有返回指令,DS会一直返回指令,还返回on_instruction
continue
#判断是否有对应节点---原则上只允许同批次add的节点没有添加指令的情况
if node_name in add_node_names:
no_instr_nodes.append(node_name)
else:
self.logger.error("遇到一次不在add_node中,但在no_instr_nodes中的数据")
#粗暴的做法,添加在当前节点下
self.add_children_node(node, [node_name])
no_instr_nodes.append(node_name)
else:#剩余的节点指令
residue_node_cmds.append(node_cmd)
if no_instr_nodes: # 阻塞式,在当前节点提交补充信息,完善节点指令 -- 优势是省token
new_commands = self.get_other_instruction(no_instr_nodes, DBM, node)
commands.extend(new_commands)
#执行剩余的节点指令--不分先后
for node_json in residue_node_cmds:#2025-4-11重新调整了节点指令格式定义
action = node_json["action"]
if action == "find_vul":
node_name = node_json["node"]
vul_node = None
if node.name == node_name or node_name.endswith(node.name): #正常应该是当前节点漏洞信息--暂时只考虑只会有一个漏洞
vul_node = node
else: #匹配子节点
for child in node.children:
if child.name == node_name or node_name.endswith(child.name):
vul_node = node
break
if vul_node: #找到对应了漏洞节点
try:
vul_node.vul_type = node_json["vulnerability"]["name"]
vul_node.vul_grade = node_json["vulnerability"]["risk"]
vul_node.vul_info = node_json["vulnerability"]["info"]
#保存到数据库
DBM.insert_taks_vul(self.task_id,vul_node.name,vul_node.path,vul_node.vul_type,vul_node.vul_grade,
vul_node.vul_info)
except:
self.logger.error("漏洞信息错误")
continue
else:
str_user = f"遇到不是本节点和子节点漏洞的,需要介入!!{node_json}--当前节点{node.path}"
self.logger.error(str_user)
elif action == "end_work":
node_name = node_json["node"]
if node.name == node_name or node_name.endswith(node_name): # 正常应该是当前节点
node.status = "已完成"
else:
str_user = f"遇到不是修改本节点状态的,需要介入!!{node_json}--当前节点{node.path}"
self.logger.error(str_user)
else:
self.logger.error("****不应该执行到这!程序逻辑存在问题!")
return True,commands,len(add_node_names)
#阻塞轮询补充指令
def get_other_instruction(self,nodes,DBM,cur_node):
res_str = ','.join(nodes)
new_commands = []
ierror = 0
while res_str:
self.logger.debug(f"开始针对f{res_str}这些节点请求测试指令")
user_Prompt = f'''
当前分支路径:{cur_node.path}
当前节点信息:
- 节点名称:{cur_node.name}
- 节点状态:{cur_node.status}
- 漏洞类型:{cur_node.vul_type}
反馈类型:需要补充以下子节点的测试指令:{res_str}
任务:
1.请生成这些子节点的测试指令,注意不要生成重复的测试指令;
2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径;
3.只有当还有节点未能生成测试指令或不完整时,才返回未生成指令的节点列表。
'''
fake_prompt = self.DataFilter.filter_prompt(user_Prompt)
bsuccess,node_cmds, commands, reasoning_content, content = self.LLM.get_llm_instruction(fake_prompt,
cur_node,self.DataFilter) # message要更新
if not bsuccess:
self.logger.error(f"模型接口调用出错:{content}")
ierror += 1
if ierror == 3: #重试3词
break
continue# res_str没有调整,重复使用
res_str = ""
# LLM记录存数据库
cur_node.llm_sn += 1
post_time = get_local_timestr()
bres = DBM.insert_llm(self.task_id, user_Prompt, reasoning_content, content, post_time, cur_node.llm_sn,cur_node.path)
if not bres:
self.logger.error(f"{cur_node.name}-llm入库失败!")
#把返回的测试指令进行追加
new_commands.extend(commands)
#判断是否还有未添加指令的节点
for node_json in node_cmds: #正常应该只有一条no_instruction --暂时只处理
if "no_instruction" in node_json and "nodes" in node_json:
tmp_nodes = []
node_names = node_json["nodes"].replace('',',').split(',')
for node_name in node_names:
if node_name in nodes:
tmp_nodes.append(node_name)
res_str = ','.join(tmp_nodes)
break
else:#其他节点指令不处理
self.logger.error(f"遇到一次no_instruction补充指令时返回了其他节点指令{node_cmds}")
self.logger.debug("未添加指令的节点,都已完成指令的添加!")
return new_commands
#-----------------任务的启停--------------------
def init_task(self,task_id,attack_tree = None):
self.task_id = task_id
# 初始化节点树
if attack_tree: # 有值的情况是load
self.attack_tree = attack_tree
# 加载未完成的任务
if self.work_type == 1: # 自动模式
# 提交到mq,待线程执行
self.put_work_node()
else: # 无值的情况是new_create
root_node = TreeNode(self.target, self.task_id) # 根节点
self.attack_tree = AttackTree(root_node) # 创建测试树,同时更新根节点相关内容
self.LLM.build_initial_prompt(root_node) # 对根节点初始化system-msg
# 插入一个user消息
# 提交第一个llm任务,开始工作
know_info = f"本测试主机的IP地址为:{self.local_ip}"
self.put_node_reslist(root_node, know_info, 0) # 入待提交list
# 初始保存个attack_tree文件
g_PKM.WriteData(self.attack_tree, str(self.task_id))
def is_task_stop(self):
#检查任务是否处于停止状态--防止停止后,线程还没停,又启动工作线程,造成混乱
#工作线程
for work_th in self.workth_list:
if work_th:
if work_th.is_alive():
self.logger.debug(f"{self.task_id}有存活工作线程")
return False
#llm线程
for llm_th in self.llmth_list:
if llm_th:
if llm_th.is_alive():
self.logger.debug(f"{self.task_id}有存活LLM线程")
return False
#自检线程
if self.check_th:
if self.check_th.is_alive():
self.logger.debug(f"{self.task_id}有存活自检线程")
return False
#工作子进程池
if self.PythonM.is_pool_active():
self.logger.debug(f"{self.task_id}有存活子进程池")
return False
return True
def update_task_work_type(self,new_work_type):
self.work_type = new_work_type
#更新数据库
app_DBM.update_task_work_type(self.task_id,new_work_type)
def update_task_status(self,new_status):
self.task_status = new_status
#更新数据库
app_DBM.update_task_status(self.task_id,new_status)
def start_task(self):
'''
启动该测试任务,
:return bool str
'''
if self.is_task_stop():
#更新状态标识
self.update_task_status(1)
self.brun = True #线程正常启动
#启动指令工作线程
for i in range(self.max_thread_num):
w_th = threading.Thread(target=self.do_worker_th,args=(i,),name=f"{self.task_id}-w_th-{i}")
w_th.start()
self.workth_list[i] = w_th
#启动llm提交线程--llm暂时单线程,多线程处理时attack_tree需要加锁
for j in range(self.llm_max_nums):
l_th = threading.Thread(target=self.th_llm_worker,args=(j,),name=f"{self.task_id}-l_th-{i}")
l_th.start()
self.llmth_list[j]=l_th
#启动自检线程
self.check_th = threading.Thread(target=self.th_check)
self.check_th.start()
#启动python子进程池 --进程池的启动,目前是没全面确认原子进程池的状态,直接None
self.PythonM.start_pool()
return True,"启动任务成功!" #就算启动了一部分线程,也要认为是成功
else:
return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!"
def stop_task(self): #还未处理
if self.brun and self.task_status ==1: #不是正常工作状态就不执行停止工作了
#停止子进程池
self.PythonM.shutdown_pool()
#停止线程
self.brun = False
self.update_task_status(0)
# 结束任务需要收尾处理#?
self.InstrM.init_data() #pass
if __name__ == "__main__":
pass