diff --git a/config.yaml b/config.yaml index 5c57217..bc7f249 100644 --- a/config.yaml +++ b/config.yaml @@ -21,6 +21,7 @@ mysql: database: zfsafe #LLM +polling_llm_type: 1 #0-腾讯云,1-DS,2-2233.ai,3-GPT LLM_max_chain_count: 10 #为了避免推理链过长,造成推理效果变差,应该控制一个推理链的长度上限 #Node diff --git a/mycode/DBManager.py b/mycode/DBManager.py index 1748fde..44f660c 100644 --- a/mycode/DBManager.py +++ b/mycode/DBManager.py @@ -219,11 +219,11 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH return data def get_run_tasks(self): - strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target from task where task_status <> 2 order by ID;" + strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target,target_id from task where task_status <> 2 order by ID;" datas = self.do_select(strsql) return datas - def start_task(self,test_target,cookie_info,work_type,llm_type,fake_target) -> int: + def start_task(self,test_target,cookie_info,work_type,llm_type,fake_target,target_id=0) -> int: ''' 数据库添加检测任务 :param task_name: @@ -232,9 +232,9 @@ SELECT COLUMN_NAME, CHARACTER_MAXIMUM_LENGTH ''' task_id =0 start_time = get_local_timestr() - sql = "INSERT INTO task (task_name,task_target,start_time,task_status,safe_rank,work_type,cookie_info,llm_type,fake_target) " \ - "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)" - params = (test_target,test_target,start_time,1,0,work_type,cookie_info,llm_type,fake_target) + sql = "INSERT INTO task (task_name,task_target,start_time,task_status,safe_rank,work_type,cookie_info,llm_type,fake_target,target_id) " \ + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)" + params = (test_target,test_target,start_time,1,0,work_type,cookie_info,llm_type,fake_target,target_id) bok,task_id = self.safe_do_sql(sql,params,1) return task_id @@ -899,6 +899,12 @@ LEFT JOIN ( datas = self.safe_do_select(strsql, tuple(params)) return datas + def update_polling_last_time(self,target_ID): + str_time = get_local_timestr() + strsql = "update target set polling_last_time = %s where ID=%s;" + params = (str_time,target_ID) + bok,_ = self.safe_do_sql(strsql,params) + return bok diff --git a/mycode/PollingManager.py b/mycode/PollingManager.py index f297d23..221d34c 100644 --- a/mycode/PollingManager.py +++ b/mycode/PollingManager.py @@ -102,9 +102,10 @@ class PollingManager: def do_check(self,target): print(f"巡检中: {target[2]} (类型: {target[9]})") - #创建task + #创建task---并初始化待办 g_TaskM.create_polling_task(target) + def update_last_time(self, target_id): now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') strsql = "UPDATE target SET polling_last_time=%s WHERE ID=%s;" diff --git a/mycode/TargetManager.py b/mycode/TargetManager.py index 57d6170..2254d4e 100644 --- a/mycode/TargetManager.py +++ b/mycode/TargetManager.py @@ -10,6 +10,7 @@ import requests import whois import dns.resolver import ssl +import random from urllib.parse import urlparse from datetime import datetime @@ -51,6 +52,14 @@ class TargetManager: except ValueError: continue + def get_fake_target(self,type): + if type ==1: + fake_target = "192.168.3." + str(random.randint(2, 254)) + else: + fake_target = "czzfkj" + chr(random.randint(97, 122)) + chr(random.randint(97, 122)) + return fake_target + + #验证目标格式的合法性,并提取域名或IP def validate_and_extract(self,input_str): ''' @@ -66,14 +75,12 @@ class TargetManager: if target_type =="IPv4" or target_type=="IPv6": type = 1 #IP - real_target = target - fake_target = "192.168.3.107" elif target_type == "URL": type = 2 #domain - real_target = target - fake_target = "czzfkjxx" else: #目标不合法 return False,real_target,type,fake_target + real_target = target + fake_target = self.get_fake_target(type) return True,real_target,type,fake_target #验证目标是否合法 diff --git a/mycode/TaskManager.py b/mycode/TaskManager.py index c146ec0..007043d 100644 --- a/mycode/TaskManager.py +++ b/mycode/TaskManager.py @@ -11,22 +11,37 @@ class TaskManager: def __init__(self): self.logger = LogHandler().get_logger("TaskManager") self.TargetM = TargetManager() - self.tasks = {} # 执行中的任务,test_id为key - self.num_threads = myCongif.get_data("Task_max_threads") - self.max_run_task = myCongif.get_data("max_run_task") - self.cur_task_run_num = 0 #获取系统信息 -- 用户可修改的都放在DB中,不修改的放config + self.num_threads = myCongif.get_data("Task_max_threads") data = app_DBM.get_system_info() self.local_ip = data[0] self.version = data[1] self.tasks_lock = threading.Lock() - self.web_cur_task = 0 #web端当前显示的 + self.brun = True + self.tm_th = None #任务控制线程 + #----临时任务相关------- + self.web_cur_task = 0 #web端当前显示的 -- web端可以操作的都为临时任务 + self.tasks = {} # 执行中的任务,test_id为key + self.max_run_task = myCongif.get_data("max_run_task") + self.cur_task_run_num = 0 + #-----巡检任务相关------- + self.polling_llmtype = myCongif.get_data("polling_llm_type") + self.polling_tasks = {} #独立一个队列 + self.max_polling_task = myCongif.get_data("max_polling_task") + self.cur_polling_task = 0 + self.work_type = 1 #巡检工作模式就是自动了 #判断目标是不是在当前执行任务中,---没加锁,最多跟删除有冲突,问题应该不大 - def is_target_in_tasks(self,task_target): - for task in self.tasks.values(): - if task_target == task.target: - return True + def is_target_in_tasks(self,task_target,itype = 1): + '''itype:1--临时任务,2--巡检任务''' + if itype ==1: + for task in self.tasks.values(): + if task_target == task.target: + return True + else: + for p_task in self.polling_tasks.values(): + if task_target == p_task.target: + return True return False #程序启动后,加载未完成的测试任务 @@ -34,7 +49,7 @@ class TaskManager: '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2 #若不是异常停止,正常情况下,任务都应该是没有待办MQ的 ''' - datas = app_DBM.get_run_tasks() + datas = app_DBM.get_run_tasks() # for data in datas: task_id = data[0] task_target = data[1] @@ -44,8 +59,12 @@ class TaskManager: llm_type = data[5] safe_rank = data[6] fake_target = data[7] + target_id = data[8] + # 创建任务对象 - task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,self,safe_rank) + task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,target_id,self,safe_rank) + #这里要做区分.临时任务和巡检任务 + #? #读取attact_tree---load的任务应该都要有attact_tree attack_tree = g_PKM.ReadData(str(task_id)) if attack_tree: @@ -96,15 +115,43 @@ class TaskManager: task.init_task(task_id) #保留task对象 self.tasks[task_id] = task + #尝试启动task + self.start_task_TM(task_id) else: fail_list.append(target) result = ",".join(fail_list) return result def create_polling_task(self,target): - #创建巡检任务 + #创建巡检任务 -- 使用check_target + task_target = target[2] + if self.is_target_in_tasks(task_target,2): #巡检任务一个周期还没有执行完。 + #更新检查时间,本次巡检轮次不执行工作 + bok = app_DBM.update_polling_last_time(target[0]) + else:#创建任务 + if target[9] == "IPv4" or target[9] == "IPv6": + fake_target = self.TargetM.get_fake_target(1) + else: + fake_terget = self.TargetM.get_fake_target(2) + #创建任务实例 + polling_task = TaskObject(target, "", self.work_type, self.polling_llmtype, self.num_threads, self.local_ip, fake_target, self) + # 获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库 + task_id = app_DBM.start_task(target, "", self.work_type, self.polling_llmtype, fake_target,target[0]) #增加一个target_id + if task_id >0: + polling_task.init_task(task_id)#初始化任务信息,并提交到待处理节点队列 + #保留task对象--巡检任务 + self.polling_tasks[task_id] = polling_task + + def th_control_takes(self): + while self.brun: + #遍历临时任务--启停任务 + pass + #遍历巡检任务-启停 + pass + #休眠 + time.sleep(60*5) - #开启task任务--正常只应该有web触发调用 + #开启task任务 def start_task_TM(self,task_id): task = self.tasks[task_id] if task: @@ -121,7 +168,7 @@ class TaskManager: return False,f"已到达最大的启动数--{self.max_run_task}" return False,"该任务不存在,程序逻辑存在问题!" - #停止task任务 + #停止task任务--暂停 def stop_task_TM(self,task_id): task = self.tasks[task_id] if task: diff --git a/mycode/TaskObject.py b/mycode/TaskObject.py index 0e84c8a..a58e686 100644 --- a/mycode/TaskObject.py +++ b/mycode/TaskObject.py @@ -27,7 +27,7 @@ import textwrap class TaskObject: - def __init__(self,test_target,cookie_info,work_type,llm_type,num_threads,local_ip,fake_target,taskM,safe_rank=0): + def __init__(self,test_target,cookie_info,work_type,llm_type,num_threads,local_ip,fake_target,taskM,target_id=0,safe_rank=0): #功能类相关 self.taskM = taskM self.logger = LogHandler().get_logger("TaskObject") @@ -42,6 +42,7 @@ class TaskObject: self.max_layer = myCongif.get_data("max_node_layer") self.sleep_time = myCongif.get_data("sleep_time") self.target = test_target + self.target_id = target_id #巡检任务有值--对应巡检目标 self.cookie = cookie_info self.work_type = work_type #工作模式 0-人工,1-自动 self.task_id = None @@ -542,15 +543,15 @@ class TaskObject: self.put_instr_mq(node) #2-执行中 def put_work_node(self,work_type): - '''遍历节点需要处理的任务,提交mq,load_task-在自动模式下-触发--线程安全 + '''遍历节点需要处理的任务,提交mq,load_task-触发--线程安全 work_type 0-人工,1-自动 ''' instr_status = None llm_status = None - if work_type == 0: + if work_type == 0: #人工 instr_status = (2,) llm_status = (4,) - else: + else: #自动 instr_status = (1,2) llm_status = (3,4) @@ -559,12 +560,12 @@ class TaskObject: if not node.bwork: continue node_work_status = node.get_work_status() - if node_work_status in instr_status: #待执行指令 - if node.is_instr_empty():#说明数据有问题了,放弃掉 + if node_work_status in instr_status: #有待执行指令 + if node.is_instr_empty():# node.update_work_status(-1) #置0 -1作为额外的条件参数 else: - self.put_instr_node(node) #1,2都提交执行 - node.update_work_status(-2)# 置2 + self.put_instr_node(node) #instr_status都提交执行 + node.update_work_status(-2)# 置2 --执行中 #llm-list不处理,正常应该为空 elif node_work_status in llm_status: if node.is_llm_empty():#数据有问题,放弃掉 @@ -983,9 +984,7 @@ class TaskObject: # 初始化节点树 if attack_tree: # 有值的情况是load self.attack_tree = attack_tree - # 加载未完成的任务 - # if self.work_type == 1: # 自动模式 - # # 提交到mq,待线程执行 + # 加载未完成的任务-并恢复工作 self.put_work_node(self.work_type) else: # 无值的情况是new_create