From 2a2970634fa9a5b7f05ef85d2d945b4266dcd1bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=BE=99?= Date: Thu, 29 May 2025 09:15:13 +0800 Subject: [PATCH] V0.5.5.5 1.before do per instruct bak --- .idea/misc.xml | 2 +- .idea/zf_safe.iml | 2 +- config.yaml | 2 +- mycode/DBManager.py | 10 +-- mycode/LLMManager.py | 2 +- mycode/PollingManager.py | 2 +- mycode/PythonTManager.py | 1 - mycode/PythoncodeTool.py | 17 ++--- mycode/TaskManager.py | 144 +++++++++++++++++++++++++++++---------- mycode/TaskObject.py | 52 ++++++++------ run.py | 2 +- 11 files changed, 153 insertions(+), 83 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index f22b61c..b7f37e1 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,4 @@ - + \ No newline at end of file diff --git a/.idea/zf_safe.iml b/.idea/zf_safe.iml index b279356..d1e770f 100644 --- a/.idea/zf_safe.iml +++ b/.idea/zf_safe.iml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/config.yaml b/config.yaml index bc7f249..ef96474 100644 --- a/config.yaml +++ b/config.yaml @@ -2,6 +2,7 @@ App_Work_type: 0 #0-开发模式,只允许单步模式 1-生产模式 包裹下的逻辑可以正常执行 max_run_task: 3 #允许同时运行的任务数 max_polling_task: 3 #运行同时运行的巡检任务 +Python_max_procs: 10 #进程池最大上限10个 #线程休眠的时间 sleep_time: 20 @@ -39,5 +40,4 @@ TreeFile: tree_data/attack_tree #task Task_max_threads: 5 -Python_max_procs: 3 LLM_max_threads: 1 diff --git a/mycode/DBManager.py b/mycode/DBManager.py index 44f660c..dde3a56 100644 --- a/mycode/DBManager.py +++ b/mycode/DBManager.py @@ -219,7 +219,7 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH return data def get_run_tasks(self): - strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target,target_id from task where task_status <> 2 order by ID;" + strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target,target_id from task where task_status <> 4 order by ID;" datas = self.do_select(strsql) return datas @@ -240,7 +240,7 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH def over_task(self,task_id): over_time = get_local_timestr() - strsql = "update task set task_status=2,end_time=%s where ID=%s;" + strsql = "update task set task_status=4,end_time=%s where ID=%s;" params = (over_time,task_id) bok,_ = self.safe_do_sql(strsql, params) return bok @@ -409,11 +409,11 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH def get_his_tasks(self,target_name,safe_rank,llm_type,start_time,end_time): strsql = "select ID,task_target,safe_rank,llm_type,start_time,end_time from task" conditions = ["task_status=%s"] - params = [2] + params = [4] # 按需添加其他条件 if target_name and target_name.strip(): # 检查nodename是否非空(去除前后空格后) - conditions.append("task_target=%s") - params.append(target_name) + conditions.append("task_target like %s") + params.append(f"%{target_name}%") if safe_rank and safe_rank.strip(): # 检查vultype是否非空 conditions.append("safe_rank=%s") diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index 0f15368..431afeb 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -123,7 +123,7 @@ mysql -u root -p 192.168.1.100 **测试指令格式** - dash指令:{\"action\":\"dash\",\"path\":\"节点路径\",\"content\":\"指令内容\"} - python指令:{\"action\":\"python\",\"path\":\"节点路径\",\"content\":\"指令内容\"} - * python主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(status, output) + * python主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(bool, output) - [节点路径]为从根节点到目标节点的完整层级路径,以"->"关联,如:目标系统->192.168.1.100 **JSON结果格式** - JSON结果:{\"action\":\"asset\",\"URL\":{URL信息json},\"IPS\":[\"IP\":\"192.168.1.100\",\"IPtype\":\"IPv4/IPv6\",\"Ports\":[端口信息json]]} diff --git a/mycode/PollingManager.py b/mycode/PollingManager.py index 221d34c..b2338c3 100644 --- a/mycode/PollingManager.py +++ b/mycode/PollingManager.py @@ -102,7 +102,7 @@ class PollingManager: def do_check(self,target): print(f"巡检中: {target[2]} (类型: {target[9]})") - #创建task---并初始化待办 + #创建task---并初始化待办 ----尝试执行 g_TaskM.create_polling_task(target) diff --git a/mycode/PythonTManager.py b/mycode/PythonTManager.py index 68a0620..33cfcf7 100644 --- a/mycode/PythonTManager.py +++ b/mycode/PythonTManager.py @@ -28,7 +28,6 @@ class PythonTManager: def is_pool_active(self): return self.python_tool.pool_active - def execute_instruction(self,instruction): bwork = False while self.brun: diff --git a/mycode/PythoncodeTool.py b/mycode/PythoncodeTool.py index cdd2c63..e253180 100644 --- a/mycode/PythoncodeTool.py +++ b/mycode/PythoncodeTool.py @@ -163,21 +163,12 @@ class PythoncodeTool(): if self.proc_pool is not None and self.pool_active: try: print("关闭进程池...") - pool = self.proc_pool + #2025-5-28进程池上移后,可以直接阻塞等待完成了。 + # 这里是真正阻塞等待队列管理和子进程退出 + self.proc_pool.shutdown(wait=True) + print("进程池已完全关闭。") self.pool_active = False self.proc_pool = None - - def _shutdown_background(): - try: - # 这里是真正阻塞等待队列管理和子进程退出 - pool.shutdown(wait=True) - print("进程池已完全关闭。") - except Exception as e: - print(f"后台关闭进程池时出错: {e}") - - # 启动一个守护线程来做真正的 shutdown(wait=True) - t = threading.Thread(target=_shutdown_background, daemon=True) - t.start() except Exception as e: print(f"子进程关闭异常。。{e}") else: diff --git a/mycode/TaskManager.py b/mycode/TaskManager.py index 007043d..5e34dee 100644 --- a/mycode/TaskManager.py +++ b/mycode/TaskManager.py @@ -4,6 +4,7 @@ from mycode.DBManager import app_DBM from myutils.PickleManager import g_PKM from myutils.MyLogger_logger import LogHandler from mycode.TargetManager import TargetManager # 从模块导入类 +from mycode.PythonTManager import PythonTManager import time import threading @@ -16,20 +17,30 @@ class TaskManager: data = app_DBM.get_system_info() self.local_ip = data[0] self.version = data[1] - self.tasks_lock = threading.Lock() + self.brun = True self.tm_th = None #任务控制线程 #----临时任务相关------- self.web_cur_task = 0 #web端当前显示的 -- web端可以操作的都为临时任务 + self.tasks_lock = threading.Lock() self.tasks = {} # 执行中的任务,test_id为key self.max_run_task = myCongif.get_data("max_run_task") self.cur_task_run_num = 0 #-----巡检任务相关------- self.polling_llmtype = myCongif.get_data("polling_llm_type") + self.polling_tasks_lock = threading.Lock() self.polling_tasks = {} #独立一个队列 self.max_polling_task = myCongif.get_data("max_polling_task") - self.cur_polling_task = 0 + self.cur_polling_task_num = 0 self.work_type = 1 #巡检工作模式就是自动了 + #python 进程池 提到程序主线程层面 --不随任务进行启停 + self.PythonM = PythonTManager(myCongif.get_data("Python_max_procs")) + # 启动python子进程池 --进程池的启动,目前是没全面确认原子进程池的状态,直接None + self.PythonM.start_pool() #开启进程池 + + def __del__(self): + # 停止子进程池 + self.PythonM.shutdown_pool() #判断目标是不是在当前执行任务中,---没加锁,最多跟删除有冲突,问题应该不大 def is_target_in_tasks(self,task_target,itype = 1): @@ -46,7 +57,7 @@ class TaskManager: #程序启动后,加载未完成的测试任务 def load_tasks(self): - '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2 + '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>4 #若不是异常停止,正常情况下,任务都应该是没有待办MQ的 ''' datas = app_DBM.get_run_tasks() # @@ -62,27 +73,43 @@ class TaskManager: target_id = data[8] # 创建任务对象 - task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,target_id,self,safe_rank) - #这里要做区分.临时任务和巡检任务 - #? + task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,self,target_id,safe_rank) #读取attact_tree---load的任务应该都要有attact_tree attack_tree = g_PKM.ReadData(str(task_id)) if attack_tree: - #恢复数据 + #恢复数据--遍历节点恢复状态 --临时和polling适用 task.init_task(task_id,attack_tree) #开始任务 ---根据task_status来判断是否需要启动工作线程 - if task_status == 1: + if task_status == 1: #执行中的任务 + bcan_start = False + if target_id == 0: #临时任务 if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁 - bsuc,strout = task.start_task() - if bsuc: - self.cur_task_run_num +=1 + bcan_start = True + else: #巡检任务 + if self.cur_polling_task_num < self.max_polling_task: + bcan_start = True + + if bcan_start: + isuc, strout = task.start_task() + if isuc == 1: #启动工作线程成功 + if target_id == 0: + self.cur_task_run_num += 1 else: - task.update_task_status(0) - else: - self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态") - task.update_task_status(0)#尝试硬恢复 + self.cur_polling_task_num += 1 + elif isuc == -1: # 存在未结束的线程--load这边应该不会发生 + self.logger.debug(f"load-task遇到未结束线程的任务--{task_id}") + task.update_task_status(2) + else: # 0 --没有待办事项 + self.logger.debug(f"load-task遇到无待办实现的任务--{task_id}") + task.update_task_status(3) + else: + task.update_task_status(0) #等其他task结束后会启动 + # 内存保留task对象 - self.tasks[task_id] = task + if target_id == 0: + self.tasks[task_id] = task + else: + self.polling_tasks[task_id] = task else: self.logger.error(f"{task_id}任务的节点树数据缺失,需要检查!") @@ -102,7 +129,7 @@ class TaskManager: fail_list.append(target) continue #判断是否已在执行列表 - if self.is_target_in_tasks(target): + if self.is_target_in_tasks(target,1): fail_list.append(target) continue #raise ValueError(f"Task {test_target} already exists") @@ -116,7 +143,7 @@ class TaskManager: #保留task对象 self.tasks[task_id] = task #尝试启动task - self.start_task_TM(task_id) + _,_ = self.start_task_TM(task_id) else: fail_list.append(target) result = ",".join(fail_list) @@ -134,22 +161,46 @@ class TaskManager: else: fake_terget = self.TargetM.get_fake_target(2) #创建任务实例 - polling_task = TaskObject(target, "", self.work_type, self.polling_llmtype, self.num_threads, self.local_ip, fake_target, self) + polling_task = TaskObject(target, "", self.work_type, self.polling_llmtype, self.num_threads, self.local_ip, fake_target, self,target[0]) # 获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库 task_id = app_DBM.start_task(target, "", self.work_type, self.polling_llmtype, fake_target,target[0]) #增加一个target_id if task_id >0: polling_task.init_task(task_id)#初始化任务信息,并提交到待处理节点队列 #保留task对象--巡检任务 self.polling_tasks[task_id] = polling_task + #尝试启动task + _,_ = self.start_polling_task_TM(polling_task) - def th_control_takes(self): - while self.brun: - #遍历临时任务--启停任务 - pass - #遍历巡检任务-启停 - pass - #休眠 - time.sleep(60*5) + def is_can_start_task(self,itype): + if itype ==1: + if self.cur_task_run_num < self.max_run_task: + return True + else: + if self.cur_polling_task_num < self.max_polling_task: + return True + return False + + def start_next_task(self,task_id,target_id): + if target_id == 0: #临时任务 + with self.tasks_lock: + self.cur_task_run_num -= 1 + for task in self.tasks.values(): + if task.task_status == 0: #只有是0状态会自动启动,2暂停状态需要人工启动 + isuc, error = task.start_task() + if isuc == 1: + self.cur_task_run_num +=1 + break + else: #巡检任务--是直接结束置4 + with self.polling_tasks_lock: + del self.polling_tasks[task_id] # 删除缓存 + + self.cur_polling_task_num -=1 + for pl_task in self.polling_tasks: + if pl_task.task_status == 0: + isuc ,error = pl_task.start_task() + if isuc == 1: + self.cur_polling_task_num += 1 + break #开启task任务 def start_task_TM(self,task_id): @@ -158,8 +209,8 @@ class TaskManager: with self.tasks_lock: if self.cur_task_run_num < self.max_run_task: #启动工作线程和进程 - bsuc,error = task.start_task() - if bsuc: + isuc,error = task.start_task() + if isuc ==1: self.cur_task_run_num += 1 return True,"启动成功" else: @@ -168,7 +219,20 @@ class TaskManager: return False,f"已到达最大的启动数--{self.max_run_task}" return False,"该任务不存在,程序逻辑存在问题!" - #停止task任务--暂停 + def start_polling_task_TM(self,pl_task): + if pl_task: + with self.polling_tasks_lock: + if self.cur_polling_task_num < self.max_polling_task: + isuc,error = pl_task.start_task() + if isuc == 1: + self.cur_polling_task_num += 1 + return True,"启动成功" + else: + return False, error + else: + return False, f"已到达最大的启动数--{self.max_run_task}" + + #停止task任务--暂停---要启动下一个task---只针对临时任务 def stop_task_TM(self,task_id): task = self.tasks[task_id] if task: @@ -176,25 +240,33 @@ class TaskManager: with self.tasks_lock: task.stop_task() #停止线程应该就没什么失败需要处理的 self.cur_task_run_num -= 1 + #启动下一个任务-只针对临时任务 + for tmptask in self.tasks.values(): + if tmptask.task_status == 0: # 只有是0状态会自动启动,2暂停状态需要人工启动 + isuc, error = tmptask.start_task() + if isuc == 1: + self.cur_task_run_num += 1 + break + return True,"停止任务成功" else: return True,"该任务已处于停止状态" return False,"该任务不存在,程序逻辑存在问题!" - #结束任务 + #结束任务-只针对临时任务 def over_task(self,task_id): #先尝试停止 bsuccess,_ = self.stop_task_TM(task_id) time.sleep(1) if bsuccess: #再结束 - bsuccess = app_DBM.over_task(task_id) # 不管是不是修改(置2)成功,都执行结束 + bsuccess = app_DBM.over_task(task_id) # 不管是不是修改(置4)成功,都执行结束 del self.tasks[task_id] # 删除缓存 return True,"结束任务成功" else: return False,"该任务不存在,程序逻辑存在问题!" - #删除任务 + #删除任务--历史任务处理 def del_task(self,task_id): #删除数据记录 app_DBM.del_task(task_id) @@ -206,7 +278,7 @@ class TaskManager: def control_taks(self,task_id): task = self.tasks[task_id] if task: - if task.task_status == 0: # 0-暂停,1-执行中,2-已完成 + if task.task_status in (0,2): # 0-未启动,1-执行中,2-暂停中,3-已完成,4-已结束 bsuc,error = self.start_task_TM(task_id) #任务是否存在的状态有点重复 elif task.task_status == 1: bsuc,error = self.stop_task_TM(task_id) @@ -243,11 +315,11 @@ class TaskManager: return tree_dict return None - #修改任务的工作模式,只有在暂停状态才能修改 + #修改任务的工作模式,只有在未启动或暂停状态才能修改 def update_task_work_type(self,task_id,new_work_type): task = self.tasks[task_id] if task: - if task.task_status == 0: + if task.task_status in (0,2): task.update_task_work_type(new_work_type) return True return False diff --git a/mycode/TaskObject.py b/mycode/TaskObject.py index a58e686..f57bd5d 100644 --- a/mycode/TaskObject.py +++ b/mycode/TaskObject.py @@ -13,7 +13,7 @@ from myutils.PickleManager import g_PKM from myutils.ConfigManager import myCongif from mycode.WebSocketManager import g_WSM from mycode.CommandVerify import g_CV -from mycode.PythonTManager import PythonTManager + from mycode.DataFilterManager import DataFilterManager from myutils.ReadWriteLock import ReadWriteLock import asyncio @@ -32,7 +32,7 @@ class TaskObject: self.taskM = taskM self.logger = LogHandler().get_logger("TaskObject") self.InstrM = g_instrM # 类对象渗透,要约束只读取信息,且不允许有类全局对象--持续检查 - self.PythonM = PythonTManager(myCongif.get_data("Python_max_procs")) + self.PythonM = taskM.PythonM #python进程池往上提一层 self.DataFilter = DataFilterManager(test_target,fake_target) self.CCM = ControlCenter() #一个任务一个CCM self.LLM = LLMManager(llm_type) # LLM对象调整为一个任务一个对象,这样可以为不同的任务选择不同的LLM @@ -46,7 +46,7 @@ class TaskObject: self.cookie = cookie_info self.work_type = work_type #工作模式 0-人工,1-自动 self.task_id = None - self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态 + self.task_status = 0 #0-初始状态,1-执行中,2-暂停中,3-已完成,4-已结束 self.local_ip = local_ip self.attack_tree = None #任务节点树 self.attack_tree_lock = None #把节点的Lock外置 @@ -385,7 +385,9 @@ class TaskObject: #判断任务是否完成 bover = self.is_over() if bover and not bworking: - self.stop_task() #自己停自己--确保没有阻塞操作 + self.over_task() #自己停自己--确保没有阻塞操作 + # 通知TM 开始下一个任务 + self.taskM.start_next_task(self.task_id,self.target_id) break #退出循环--结束线程 #处理点修复操作 icount +=1 @@ -614,8 +616,7 @@ class TaskObject: if b_putwork: return True,"已提交单步任务" else: #所有节点都没有提交任务 - #可以尝试stop下 - self.stop_task() + self.over_task() return False,"该任务已经没有待提交任务" else: return False,"当前任务正在执行任务中,不需要提交单步任务!" @@ -993,7 +994,7 @@ class TaskObject: #self.LLM.build_initial_prompt(root_node) # 对根节点初始化system-msg self.LLM.build_init_info_prompt(root_node) #新建任务先开始信息收集工作 2025-5-15 know_info = "" - self.put_node_reslist(root_node,know_info,-1) #新增一个-1的状态值 + self.put_node_reslist(root_node,know_info,-1) #llm_type新增一个-1的状态值 # 初始保存个attack_tree文件 g_PKM.WriteData(self.attack_tree, str(self.task_id)) @@ -1017,10 +1018,7 @@ class TaskObject: if self.check_th.is_alive(): self.logger.debug(f"{self.task_id}有存活自检线程") return False - #工作子进程池 - if self.PythonM.is_pool_active(): - self.logger.debug(f"{self.task_id}有存活子进程池") - return False + return True def update_task_work_type(self,new_work_type): @@ -1036,11 +1034,11 @@ class TaskObject: def start_task(self): ''' 启动该测试任务, - :return bool str + :return int str ''' if self.is_task_stop(): if self.is_over(): #只是判断了待办list,在执行的会漏判断 - return False, "该任务所有未暂停节点的待办事项都已经结束" + return 0, "该任务所有未暂停节点的待办事项都已经结束" #更新状态标识 self.update_task_status(1) self.brun = True #线程正常启动 @@ -1057,22 +1055,32 @@ class TaskObject: #启动自检线程 self.check_th = threading.Thread(target=self.th_check) self.check_th.start() - #启动python子进程池 --进程池的启动,目前是没全面确认原子进程池的状态,直接None - self.PythonM.start_pool() - return True,"启动任务成功!" #就算启动了一部分线程,也要认为是成功 + + return 1,"启动任务成功!" #就算启动了一部分线程,也要认为是成功 else: - return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!" + return -1,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!" - def stop_task(self): #还未处理 + def stop_task(self): #--暂停task if self.brun and self.task_status == 1: #不是正常工作状态就不执行停止工作了 - #停止子进程池 - self.PythonM.shutdown_pool() #停止线程 self.brun = False - self.update_task_status(0) #置状态0 - # 结束任务需要收尾处理#? + if self.target_id == 0: + self.update_task_status(2) #置状态2--暂停状态 + else: + self.logger.debug("stop_task不应该处理到巡检任务!") + + def over_task(self): #结束任务 + if self.brun and self.task_status ==1: + # 停止线程 + self.brun = False + if self.target_id == 0: + self.update_task_status(3) # 置状态3--完成状态 + else: + self.update_task_status(4) #巡检任务直接结束 + # 结束任务需要收尾处理#? self.InstrM.init_data() #pass + def is_over(self): ''' 判断当前任务是否所有节点都已结束 diff --git a/run.py b/run.py index a1eb63c..e1390a1 100644 --- a/run.py +++ b/run.py @@ -33,5 +33,5 @@ if __name__ == '__main__': asyncio.run(run_quart_app()) #启动巡检线程 PM = PollingManager() - PM.run_polling() + #PM.run_polling()