diff --git a/.idea/misc.xml b/.idea/misc.xml
index f22b61c..b7f37e1 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,4 @@
-
+
\ No newline at end of file
diff --git a/.idea/zf_safe.iml b/.idea/zf_safe.iml
index b279356..d1e770f 100644
--- a/.idea/zf_safe.iml
+++ b/.idea/zf_safe.iml
@@ -2,7 +2,7 @@
-
+
\ No newline at end of file
diff --git a/config.yaml b/config.yaml
index bc7f249..ef96474 100644
--- a/config.yaml
+++ b/config.yaml
@@ -2,6 +2,7 @@
App_Work_type: 0 #0-开发模式,只允许单步模式 1-生产模式 包裹下的逻辑可以正常执行
max_run_task: 3 #允许同时运行的任务数
max_polling_task: 3 #运行同时运行的巡检任务
+Python_max_procs: 10 #进程池最大上限10个
#线程休眠的时间
sleep_time: 20
@@ -39,5 +40,4 @@ TreeFile: tree_data/attack_tree
#task
Task_max_threads: 5
-Python_max_procs: 3
LLM_max_threads: 1
diff --git a/mycode/DBManager.py b/mycode/DBManager.py
index 44f660c..dde3a56 100644
--- a/mycode/DBManager.py
+++ b/mycode/DBManager.py
@@ -219,7 +219,7 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH
return data
def get_run_tasks(self):
- strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target,target_id from task where task_status <> 2 order by ID;"
+ strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target,target_id from task where task_status <> 4 order by ID;"
datas = self.do_select(strsql)
return datas
@@ -240,7 +240,7 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH
def over_task(self,task_id):
over_time = get_local_timestr()
- strsql = "update task set task_status=2,end_time=%s where ID=%s;"
+ strsql = "update task set task_status=4,end_time=%s where ID=%s;"
params = (over_time,task_id)
bok,_ = self.safe_do_sql(strsql, params)
return bok
@@ -409,11 +409,11 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH
def get_his_tasks(self,target_name,safe_rank,llm_type,start_time,end_time):
strsql = "select ID,task_target,safe_rank,llm_type,start_time,end_time from task"
conditions = ["task_status=%s"]
- params = [2]
+ params = [4]
# 按需添加其他条件
if target_name and target_name.strip(): # 检查nodename是否非空(去除前后空格后)
- conditions.append("task_target=%s")
- params.append(target_name)
+ conditions.append("task_target like %s")
+ params.append(f"%{target_name}%")
if safe_rank and safe_rank.strip(): # 检查vultype是否非空
conditions.append("safe_rank=%s")
diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py
index 0f15368..431afeb 100644
--- a/mycode/LLMManager.py
+++ b/mycode/LLMManager.py
@@ -123,7 +123,7 @@ mysql -u root -p 192.168.1.100
**测试指令格式**
- dash指令:{\"action\":\"dash\",\"path\":\"节点路径\",\"content\":\"指令内容\"}
- python指令:{\"action\":\"python\",\"path\":\"节点路径\",\"content\":\"指令内容\"}
- * python主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(status, output)
+ * python主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(bool, output)
- [节点路径]为从根节点到目标节点的完整层级路径,以"->"关联,如:目标系统->192.168.1.100
**JSON结果格式**
- JSON结果:{\"action\":\"asset\",\"URL\":{URL信息json},\"IPS\":[\"IP\":\"192.168.1.100\",\"IPtype\":\"IPv4/IPv6\",\"Ports\":[端口信息json]]}
diff --git a/mycode/PollingManager.py b/mycode/PollingManager.py
index 221d34c..b2338c3 100644
--- a/mycode/PollingManager.py
+++ b/mycode/PollingManager.py
@@ -102,7 +102,7 @@ class PollingManager:
def do_check(self,target):
print(f"巡检中: {target[2]} (类型: {target[9]})")
- #创建task---并初始化待办
+ #创建task---并初始化待办 ----尝试执行
g_TaskM.create_polling_task(target)
diff --git a/mycode/PythonTManager.py b/mycode/PythonTManager.py
index 68a0620..33cfcf7 100644
--- a/mycode/PythonTManager.py
+++ b/mycode/PythonTManager.py
@@ -28,7 +28,6 @@ class PythonTManager:
def is_pool_active(self):
return self.python_tool.pool_active
-
def execute_instruction(self,instruction):
bwork = False
while self.brun:
diff --git a/mycode/PythoncodeTool.py b/mycode/PythoncodeTool.py
index cdd2c63..e253180 100644
--- a/mycode/PythoncodeTool.py
+++ b/mycode/PythoncodeTool.py
@@ -163,21 +163,12 @@ class PythoncodeTool():
if self.proc_pool is not None and self.pool_active:
try:
print("关闭进程池...")
- pool = self.proc_pool
+ #2025-5-28进程池上移后,可以直接阻塞等待完成了。
+ # 这里是真正阻塞等待队列管理和子进程退出
+ self.proc_pool.shutdown(wait=True)
+ print("进程池已完全关闭。")
self.pool_active = False
self.proc_pool = None
-
- def _shutdown_background():
- try:
- # 这里是真正阻塞等待队列管理和子进程退出
- pool.shutdown(wait=True)
- print("进程池已完全关闭。")
- except Exception as e:
- print(f"后台关闭进程池时出错: {e}")
-
- # 启动一个守护线程来做真正的 shutdown(wait=True)
- t = threading.Thread(target=_shutdown_background, daemon=True)
- t.start()
except Exception as e:
print(f"子进程关闭异常。。{e}")
else:
diff --git a/mycode/TaskManager.py b/mycode/TaskManager.py
index 007043d..5e34dee 100644
--- a/mycode/TaskManager.py
+++ b/mycode/TaskManager.py
@@ -4,6 +4,7 @@ from mycode.DBManager import app_DBM
from myutils.PickleManager import g_PKM
from myutils.MyLogger_logger import LogHandler
from mycode.TargetManager import TargetManager # 从模块导入类
+from mycode.PythonTManager import PythonTManager
import time
import threading
@@ -16,20 +17,30 @@ class TaskManager:
data = app_DBM.get_system_info()
self.local_ip = data[0]
self.version = data[1]
- self.tasks_lock = threading.Lock()
+
self.brun = True
self.tm_th = None #任务控制线程
#----临时任务相关-------
self.web_cur_task = 0 #web端当前显示的 -- web端可以操作的都为临时任务
+ self.tasks_lock = threading.Lock()
self.tasks = {} # 执行中的任务,test_id为key
self.max_run_task = myCongif.get_data("max_run_task")
self.cur_task_run_num = 0
#-----巡检任务相关-------
self.polling_llmtype = myCongif.get_data("polling_llm_type")
+ self.polling_tasks_lock = threading.Lock()
self.polling_tasks = {} #独立一个队列
self.max_polling_task = myCongif.get_data("max_polling_task")
- self.cur_polling_task = 0
+ self.cur_polling_task_num = 0
self.work_type = 1 #巡检工作模式就是自动了
+ #python 进程池 提到程序主线程层面 --不随任务进行启停
+ self.PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
+ # 启动python子进程池 --进程池的启动,目前是没全面确认原子进程池的状态,直接None
+ self.PythonM.start_pool() #开启进程池
+
+ def __del__(self):
+ # 停止子进程池
+ self.PythonM.shutdown_pool()
#判断目标是不是在当前执行任务中,---没加锁,最多跟删除有冲突,问题应该不大
def is_target_in_tasks(self,task_target,itype = 1):
@@ -46,7 +57,7 @@ class TaskManager:
#程序启动后,加载未完成的测试任务
def load_tasks(self):
- '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2
+ '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>4
#若不是异常停止,正常情况下,任务都应该是没有待办MQ的
'''
datas = app_DBM.get_run_tasks() #
@@ -62,27 +73,43 @@ class TaskManager:
target_id = data[8]
# 创建任务对象
- task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,target_id,self,safe_rank)
- #这里要做区分.临时任务和巡检任务
- #?
+ task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,self,target_id,safe_rank)
#读取attact_tree---load的任务应该都要有attact_tree
attack_tree = g_PKM.ReadData(str(task_id))
if attack_tree:
- #恢复数据
+ #恢复数据--遍历节点恢复状态 --临时和polling适用
task.init_task(task_id,attack_tree)
#开始任务 ---根据task_status来判断是否需要启动工作线程
- if task_status == 1:
+ if task_status == 1: #执行中的任务
+ bcan_start = False
+ if target_id == 0: #临时任务
if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁
- bsuc,strout = task.start_task()
- if bsuc:
- self.cur_task_run_num +=1
+ bcan_start = True
+ else: #巡检任务
+ if self.cur_polling_task_num < self.max_polling_task:
+ bcan_start = True
+
+ if bcan_start:
+ isuc, strout = task.start_task()
+ if isuc == 1: #启动工作线程成功
+ if target_id == 0:
+ self.cur_task_run_num += 1
else:
- task.update_task_status(0)
- else:
- self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态")
- task.update_task_status(0)#尝试硬恢复
+ self.cur_polling_task_num += 1
+ elif isuc == -1: # 存在未结束的线程--load这边应该不会发生
+ self.logger.debug(f"load-task遇到未结束线程的任务--{task_id}")
+ task.update_task_status(2)
+ else: # 0 --没有待办事项
+ self.logger.debug(f"load-task遇到无待办实现的任务--{task_id}")
+ task.update_task_status(3)
+ else:
+ task.update_task_status(0) #等其他task结束后会启动
+
# 内存保留task对象
- self.tasks[task_id] = task
+ if target_id == 0:
+ self.tasks[task_id] = task
+ else:
+ self.polling_tasks[task_id] = task
else:
self.logger.error(f"{task_id}任务的节点树数据缺失,需要检查!")
@@ -102,7 +129,7 @@ class TaskManager:
fail_list.append(target)
continue
#判断是否已在执行列表
- if self.is_target_in_tasks(target):
+ if self.is_target_in_tasks(target,1):
fail_list.append(target)
continue
#raise ValueError(f"Task {test_target} already exists")
@@ -116,7 +143,7 @@ class TaskManager:
#保留task对象
self.tasks[task_id] = task
#尝试启动task
- self.start_task_TM(task_id)
+ _,_ = self.start_task_TM(task_id)
else:
fail_list.append(target)
result = ",".join(fail_list)
@@ -134,22 +161,46 @@ class TaskManager:
else:
fake_terget = self.TargetM.get_fake_target(2)
#创建任务实例
- polling_task = TaskObject(target, "", self.work_type, self.polling_llmtype, self.num_threads, self.local_ip, fake_target, self)
+ polling_task = TaskObject(target, "", self.work_type, self.polling_llmtype, self.num_threads, self.local_ip, fake_target, self,target[0])
# 获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库
task_id = app_DBM.start_task(target, "", self.work_type, self.polling_llmtype, fake_target,target[0]) #增加一个target_id
if task_id >0:
polling_task.init_task(task_id)#初始化任务信息,并提交到待处理节点队列
#保留task对象--巡检任务
self.polling_tasks[task_id] = polling_task
+ #尝试启动task
+ _,_ = self.start_polling_task_TM(polling_task)
- def th_control_takes(self):
- while self.brun:
- #遍历临时任务--启停任务
- pass
- #遍历巡检任务-启停
- pass
- #休眠
- time.sleep(60*5)
+ def is_can_start_task(self,itype):
+ if itype ==1:
+ if self.cur_task_run_num < self.max_run_task:
+ return True
+ else:
+ if self.cur_polling_task_num < self.max_polling_task:
+ return True
+ return False
+
+ def start_next_task(self,task_id,target_id):
+ if target_id == 0: #临时任务
+ with self.tasks_lock:
+ self.cur_task_run_num -= 1
+ for task in self.tasks.values():
+ if task.task_status == 0: #只有是0状态会自动启动,2暂停状态需要人工启动
+ isuc, error = task.start_task()
+ if isuc == 1:
+ self.cur_task_run_num +=1
+ break
+ else: #巡检任务--是直接结束置4
+ with self.polling_tasks_lock:
+ del self.polling_tasks[task_id] # 删除缓存
+
+ self.cur_polling_task_num -=1
+ for pl_task in self.polling_tasks:
+ if pl_task.task_status == 0:
+ isuc ,error = pl_task.start_task()
+ if isuc == 1:
+ self.cur_polling_task_num += 1
+ break
#开启task任务
def start_task_TM(self,task_id):
@@ -158,8 +209,8 @@ class TaskManager:
with self.tasks_lock:
if self.cur_task_run_num < self.max_run_task:
#启动工作线程和进程
- bsuc,error = task.start_task()
- if bsuc:
+ isuc,error = task.start_task()
+ if isuc ==1:
self.cur_task_run_num += 1
return True,"启动成功"
else:
@@ -168,7 +219,20 @@ class TaskManager:
return False,f"已到达最大的启动数--{self.max_run_task}"
return False,"该任务不存在,程序逻辑存在问题!"
- #停止task任务--暂停
+ def start_polling_task_TM(self,pl_task):
+ if pl_task:
+ with self.polling_tasks_lock:
+ if self.cur_polling_task_num < self.max_polling_task:
+ isuc,error = pl_task.start_task()
+ if isuc == 1:
+ self.cur_polling_task_num += 1
+ return True,"启动成功"
+ else:
+ return False, error
+ else:
+ return False, f"已到达最大的启动数--{self.max_run_task}"
+
+ #停止task任务--暂停---要启动下一个task---只针对临时任务
def stop_task_TM(self,task_id):
task = self.tasks[task_id]
if task:
@@ -176,25 +240,33 @@ class TaskManager:
with self.tasks_lock:
task.stop_task() #停止线程应该就没什么失败需要处理的
self.cur_task_run_num -= 1
+ #启动下一个任务-只针对临时任务
+ for tmptask in self.tasks.values():
+ if tmptask.task_status == 0: # 只有是0状态会自动启动,2暂停状态需要人工启动
+ isuc, error = tmptask.start_task()
+ if isuc == 1:
+ self.cur_task_run_num += 1
+ break
+
return True,"停止任务成功"
else:
return True,"该任务已处于停止状态"
return False,"该任务不存在,程序逻辑存在问题!"
- #结束任务
+ #结束任务-只针对临时任务
def over_task(self,task_id):
#先尝试停止
bsuccess,_ = self.stop_task_TM(task_id)
time.sleep(1)
if bsuccess:
#再结束
- bsuccess = app_DBM.over_task(task_id) # 不管是不是修改(置2)成功,都执行结束
+ bsuccess = app_DBM.over_task(task_id) # 不管是不是修改(置4)成功,都执行结束
del self.tasks[task_id] # 删除缓存
return True,"结束任务成功"
else:
return False,"该任务不存在,程序逻辑存在问题!"
- #删除任务
+ #删除任务--历史任务处理
def del_task(self,task_id):
#删除数据记录
app_DBM.del_task(task_id)
@@ -206,7 +278,7 @@ class TaskManager:
def control_taks(self,task_id):
task = self.tasks[task_id]
if task:
- if task.task_status == 0: # 0-暂停,1-执行中,2-已完成
+ if task.task_status in (0,2): # 0-未启动,1-执行中,2-暂停中,3-已完成,4-已结束
bsuc,error = self.start_task_TM(task_id) #任务是否存在的状态有点重复
elif task.task_status == 1:
bsuc,error = self.stop_task_TM(task_id)
@@ -243,11 +315,11 @@ class TaskManager:
return tree_dict
return None
- #修改任务的工作模式,只有在暂停状态才能修改
+ #修改任务的工作模式,只有在未启动或暂停状态才能修改
def update_task_work_type(self,task_id,new_work_type):
task = self.tasks[task_id]
if task:
- if task.task_status == 0:
+ if task.task_status in (0,2):
task.update_task_work_type(new_work_type)
return True
return False
diff --git a/mycode/TaskObject.py b/mycode/TaskObject.py
index a58e686..f57bd5d 100644
--- a/mycode/TaskObject.py
+++ b/mycode/TaskObject.py
@@ -13,7 +13,7 @@ from myutils.PickleManager import g_PKM
from myutils.ConfigManager import myCongif
from mycode.WebSocketManager import g_WSM
from mycode.CommandVerify import g_CV
-from mycode.PythonTManager import PythonTManager
+
from mycode.DataFilterManager import DataFilterManager
from myutils.ReadWriteLock import ReadWriteLock
import asyncio
@@ -32,7 +32,7 @@ class TaskObject:
self.taskM = taskM
self.logger = LogHandler().get_logger("TaskObject")
self.InstrM = g_instrM # 类对象渗透,要约束只读取信息,且不允许有类全局对象--持续检查
- self.PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
+ self.PythonM = taskM.PythonM #python进程池往上提一层
self.DataFilter = DataFilterManager(test_target,fake_target)
self.CCM = ControlCenter() #一个任务一个CCM
self.LLM = LLMManager(llm_type) # LLM对象调整为一个任务一个对象,这样可以为不同的任务选择不同的LLM
@@ -46,7 +46,7 @@ class TaskObject:
self.cookie = cookie_info
self.work_type = work_type #工作模式 0-人工,1-自动
self.task_id = None
- self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态
+ self.task_status = 0 #0-初始状态,1-执行中,2-暂停中,3-已完成,4-已结束
self.local_ip = local_ip
self.attack_tree = None #任务节点树
self.attack_tree_lock = None #把节点的Lock外置
@@ -385,7 +385,9 @@ class TaskObject:
#判断任务是否完成
bover = self.is_over()
if bover and not bworking:
- self.stop_task() #自己停自己--确保没有阻塞操作
+ self.over_task() #自己停自己--确保没有阻塞操作
+ # 通知TM 开始下一个任务
+ self.taskM.start_next_task(self.task_id,self.target_id)
break #退出循环--结束线程
#处理点修复操作
icount +=1
@@ -614,8 +616,7 @@ class TaskObject:
if b_putwork:
return True,"已提交单步任务"
else: #所有节点都没有提交任务
- #可以尝试stop下
- self.stop_task()
+ self.over_task()
return False,"该任务已经没有待提交任务"
else:
return False,"当前任务正在执行任务中,不需要提交单步任务!"
@@ -993,7 +994,7 @@ class TaskObject:
#self.LLM.build_initial_prompt(root_node) # 对根节点初始化system-msg
self.LLM.build_init_info_prompt(root_node) #新建任务先开始信息收集工作 2025-5-15
know_info = ""
- self.put_node_reslist(root_node,know_info,-1) #新增一个-1的状态值
+ self.put_node_reslist(root_node,know_info,-1) #llm_type新增一个-1的状态值
# 初始保存个attack_tree文件
g_PKM.WriteData(self.attack_tree, str(self.task_id))
@@ -1017,10 +1018,7 @@ class TaskObject:
if self.check_th.is_alive():
self.logger.debug(f"{self.task_id}有存活自检线程")
return False
- #工作子进程池
- if self.PythonM.is_pool_active():
- self.logger.debug(f"{self.task_id}有存活子进程池")
- return False
+
return True
def update_task_work_type(self,new_work_type):
@@ -1036,11 +1034,11 @@ class TaskObject:
def start_task(self):
'''
启动该测试任务,
- :return bool str
+ :return int str
'''
if self.is_task_stop():
if self.is_over(): #只是判断了待办list,在执行的会漏判断
- return False, "该任务所有未暂停节点的待办事项都已经结束"
+ return 0, "该任务所有未暂停节点的待办事项都已经结束"
#更新状态标识
self.update_task_status(1)
self.brun = True #线程正常启动
@@ -1057,22 +1055,32 @@ class TaskObject:
#启动自检线程
self.check_th = threading.Thread(target=self.th_check)
self.check_th.start()
- #启动python子进程池 --进程池的启动,目前是没全面确认原子进程池的状态,直接None
- self.PythonM.start_pool()
- return True,"启动任务成功!" #就算启动了一部分线程,也要认为是成功
+
+ return 1,"启动任务成功!" #就算启动了一部分线程,也要认为是成功
else:
- return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!"
+ return -1,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!"
- def stop_task(self): #还未处理
+ def stop_task(self): #--暂停task
if self.brun and self.task_status == 1: #不是正常工作状态就不执行停止工作了
- #停止子进程池
- self.PythonM.shutdown_pool()
#停止线程
self.brun = False
- self.update_task_status(0) #置状态0
- # 结束任务需要收尾处理#?
+ if self.target_id == 0:
+ self.update_task_status(2) #置状态2--暂停状态
+ else:
+ self.logger.debug("stop_task不应该处理到巡检任务!")
+
+ def over_task(self): #结束任务
+ if self.brun and self.task_status ==1:
+ # 停止线程
+ self.brun = False
+ if self.target_id == 0:
+ self.update_task_status(3) # 置状态3--完成状态
+ else:
+ self.update_task_status(4) #巡检任务直接结束
+ # 结束任务需要收尾处理#?
self.InstrM.init_data() #pass
+
def is_over(self):
'''
判断当前任务是否所有节点都已结束
diff --git a/run.py b/run.py
index a1eb63c..e1390a1 100644
--- a/run.py
+++ b/run.py
@@ -33,5 +33,5 @@ if __name__ == '__main__':
asyncio.run(run_quart_app())
#启动巡检线程
PM = PollingManager()
- PM.run_polling()
+ #PM.run_polling()