From 32c6f2f65aff80a68212af533c6f877e5ab24bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=BE=99?= Date: Wed, 30 Apr 2025 20:28:12 +0800 Subject: [PATCH] =?UTF-8?q?v0.5.3=20a.=E5=A2=9E=E5=8A=A0=E4=BA=86=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E6=B7=BB=E5=8A=A0=E7=9B=AE=E6=A0=87=EF=BC=8C=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E8=B0=83=E6=95=B4=E4=BA=86=E4=BB=BB=E5=8A=A1=E7=9A=84?= =?UTF-8?q?=E5=90=AF=E5=81=9C=EF=BC=9B=20b.=E5=A2=9E=E5=8A=A0=E4=BA=86?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=BF=87=E6=BB=A4=E5=8A=9F=E8=83=BD=EF=BC=8C?= =?UTF-8?q?=E8=BF=87=E6=BB=A4=E6=8F=90=E4=BA=A4=E5=88=B0llm=E7=9A=84?= =?UTF-8?q?=E7=9B=AE=E6=A0=87=E4=BF=A1=E6=81=AF=EF=BC=9B=20c.=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=BA=86=E5=AF=B9=E9=80=9A=E4=BB=A5qwen3=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E7=9A=84=E5=AF=B9=E6=8E=A5=EF=BC=9B=20d.https?= =?UTF-8?q?=E6=9C=89=E7=9F=AD=E6=95=B0=E6=8D=AE=E5=8C=85=E4=B8=8D=E5=8F=8A?= =?UTF-8?q?=E6=97=B6=E5=8F=91=E9=80=81=E5=88=B0=E5=89=8D=E7=AB=AF=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=8C=E6=9A=82=E6=97=B6=E8=B0=83=E5=9B=9E?= =?UTF-8?q?http=EF=BC=9B=20e.=E5=85=B6=E4=BB=96=E7=9A=84=E4=B8=80=E4=BA=9B?= =?UTF-8?q?bug=E5=92=8C=E5=B7=A5=E5=85=B7=E8=BF=AD=E4=BB=A3=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yaml | 1 + mycode/AttackMap.py | 5 + mycode/DBManager.py | 47 ++- mycode/DataFilterManager.py | 20 ++ mycode/InstructionManager.py | 2 +- mycode/LLMManager.py | 101 +++--- mycode/PythonTManager.py | 17 +- mycode/PythoncodeTool.py | 47 ++- mycode/TargetManager.py | 9 +- mycode/TaskManager.py | 180 +++++++---- mycode/TaskObject.py | 217 ++++++++----- mycode/WebSocketManager.py | 9 + pipfile | 3 + run.py | 6 + test.py | 153 ++++----- tools/CurlTool.py | 4 +- tools/EchoTool.py | 42 ++- tools/ToolBase.py | 2 +- web/API/task.py | 27 +- web/API/wsm.py | 10 +- web/__init__.py | 8 +- web/common/utils.py | 12 +- .../{task_modal.js => his_task_modal.js} | 147 +++++++-- .../static/resources/scripts/my_web_socket.js | 2 +- .../static/resources/scripts/node_tree.js | 301 +++++++++--------- .../static/resources/scripts/task_manager.js | 108 ++++++- web/main/templates/his_task.html | 91 ++++-- web/main/templates/index.html | 129 +++++--- web/main/templates/task_manager.html | 34 +- web/main/templates/task_manager_modal.html | 6 +- 30 files changed, 1112 insertions(+), 628 deletions(-) create mode 100644 mycode/DataFilterManager.py rename web/main/static/resources/scripts/{task_modal.js => his_task_modal.js} (82%) diff --git a/config.yaml b/config.yaml index 869517d..a0ca0a6 100644 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,6 @@ #工作模式 App_Work_type: 0 #0-开发模式,只允许单步模式 1-生产模式 包裹下的逻辑可以正常执行 +max_run_task: 3 #允许同时运行的任务数 #线程休眠的时间 sleep_time: 20 diff --git a/mycode/AttackMap.py b/mycode/AttackMap.py index 087ce45..c54a945 100644 --- a/mycode/AttackMap.py +++ b/mycode/AttackMap.py @@ -50,6 +50,7 @@ class AttackTree: "node_bwork":node.bwork, "node_vultype":node.vul_type, "node_vulgrade":node.vul_grade, + "node_vulinfo":node.vul_info, "node_workstatus":node.get_work_status(), "children":[self.node_to_dict(child) for child in node.children] if node.children else [] } @@ -270,6 +271,10 @@ class TreeNode: self.copy_messages(p_msg,c_msg) self._instr_queue.append(instr) + def test_add_instr(self,instr): + self._instr_queue.append(instr) + self._llm_quere = [] + def get_instr(self): return self._instr_queue.pop(0) if self._instr_queue else None diff --git a/mycode/DBManager.py b/mycode/DBManager.py index a33700d..3dae751 100644 --- a/mycode/DBManager.py +++ b/mycode/DBManager.py @@ -161,11 +161,11 @@ class DBManager: return data def get_run_tasks(self): - strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type from task where task_status <> 2;" + strsql = "select ID,task_target,task_status,work_type,cookie_info,llm_type,safe_rank,fake_target from task where task_status <> 2 order by ID;" datas = self.do_select(strsql) return datas - def start_task(self,test_target,cookie_info,work_type,llm_type) -> int: + def start_task(self,test_target,cookie_info,work_type,llm_type,fake_target) -> int: ''' 数据库添加检测任务 :param task_name: @@ -174,9 +174,9 @@ class DBManager: ''' task_id =0 start_time = get_local_timestr() - sql = "INSERT INTO task (task_name,task_target,start_time,task_status,safe_rank,work_type,cookie_info,llm_type) " \ - "VALUES (%s,%s,%s,%s,%s,%s,%s,%s)" - params = (test_target,test_target,start_time,1,0,work_type,cookie_info,llm_type) + sql = "INSERT INTO task (task_name,task_target,start_time,task_status,safe_rank,work_type,cookie_info,llm_type,fake_target) " \ + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)" + params = (test_target,test_target,start_time,1,0,work_type,cookie_info,llm_type,fake_target) bok,task_id = self.safe_do_sql(sql,params,1) return task_id @@ -186,6 +186,19 @@ class DBManager: bok,_ = self.safe_do_sql(strsql, params) return bok + #0<-->1 + def update_task_status(self,task_id,new_status): + strsql = "update task set task_status=%s where ID=%s;" + params = (new_status,task_id) + bok, _ = self.safe_do_sql(strsql, params) + return bok + + def update_task_work_type(self,task_id,new_work_type): + strsql = "update task set work_type=%s where ID=%s;" + params = (new_work_type, task_id) + bok, _ = self.safe_do_sql(strsql, params) + return bok + def del_task(self,task_id): params = (task_id) strsql = "delete from task where ID=%s;" @@ -277,13 +290,11 @@ class DBManager: strsql = ''' select ID,node_path,do_sn,instruction,result from task_result where task_id = %s ''' + params = [task_id] if nodename.strip(): - strsql += " and nodename like %s;" - params = (task_id,nodename) - else: - strsql += ";" - params = (task_id) - datas = self.safe_do_select(strsql,params) + strsql += " and node_path like %s" + params.append(f"%{nodename}%") # 在参数中添加通配符 + datas = self.safe_do_select(strsql,tuple(params)) return datas #插入漏洞数据 @@ -308,12 +319,12 @@ class DBManager: # 按需添加其他条件 if nodename and nodename.strip(): # 检查nodename是否非空(去除前后空格后) - conditions.append("node_path=%s") - params.append(nodename) + conditions.append("node_path like %s") + params.append(f"%{nodename}%") if vultype and vultype.strip(): # 检查vultype是否非空 - conditions.append("vul_type=%s") - params.append(vultype) + conditions.append("vul_type like %s") + params.append(f"%{vultype}%") if vullevel and vullevel.strip(): # 检查vullevel是否非空 conditions.append("vul_level=%s") @@ -387,6 +398,12 @@ class DBManager: bok,_ = self.safe_do_sql(strsql,params) return bok + def get_one_instr(self,instr_id): + strsql = "select instruction from task_result where ID = %s" + params = (instr_id) + data = self.safe_do_select(strsql,params,1) + return data[0] + def test(self): # 建立数据库连接 diff --git a/mycode/DataFilterManager.py b/mycode/DataFilterManager.py new file mode 100644 index 0000000..47f36f0 --- /dev/null +++ b/mycode/DataFilterManager.py @@ -0,0 +1,20 @@ +class DataFilterManager: + def __init__(self,target,fake_target): + self.real_target = target + self.fake_target = fake_target + + def filter_prompt(self,prompt): + fake_prompt = prompt.replace(self.real_target,self.fake_target) + return fake_prompt + + def filter_instruction(self,instruction): + real_instruction = instruction.replace(self.fake_target,self.real_target) + return real_instruction + + def filter_result(self,instr,result): + fake_instr = instr.replace(self.real_target,self.fake_target) + fake_result = result.replace(self.real_target,self.fake_target) + return fake_instr,fake_result + + + diff --git a/mycode/InstructionManager.py b/mycode/InstructionManager.py index f6a1ee2..2570f09 100644 --- a/mycode/InstructionManager.py +++ b/mycode/InstructionManager.py @@ -73,7 +73,7 @@ class InstructionManager: print(f"未知工具:{tool_name}") #return bres,instr,result,source_result,ext_params - return instr, result, source_result, ext_params #取消bres的返回,所有指令执行结果多需要返回到Llm,用于控制节点的状态 + return bres,instr, result, source_result, ext_params #取消bres的返回,所有指令执行结果多需要返回到Llm,用于控制节点的状态 #过来指令:合规、判重、待执行等 def _instruction_filter(self,instruction): diff --git a/mycode/LLMManager.py b/mycode/LLMManager.py index 12033f7..ea11d2e 100644 --- a/mycode/LLMManager.py +++ b/mycode/LLMManager.py @@ -20,7 +20,6 @@ class LLMManager: self.api_url = None #temperature设置 - #DS------代码生成/数学解题:0.0 -- 数据抽取/分析:1.0 -- 通用对话:1.3 -- 翻译:1.3 -- 创意类写作:1.5 if illm_type == 0: #腾讯云 self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc" self.api_url = "" @@ -28,13 +27,10 @@ class LLMManager: self.api_key ="sk-10360148b465424288218f02c87b0e1b" self.api_url ="https://api.deepseek.com/v1" self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3 - # 创建会话对象 -- 一个任务的LLM必须唯一 - self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) elif illm_type == 2: #2233.ai self.api_key = "sk-J3562ad9aece8fd2855bb495bfa1a852a4e8de8a2a1IOchD" self.api_url = "https://api.gptsapi.net/v1" self.model = "o3-mini-2025-01-31" - self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) elif illm_type ==3: #GPT # 定义代理服务器地址 proxy_url = "http://192.168.3.102:3128" @@ -43,11 +39,17 @@ class LLMManager: self.api_key ="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" self.api_url = "https://api.openai.com/v1" self.model = "o3-mini-2025-01-31" - openai.proxy = proxy_url openai.api_key = self.api_key - #self.client = openai - self.client = OpenAI(api_key=self.api_key,base_url=self.api_url) + elif illm_type ==4:#通义Qwen3 + self.api_key ="sk-48028b85e7604838b5be5bf6a90222cb" + self.api_url ="https://dashscope.aliyuncs.com/compatible-mode/v1" + self.model = "qwen3-235b-a22b" + else: + self.logger.error("模型参数选择异常!") + return + # 创建会话对象 -- 一个任务的LLM必须唯一 + self.client = OpenAI(api_key=self.api_key, base_url=self.api_url) ''' **决策原则** @@ -64,23 +66,23 @@ class LLMManager: "content":''' 你是一位渗透测试专家,来指导本地程序进行渗透测试,由你负责动态控制整个渗透测试过程,根据当前测试状态和返回结果,决定下一步测试指令,推动测试前进,直至完成渗透测试。 **总体要求** -1.以测试目标为根节点,结合信息收集和测试反馈的结果,以新的测试点作为子节点,逐步规划和推进下一步测试,形成树型结构(测试树); -2.每次规划测试指令时,只关注当前节点的测试推进,若涉及新增节点,原则上只应在当前节点下新增,避免测试路径的混乱; -3.只有当收到当前节点的所有测试指令的结果,且没有新的测试指令需要执行时,再判断是否需要新增子节点进一步进行验证测试,若没有,则结束该路径的验证; -4.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增测试节点的完整性,若有新增的节点未能匹配测试指令,必须返回未匹配指令的节点列表; -5.生成的指令有两类:节点指令和测试指令,指令之间必须以空行间隔,不能包含注释和说明; -6.本地程序会执行生成的指令,但不具备分析判断和保持会话能力,只会把执行结果返回提交; +1.以测试目标为根节点,结合信息收集和测试反馈的结果,以新的测试点作为子节点,逐步规划和推进下一步测试,形成树型结构(测试树),测试点需尽量全面; +2.只有当收到当前节点的所有测试指令的结果,且没有新的测试指令需要执行时,再判断是否需要新增子节点进一步进行验证测试,若没有,则结束该路径的验证; +3.若一次性新增的节点过多,无法为每个节点都匹配测试指令,请优先保障新增测试节点的完整性,若有新增的节点未能匹配测试指令,必须返回未匹配指令的节点列表; +4.生成的指令有两类:节点指令和测试指令,指令之间必须以空行间隔,不能包含注释和说明; +5.本地程序会执行生成的指令,但不具备分析判断和保持会话能力,只会把执行结果返回提交; +6.只有当漏洞验证成功后,才添加该节点的漏洞信息; 7.若无需要处理的节点数据,节点指令可以不生成; -8.只有当漏洞验证成功,才更新该节点的漏洞信息; 8.若节点已完成测试,测试指令可以不生成。 **测试指令生成准则** -1.测试指令必须对应已有节点,或同时生成新增节点指令; -2.优先使用覆盖面广成功率高的指令;不要生成重复的指令; -3.若需要多条指令配合测试,请生成对应的python指令,完成闭环返回; -4.需要避免用户交互,必须要能返回。 +1.可以是dash指令,也可以是python指令,必须按格式要求生成; +2.必须对应已有节点,或同时生成新增节点指令; +3.优先使用覆盖面广成功率高的指令;不要生成重复的指令; +4.若需要多条指令配合测试,请生成对应的python指令,完成闭环返回; +5.避免用户交互,必须要能返回。 **节点指令格式** - 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\"}; -- 未生成指令节点列表:{\"action\": \"no_instruction\", \"nodes\": \"节点1,节点2\"}; +- 未匹配指令的节点列表:{\"action\": \"no_instruction\", \"nodes\": \"节点1,节点2\"}; - 漏洞验证成功:{\"action\": \"find_vul\", \"node\": \"节点\",\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}}; - 节点完成测试:{\"action\": \"end_work\", \"node\": \"节点\"}; **测试指令格式** @@ -90,7 +92,6 @@ class LLMManager: **核心要求** - 指令之间必须要有一个空行; - 需确保测试指令的节点路径和指令的目标节点一致,例如:针对子节点的测试指令,节点路径不能指向当前节点; -- 根据反馈信息,测试目标有可能产生高危漏洞的,必须新增节点,并提供测试指令; **响应示例** {\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"nodes\": \"3306端口,22端口\"} @@ -100,7 +101,7 @@ mysql -u root -p 192.168.1.100 '''}] # 一个messages # 调用LLM生成指令 - def get_llm_instruction(self,prompt,node): + def get_llm_instruction(self,prompt,node,DataFilter): ''' 1.由于大模型API不记录用户请求的上下文,一个任务的LLM不能并发! :param prompt:用户本次输入的内容 @@ -120,51 +121,66 @@ mysql -u root -p 192.168.1.100 "messages": sendmessage, } # 某些模型额外的参数 + stream = False if self.model == "o3-mini-2025-01-31": params["reasoning_effort"] = "high" + elif self.model == "qwen3-235b-a22b": + stream = True + params["stream"] = stream + params["extra_body"] = {"enable_thinking": True,"thinking_budget": 3000} try: # 调用 API response = self.client.chat.completions.create(**params) except APITimeoutError: - self.logger.error("OpenAI API 请求超时") + self.logger.error("LLM API 请求超时") return False, "","","", f"调用超时(model={self.model})" except APIConnectionError as e: self.logger.error(f"网络连接错误: {e}") return False, "","", "", "网络连接错误" except OpenAIError as e: # 包括 400/401/403/500 等各种 API 错误 - self.logger.error(f"OpenAI API 错误: {e}") + self.logger.error(f"LLM API 错误: {e}") return False, "","", "", f"API错误: {e}" except Exception as e: # 兜底,防止意外 self.logger.exception("调用 LLM 时出现未预期异常") return False, "","", "", f"未知错误: {e}" - #LLM返回结果处理 - choice = response.choices[0].message reasoning_content = "" content = "" - #LLM返回处理 - if self.model == "deepseek-reasoner": - reasoning_content = getattr(choice, "reasoning_content", "") - content = choice.content - # 记录llm历史信息 - node.cur_messages.append({'role': 'assistant', 'content': content}) - elif self.model == "deepseek-chat": - content = choice - node.cur_messages.append(content) - elif self.model == "o3-mini-2025-01-31": - content = choice.content - # 记录llm历史信息 - node.cur_messages.append({'role': 'assistant', 'content': content}) - else: - self.logger.error("处理到未预设的模型!") - return False,"","","","处理到未预设的模型!" + if stream: #流式模式 + is_answering = False + for chunk in response: + if not chunk.choices: + continue + delta = chunk.choices[0].delta + if hasattr(delta, "reasoning_content") and delta.reasoning_content is not None: + reasoning_content += delta.reasoning_content + # 收到content,开始进行回复 + if hasattr(delta, "content") and delta.content: + if not is_answering: + is_answering = True + content += delta.content + else: + #LLM返回结果处理 + choice = response.choices[0].message + #LLM返回处理 + if self.model == "deepseek-reasoner": + reasoning_content = getattr(choice, "reasoning_content", "") + content = choice.content + elif self.model == "o3-mini-2025-01-31" or self.model == "qwen-max-latest": + content = choice.content + else: + self.logger.error("处理到未预设的模型!") + return False,"","","","处理到未预设的模型!" + # 记录llm历史信息 + node.cur_messages.append({'role': 'assistant', 'content': content}) print(content) + real_con = DataFilter.filter_instruction(content) #按格式规定对指令进行提取 - node_cmds,commands = self.fetch_instruction(content) + node_cmds,commands = self.fetch_instruction(real_con) return True,node_cmds,commands,reasoning_content, content def fetch_instruction(self,response_text): @@ -179,6 +195,7 @@ mysql -u root -p 192.168.1.100 :param text: 输入文本 :return: node_cmds,python_blocks,shell_blocks ''' + #针对llm的回复,提取节点操作数据和执行的指令---- # 正则匹配 Python 代码块 python_blocks = re.findall(r"```python-(.*?)```", response_text, flags=re.DOTALL) diff --git a/mycode/PythonTManager.py b/mycode/PythonTManager.py index 839aee8..42f476a 100644 --- a/mycode/PythonTManager.py +++ b/mycode/PythonTManager.py @@ -14,7 +14,17 @@ class PythonTManager: self.python_tool = PythoncodeTool(maxnum) #python工具实例 def __del__(self): - self.python_tool.shutdown() + self.shutdown_pool() + + def start_pool(self): + return self.python_tool.start_pool() + + def shutdown_pool(self): + self.python_tool.shutdown_pool() + + def is_pool_active(self): + return self.python_tool.pool_active + def execute_instruction(self,instruction): bwork = False @@ -23,14 +33,15 @@ class PythonTManager: if self.cur_num < self.maxnum: self.cur_num += 1 bwork = True + if bwork:#还有空的子进程 #提交给进程池执行 - _,instruction,analysis,_,ext_params = self.python_tool.execute_instruction(instruction) + bsuccess,instruction,analysis,_,ext_params = self.python_tool.execute_instruction(instruction) #执行完成后,数量减一 with self.put_lock: self.cur_num -= 1 #返回结果 - return instruction,analysis,analysis,ext_params + return bsuccess,instruction,analysis,analysis,ext_params else: #如果没获取的许可,则等待N秒后再尝试---存在问题:多线程间没有先来先到的机制了,有可能第一个来排队的一直等到最后 time.sleep(20) #休眠20秒 diff --git a/mycode/PythoncodeTool.py b/mycode/PythoncodeTool.py index 316d056..1be4409 100644 --- a/mycode/PythoncodeTool.py +++ b/mycode/PythoncodeTool.py @@ -22,10 +22,13 @@ import base64 import itertools import random import tempfile -import multiprocessing import textwrap import smb import pexpect +import smbclient +import binascii +from mysql.connector import Error +from impacket.smbconnection import SMBConnection from itertools import product from socket import create_connection from cryptography import x509 @@ -60,7 +63,7 @@ def _execute_dynamic(instruction_str): 'set': set, 'str': str, 'sum': sum, 'type': type, 'open': open, 'Exception': Exception, 'locals': locals, 'ConnectionResetError':ConnectionResetError,'BrokenPipeError':BrokenPipeError, - 'bytes':bytes,'tuple':tuple, + 'bytes':bytes,'tuple':tuple,'format':format } # 构造安全的 globals safe_globals = { @@ -99,7 +102,11 @@ def _execute_dynamic(instruction_str): 'x509':x509, 'default_backend':default_backend, 'product':product, - 'create_connection':create_connection + 'create_connection':create_connection, + 'smbclient':smbclient, + 'binascii':binascii, + 'Error':Error, + 'SMBConnection':SMBConnection } safe_locals = {} try: @@ -126,7 +133,28 @@ def _execute_dynamic(instruction_str): class PythoncodeTool(): def __init__(self,max_num): - self.proc_pool = ProcessPoolExecutor(max_workers=max_num) + self.max_workers = max_num + self.proc_pool = None + self.pool_active = False + #self.proc_pool = ProcessPoolExecutor(max_workers=max_num) + + def start_pool(self): + if self.proc_pool is not None and self.pool_active: + print("进程池已经在运行中") + return False + print("启动进程池...") + self.proc_pool = ProcessPoolExecutor(max_workers=self.max_workers) + self.pool_active = True + return True + + def shutdown_pool(self): + if self.proc_pool is not None and self.pool_active: + print("关闭进程池...") + self.proc_pool.shutdown(wait=False) #wait=True 是阻塞执行,False立即返回 + self.pool_active = False + self.proc_pool = None + else: + print("进程池已经是关闭状态") def fix_code(self,code: str) -> str: """ @@ -257,14 +285,18 @@ class PythoncodeTool(): str:执行的指令 str:执行指令的结果 ''' + ext_params = ReturnParams() ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False ext_params["is_vulnerability"] = False # 是否是脆弱点 + if not self.pool_active: + return False,instruction_old,"本地程序出行错误,请结束该节点的测试!","",ext_params + # 第一步:验证指令合法性 instruction,time_out,error = self.validate_instruction(instruction_old) if not instruction: - return False, instruction_old, error,"",ext_params + return True, instruction_old, error,"",ext_params # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? # 第二步:执行指令 @@ -283,10 +315,7 @@ class PythoncodeTool(): # 第三步:分析执行结果 analysis = self.analyze_result(output, instruction,"","") - # 指令和结果入数据库 - # ? - if not analysis: # analysis为“” 不提交LLM - return False, instruction, analysis,"",ext_params + return True, instruction, analysis,"",ext_params def analyze_result(self, result,instruction,stderr,stdout): diff --git a/mycode/TargetManager.py b/mycode/TargetManager.py index 5e97c4a..7362458 100644 --- a/mycode/TargetManager.py +++ b/mycode/TargetManager.py @@ -28,19 +28,22 @@ class TargetManager: ''' regex_match = re.fullmatch(pattern, input_str) type = None + fake_target = "" if regex_match: domain_or_ip = regex_match.group(2) # 仅对 IPv4 格式的字符串进行有效性验证 if re.fullmatch(r'\d{1,3}(\.\d{1,3}){3}', domain_or_ip): if not self._is_valid_ipv4(domain_or_ip): - return False, None,type + return False, None,type,fake_target else: type = 1 #IP + fake_target = "192.168.3.107" else: type = 2 #domain - return True, domain_or_ip,type + fake_target = "www.czzfxxkj.com" + return True, domain_or_ip,type,fake_target else: - return False, None,type + return False, None,type,fake_target g_TM = TargetManager() diff --git a/mycode/TaskManager.py b/mycode/TaskManager.py index d9f4636..0ec44d8 100644 --- a/mycode/TaskManager.py +++ b/mycode/TaskManager.py @@ -2,21 +2,26 @@ from myutils.ConfigManager import myCongif from mycode.TaskObject import TaskObject from mycode.DBManager import app_DBM from myutils.PickleManager import g_PKM - +from myutils.MyLogger_logger import LogHandler +from mycode.TargetManager import TargetManager # 从模块导入类 +import time import threading class TaskManager: def __init__(self): + self.logger = LogHandler().get_logger("TaskManager") + self.TargetM = TargetManager() self.tasks = {} # 执行中的任务,test_id为key self.num_threads = myCongif.get_data("Task_max_threads") + self.max_run_task = myCongif.get_data("max_run_task") + self.cur_task_run_num = 0 #获取系统信息 -- 用户可修改的都放在DB中,不修改的放config data = app_DBM.get_system_info() self.local_ip = data[0] self.version = data[1] - self.tasks_lock = threading.Lock() #加个线程锁?不使用1,quart要使用的异步锁,2.限制只允许一个用户登录,3.遍历到删除的问题不大 + self.tasks_lock = threading.Lock() self.web_cur_task = 0 #web端当前显示的 - #判断目标是不是在当前执行任务中,---没加锁,最多跟删除有冲突,问题应该不大 def is_target_in_tasks(self,task_target): for task in self.tasks.values(): @@ -26,7 +31,7 @@ class TaskManager: #程序启动后,加载未完成的测试任务 def load_tasks(self): - '''程序启动时,加载未执行完成的任务''' + '''程序启动时,加载未执行完成的,未点击结束的任务 -- task_status<>2''' datas = app_DBM.get_run_tasks() for data in datas: task_id = data[0] @@ -35,77 +40,136 @@ class TaskManager: work_type = data[3] cookie_info = data[4] llm_type = data[5] + safe_rank = data[6] + fake_target = data[7] # 创建任务对象 - task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,self) - #读取attact_tree + task = TaskObject(task_target, cookie_info, work_type, llm_type, self.num_threads, self.local_ip,fake_target,self,safe_rank) + #读取attact_tree---load的任务应该都要有attact_tree attack_tree = g_PKM.ReadData(str(task_id)) - #开始任务 ---会根据task_status来判断是否需要启动工作线程 - task.start_task(task_id,task_status,attack_tree) - # 保留task对象 - self.tasks[task_id] = task + if attack_tree: + #恢复数据 + task.init_task(task_id,attack_tree) + #开始任务 ---根据task_status来判断是否需要启动工作线程 + if task_status == 1: + if self.cur_task_run_num < self.max_run_task: #load 是程序刚起,只有主线程,不加锁 + bsuc,strout = task.start_task() + if bsuc: + self.cur_task_run_num +=1 + else: + task.update_task_status(0) + else: + self.logger.error("重载未结束任务,不应该超过最大运行数量的task_status为启动状态") + task.update_task_status(0)#尝试硬恢复 + # 内存保留task对象 + self.tasks[task_id] = task + else: + self.logger.error(f"{task_id}任务的节点树数据缺失,需要检查!") - #新建测试任务 - def create_task(self, test_target,cookie_info,llm_type,work_type): + #新建测试任务--2025-4-28调整为可以批量添加--cookie_info信息取消 + def create_task(self, test_targets,llm_type,work_type): """创建新任务--create和load复用?-- 1.创建和初始化task_object; 2.创建task_id - 3.启动工作线程 - return T/F + return 失败的list """ - if self.is_target_in_tasks(test_target): - raise ValueError(f"Task {test_target} already exists") - #创建任务对象 - task = TaskObject(test_target,cookie_info,work_type,llm_type,self.num_threads,self.local_ip,self) - #获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库 - task_id = app_DBM.start_task(test_target,cookie_info,work_type,llm_type) - if task_id >0: - #创建后启动工作--同时赋值task_id - task.start_task(task_id) - #保留task对象 - self.tasks[task_id] = task - return True - else: - return False + fail_list = [] + target_list = test_targets.split(",") + for target in target_list: + #这里判断目标的合法性 + bok,target,type,fake_target = self.TargetM.validate_and_extract(target) #是否还需要判断待定? + if not bok:#目标不合法 + fail_list.append(target) + continue + #判断是否已在执行列表 + if self.is_target_in_tasks(target): + fail_list.append(target) + continue + #raise ValueError(f"Task {test_target} already exists") + #创建任务对象--cookie参数取消 + task = TaskObject(target,"",work_type,llm_type,self.num_threads,self.local_ip,fake_target,self) + #获取task_id -- test_target,cookie_info,work_type,llm_type 入数据库 + task_id = app_DBM.start_task(target,"",work_type,llm_type,fake_target) + if task_id >0: + #2025-4-28调整批量添加任务,默认不启动 + task.init_task(task_id) + #保留task对象 + self.tasks[task_id] = task + else: + fail_list.append(target) + result = ",".join(fail_list) + return result - def over_task(self,task_id): + #开启task任务--正常只应该有web触发调用 + def start_task_TM(self,task_id): task = self.tasks[task_id] if task: - task.brun = False - #修改数据库数据 - bsuccess = app_DBM.over_task(task_id) - if bsuccess: - del self.tasks[task_id] #删除缓存 - return bsuccess,"" + with self.tasks_lock: + if self.cur_task_run_num < self.max_run_task: + #启动工作线程和进程 + bsuc,error = task.start_task() + if bsuc: + self.cur_task_run_num += 1 + return True,"启动成功" + else: + return False,error + else: + return False,f"已到达最大的启动数--{self.max_run_task}" + return False,"该任务不存在,程序逻辑存在问题!" + + #停止task任务 + def stop_task_TM(self,task_id): + task = self.tasks[task_id] + if task: + if task.brun and task.task_status ==1: #是运行状态 + with self.tasks_lock: + task.stop_task() #停止线程应该就没什么失败需要处理的 + self.cur_task_run_num -= 1 + return True,"停止任务成功" + else: + return True,"该任务已处于停止状态" + return False,"该任务不存在,程序逻辑存在问题!" + + #结束任务 + def over_task(self,task_id): + #先尝试停止 + bsuccess,_ = self.stop_task_TM(task_id) + time.sleep(1) + if bsuccess: + #再结束 + bsuccess = app_DBM.over_task(task_id) # 不管是不是修改(置2)成功,都执行结束 + del self.tasks[task_id] # 删除缓存 + return True,"结束任务成功" else: - return False,"没有找到对应的任务" + return False,"该任务不存在,程序逻辑存在问题!" + #删除任务 def del_task(self,task_id): - if g_PKM.DelData(str(task_id)): - bsuccess = app_DBM.del_task(task_id) - return bsuccess,"" - else: - return False,"删除对应文件失败" + #删除数据记录 + app_DBM.del_task(task_id) + #删除节点树 + g_PKM.DelData(str(task_id)) + return True,"删除任务成功!" - #控制task启停----线程不停 + #控制task启停----线程不停 --2025-4-28 配合批量任务,需要停止工作线程了 def control_taks(self,task_id): task = self.tasks[task_id] if task: if task.task_status == 0: # 0-暂停,1-执行中,2-已完成 - task.task_status = 1 + bsuc,error = self.start_task_TM(task_id) #任务是否存在的状态有点重复 elif task.task_status == 1: - task.task_status = 0 + bsuc,error = self.stop_task_TM(task_id) else: return False,"当前任务状态不允许修改,请联系管理员!",task.task_status else: return False,"没有找到对应的任务",None - return True,"",task.task_status + return bsuc,error,task.task_status # 获取任务list def get_task_list(self): tasks = [] for task in self.tasks.values(): - one_json = {"taskID": task.task_id, "testTarget": task.target, "taskStatus": task.task_status, "safeRank": task.safe_rank, - "workType": task.work_type} + one_json = {"taskID": task.task_id, "testTarget": task.target, "taskStatus": task.task_status, + "safeRank": task.safe_rank,"workType": task.work_type} tasks.append(one_json) rejson = {"tasks": tasks} return rejson @@ -132,10 +196,11 @@ class TaskManager: task = self.tasks[task_id] if task: if task.task_status == 0: - task.work_type = new_work_type + task.update_task_work_type(new_work_type) return True return False + #------------节点操作相关-------还未二次走查------------- #控制节点的工作状态 def node_bwork_control(self,task_id,node_path): task = self.tasks[task_id] @@ -214,23 +279,4 @@ class TaskManager: tasks = app_DBM.get_his_tasks(target_name,safe_rank,llm_type,start_time,end_time) return tasks - - #------------以下函数还未验证处理----------- - - def start_task(self, task_id): - """启动指定任务""" - task = self.tasks.get(task_id) - if task: - task.start(self.num_threads) - else: - print(f"Task {task_id} not found") - - def stop_task(self, task_id): - """停止指定任务""" - task = self.tasks.get(task_id) - if task: - task.stop() - else: - print(f"Task {task_id} not found") - g_TaskM = TaskManager() #单一实例 \ No newline at end of file diff --git a/mycode/TaskObject.py b/mycode/TaskObject.py index 9d50261..50fc6f4 100644 --- a/mycode/TaskObject.py +++ b/mycode/TaskObject.py @@ -1,7 +1,6 @@ ''' 渗透测试任务管理类 一次任务的闭合性要检查2025-3-10 一次任务后要清理LLM和InstrM的数据 ''' -from mycode.TargetManager import TargetManager # 从模块导入类 #from LLMManager import LLMManager # 同理修正其他导入 from mycode.ControlCenter import ControlCenter #控制中心替代LLM--控制中心要实现一定的基础逻辑和渗透测试树的维护。 from mycode.InstructionManager import g_instrM @@ -15,6 +14,7 @@ from myutils.ConfigManager import myCongif from mycode.WebSocketManager import g_WSM from mycode.CommandVerify import g_CV from mycode.PythonTManager import PythonTManager +from mycode.DataFilterManager import DataFilterManager import asyncio import queue import time @@ -26,12 +26,13 @@ import textwrap class TaskObject: - def __init__(self,test_target,cookie_info,work_type,llm_type,num_threads,local_ip,taskM): + def __init__(self,test_target,cookie_info,work_type,llm_type,num_threads,local_ip,fake_target,taskM,safe_rank=0): #功能类相关 self.taskM = taskM self.logger = LogHandler().get_logger("TaskObject") self.InstrM = g_instrM # 类对象渗透,要约束只读取信息,且不允许有类全局对象--持续检查 self.PythonM = PythonTManager(myCongif.get_data("Python_max_procs")) + self.DataFilter = DataFilterManager(test_target,fake_target) self.CCM = ControlCenter() #一个任务一个CCM self.LLM = LLMManager(llm_type) # LLM对象调整为一个任务一个对象,这样可以为不同的任务选择不同的LLM #全局变量 @@ -42,10 +43,10 @@ class TaskObject: self.cookie = cookie_info self.work_type = work_type #工作模式 0-人工,1-自动 self.task_id = None - self.task_status = 0 #0-暂停,1-执行中,2-已完成 + self.task_status = 0 #0-暂停,1-执行中,2-已完成 3-未启动,2025-4-27为配合批量添加任务增加“未启动”状态 self.local_ip = local_ip self.attack_tree = None #任务节点树 - self.safe_rank = 0 #安全级别 0-9 #?暂时还没实现更新逻辑 + self.safe_rank = safe_rank #安全级别 0-9 #?暂时还没实现更新逻辑 self.is_had_work = False self.is_had_work_lock = threading.Lock() @@ -88,11 +89,36 @@ class TaskObject: """截取字符串前 max_length 个字符,并添加截断标记(确保总长度不超过限制)""" if not s: return s + #一些无效信息的删除 + s = s.replace("pydev debugger: bytes arguments were passed to a new process creation function. Breakpoints may not work correctly.","") # 计算保留长度(考虑截断标记占位) truncated = s[:max_length - len(ellipsis)] if len(s) > max_length else s return truncated + ellipsis if len(s) > max_length else s + def do_instruction(self,instruction): + instruction = textwrap.dedent(instruction.strip()) + # 对多shell指令的情况进行处理--也有风险 + if "python-code" not in instruction: + if "&&" in instruction: + instruction = self.mill_instr_preprocess(instruction, "&&") + elif "||" in instruction: + instruction = self.mill_instr_preprocess(instruction, "||") + start_time = get_local_timestr() # 指令执行开始时间 + + # 2025-4-27要重新定义bool值作用,初步想法True-作为指令已处理(执行或不执行),False-作为异常,指令还没有处理-可以回Q + + if instruction.startswith("python-code"): # python代码--超过子进程数会阻塞等待,但不开始超时记时 + bsuccess, instr, reslut, source_result, ext_params = self.PythonM.execute_instruction(instruction) + else: # shell指令 + bsuccess, instr, reslut, source_result, ext_params = self.InstrM.execute_instruction(instruction) + + end_time = get_local_timestr() # 指令执行结束时间 + # 只取结果的5000长度 + reslut = self.smart_truncate(reslut, 4000) + source_result = self.smart_truncate(source_result) + return start_time,end_time,bsuccess,instr,reslut,source_result,ext_params + def do_worker_th(self,index): #线程的dbm需要一个线程一个 th_DBM = DBManager() @@ -113,23 +139,7 @@ class TaskObject: break self.doing_instr_list[th_index] = instruction - instruction = textwrap.dedent(instruction.strip()) - #对多shell指令的情况进行处理--也有风险 - if "python-code" not in instruction: - if "&&" in instruction: - instruction = self.mill_instr_preprocess(instruction, "&&") - elif "||" in instruction: - instruction = self.mill_instr_preprocess(instruction, "||") - start_time = get_local_timestr() # 指令执行开始时间 - - if instruction.startswith("python-code"):#python代码--超过子进程数会阻塞等待,但不开始超时记时 - instr, reslut, source_result, ext_params = self.PythonM.execute_instruction(instruction) - else:#shell指令 - instr, reslut, source_result, ext_params = self.InstrM.execute_instruction(instruction) - end_time = get_local_timestr() # 指令执行结束时间 - #只取结果的5000长度 - reslut = self.smart_truncate(reslut,4000) - source_result = self.smart_truncate(source_result) + start_time, end_time, bsuccess, instr, reslut, source_result, ext_params = self.do_instruction(instruction) # 入数据库 -- bres True和False 都入数据库2025-3-10---加node_path(2025-3-18)#? if th_DBM.ok: @@ -139,9 +149,10 @@ class TaskObject: ext_params, work_node.path) else: self.logger.error("数据库连接失败!!") - #暂存结果 - oneres = {'执行指令': instr, '结果': reslut} - results.append(oneres) + # 暂存结果 + fake_inst,fake_resul = self.DataFilter.filter_result(instr,reslut) + oneres = {'执行指令': fake_inst, '结果': fake_resul} + results.append(oneres) #结果入队列后,指令就不能回退 #一条指令执行完成 self.doing_instr_list[th_index] = "" #指令都执行结束后,入节点待提交队列 @@ -152,6 +163,7 @@ class TaskObject: self.put_node_reslist(work_node, str_res, llm_type) # 保存记录 g_PKM.WriteData(self.attack_tree,str(self.task_id)) + except queue.Empty: if bnode_work: bnode_work = False @@ -187,7 +199,7 @@ class TaskObject: - 节点名称:{llm_node.name} - 节点状态:未完成 - 漏洞类型:{llm_node.vul_type} - ''' +''' while True: llm_data = llm_node.get_res() if llm_data is None: @@ -196,10 +208,12 @@ class TaskObject: str_res = llm_data["result"] #获取提示词 prompt = self.get_llm_prompt(llm_type,str_res,user_Prompt) + fake_prompt = self.DataFilter.filter_prompt(prompt) #目标脱敏 + self.doing_llm_list[th_index] = prompt # 提交llm请求返回数据--并对返回数据进行处理,节点指令直接执行,测试指令根据工作模式执行 post_time = get_local_timestr() - bsuccess,node_cmds, commands,reasoning_content, content = self.LLM.get_llm_instruction(prompt,llm_node) # message要更新 + bsuccess,node_cmds, commands,reasoning_content, content = self.LLM.get_llm_instruction(fake_prompt,llm_node,self.DataFilter) # message要更新 --llm_node只使用messages,都是脱敏后的数据 if not bsuccess: self.logger.error(f"模型接口调用出错:{content}") continue #丢弃 --若需要再次尝试,把llm_data再入队列 @@ -233,7 +247,7 @@ class TaskObject: strdata = "update accack_tree!" asyncio.run(g_WSM.send_data(idatatype, strdata)) # 先取消当前task,已经通知前端重新获取,这样可以避免后端不必要的数据推送 - self.taskM.web_cur_task = 0 + #self.taskM.web_cur_task = 0 except queue.Empty: if bnode_work: bnode_work = False @@ -279,14 +293,15 @@ class TaskObject: while self.brun: try: cur_time = get_local_timestr() - print(f"-----------当前时间程序运行情况:{cur_time}") + print(f"----------{self.task_id}-当前时间程序运行情况:{cur_time}") #执行中instr-node index = 0 for w_th in self.workth_list: if not w_th.is_alive():#线程 - print(f"线程-{index}已处于异常状态,需要重新创建一个工作线程") + print(f"Work线程-{index}已处于异常状态,需要重新创建一个工作线程") else: - print(f"线程-{index}在执行指令:{self.doing_instr_list[index]}") + if self.doing_instr_list[index]: + print(f"Work线程-{index}-在执行指令:{self.doing_instr_list[index]}") index += 1 index = 0 @@ -294,7 +309,8 @@ class TaskObject: if not l_th.is_alive(): print(f"LLM线程-{index}已处于异常状态,需要重新创建一个LLM线程") else: - print(f"LLM线程-{index}在执行指令:{self.doing_llm_list[index]}") + if self.doing_llm_list[index]: + print(f"LLM线程-{index}-在执行指令:{self.doing_llm_list[index]}") index += 1 #处理点修复操作 icount +=1 @@ -485,7 +501,7 @@ class TaskObject: #更新状态 bchange = node.update_work_status(work_status) #基于websocket推送到前端 - if bchange: + if bchange and work_status != 1: #llm执行完成后会发送单独的指令更新树,所以不发送1更新节点了 #判断是否是web端最新获取数据的task if self.taskM.web_cur_task == self.task_id: idatatype = 1 @@ -681,8 +697,9 @@ class TaskObject: 2.这些节点的父节点为当前节点,请正确生成这些节点的节点路径; 3.只有当还有节点未能生成测试指令或不完整时,才返回未生成指令的节点列表。 ''' - bsuccess,node_cmds, commands, reasoning_content, content, post_time = self.LLM.get_llm_instruction(user_Prompt, - cur_node) # message要更新 + fake_prompt = self.DataFilter.filter_prompt(user_Prompt) + bsuccess,node_cmds, commands, reasoning_content, content = self.LLM.get_llm_instruction(fake_prompt, + cur_node,self.DataFilter) # message要更新 if not bsuccess: self.logger.error(f"模型接口调用出错:{content}") ierror += 1 @@ -692,6 +709,7 @@ class TaskObject: res_str = "" # LLM记录存数据库 cur_node.llm_sn += 1 + post_time = get_local_timestr() bres = DBM.insert_llm(self.task_id, user_Prompt, reasoning_content, content, post_time, cur_node.llm_sn,cur_node.path) if not bres: self.logger.error(f"{cur_node.name}-llm入库失败!") @@ -712,56 +730,99 @@ class TaskObject: self.logger.debug("未添加指令的节点,都已完成指令的添加!") return new_commands - def start_task(self,task_id,task_status=1,attack_tree=None): + #-----------------任务的启停-------------------- + def init_task(self,task_id,attack_tree = None): self.task_id = task_id - ''' - 启动该测试任务 - ''' - #判断目标合法性 - # bok,target,type = self.TargetM.validate_and_extract(self.target) #是否还需要判断待定? - # if not bok: - # return False, "{target}检测目标不合规,请检查!" - #初始化节点树 - if attack_tree:#有值的情况是load - self.attack_tree =attack_tree - #加载未完成的任务 - if self.work_type ==1:#自动模式 - #提交到mq,待线程执行 + # 初始化节点树 + if attack_tree: # 有值的情况是load + self.attack_tree = attack_tree + # 加载未完成的任务 + if self.work_type == 1: # 自动模式 + # 提交到mq,待线程执行 self.put_work_node() - else: #无值的情况是new_create - root_node = TreeNode(self.target, self.task_id) #根节点 - self.attack_tree = AttackTree(root_node) #创建测试树,同时更新根节点相关内容 - self.LLM.build_initial_prompt(root_node) #对根节点初始化system-msg - #插入一个user消息 + else: # 无值的情况是new_create + root_node = TreeNode(self.target, self.task_id) # 根节点 + self.attack_tree = AttackTree(root_node) # 创建测试树,同时更新根节点相关内容 + self.LLM.build_initial_prompt(root_node) # 对根节点初始化system-msg + # 插入一个user消息 # 提交第一个llm任务,开始工作 know_info = f"本测试主机的IP地址为:{self.local_ip}" - if self.cookie: - know_info = know_info + f",本站点的cookie值为{self.cookie}" - self.put_node_reslist(root_node,know_info,0) #入待提交list - #初始保存个attack_tree文件 - g_PKM.WriteData(self.attack_tree,str(self.task_id)) - #启动工作线程 - self.task_status = task_status - self.brun = True #线程正常启动 - #启动指令工作线程 - for i in range(self.max_thread_num): - w_th = threading.Thread(target=self.do_worker_th,args=(i,)) - w_th.start() - self.workth_list[i] = w_th - #启动llm提交线程--llm暂时单线程,多线程处理时attack_tree需要加锁 - for j in range(self.llm_max_nums): - l_th = threading.Thread(target=self.th_llm_worker,args=(j,)) - l_th.start() - self.llmth_list[j]=l_th - #启动自检线程 - self.check_th = threading.Thread(target=self.th_check) - self.check_th.start() + self.put_node_reslist(root_node, know_info, 0) # 入待提交list + # 初始保存个attack_tree文件 + g_PKM.WriteData(self.attack_tree, str(self.task_id)) + + def is_task_stop(self): + #检查任务是否处于停止状态--防止停止后,线程还没停,又启动工作线程,造成混乱 + #工作线程 + for work_th in self.workth_list: + if work_th: + if work_th.is_alive(): + self.logger.debug(f"{self.task_id}有存活工作线程") + return False + #llm线程 + for llm_th in self.llmth_list: + if llm_th: + if llm_th.is_alive(): + self.logger.debug(f"{self.task_id}有存活LLM线程") + return False + #自检线程 + if self.check_th: + if self.check_th.is_alive(): + self.logger.debug(f"{self.task_id}有存活自检线程") + return False + #工作子进程池 + if self.PythonM.is_pool_active(): + self.logger.debug(f"{self.task_id}有存活子进程池") + return False + return True + + def update_task_work_type(self,new_work_type): + self.work_type = new_work_type + #更新数据库 + app_DBM.update_task_work_type(self.task_id,new_work_type) + + def update_task_status(self,new_status): + self.task_status = new_status + #更新数据库 + app_DBM.update_task_status(self.task_id,new_status) + + def start_task(self): + ''' + 启动该测试任务, + :return bool str + ''' + if self.is_task_stop(): + #更新状态标识 + self.update_task_status(1) + self.brun = True #线程正常启动 + #启动指令工作线程 + for i in range(self.max_thread_num): + w_th = threading.Thread(target=self.do_worker_th,args=(i,),name=f"{self.task_id}-w_th-{i}") + w_th.start() + self.workth_list[i] = w_th + #启动llm提交线程--llm暂时单线程,多线程处理时attack_tree需要加锁 + for j in range(self.llm_max_nums): + l_th = threading.Thread(target=self.th_llm_worker,args=(j,),name=f"{self.task_id}-l_th-{i}") + l_th.start() + self.llmth_list[j]=l_th + #启动自检线程 + self.check_th = threading.Thread(target=self.th_check) + self.check_th.start() + #启动python子进程池 --进程池的启动,目前是没全面确认原子进程池的状态,直接None + self.PythonM.start_pool() + return True,"启动任务成功!" #就算启动了一部分线程,也要认为是成功 + else: + return False,"该任务的工作线程未全面停止,不能重新启动工作,请稍后,若半个小时候还不能启动,请联系技术支持!" def stop_task(self): #还未处理 - self.brun = False - self.InstrM.init_data() - #结束任务需要收尾处理#? - + if self.brun and self.task_status ==1: #不是正常工作状态就不执行停止工作了 + #停止子进程池 + self.PythonM.shutdown_pool() + #停止线程 + self.brun = False + self.update_task_status(0) + # 结束任务需要收尾处理#? + self.InstrM.init_data() #pass if __name__ == "__main__": pass \ No newline at end of file diff --git a/mycode/WebSocketManager.py b/mycode/WebSocketManager.py index 75d8cab..885e633 100644 --- a/mycode/WebSocketManager.py +++ b/mycode/WebSocketManager.py @@ -1,10 +1,13 @@ import json import struct +import asyncio from quart import websocket class WebSocketManager: def __init__(self): self.ws_clients={} + self.ineed_send = 0 + self.idone_send = 0 async def register(self, user_id, ws_proxy): ws = ws_proxy._get_current_object() # 获取代理背后的真实对象 @@ -41,9 +44,15 @@ class WebSocketManager: header = b"TFTF" + struct.pack("!II", idatatype, idata_len) message = header + body try: + # self.ineed_send +=1 + # print(f"第{self.ineed_send}次开始发送数据-{idatatype}") await ws.send(message) + await asyncio.sleep(0) + # await ws.ping() except Exception as e: print(f"发送失败: {e}") await self.unregister(user_id) # 异常时自动注销连接 + # self.idone_send += 1 + # print(f"WS-成功发送{self.idone_send}次数据-{idatatype}") g_WSM = WebSocketManager() \ No newline at end of file diff --git a/pipfile b/pipfile index 98beab2..7934b45 100644 --- a/pipfile +++ b/pipfile @@ -15,6 +15,7 @@ pip install psycopg2 -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install dirsearch -i https://pypi.tuna.tsinghua.edu.cn/simple/ pip install pexpect -i https://pypi.tuna.tsinghua.edu.cn/simple/ +pip install smbprotocol -i https://pypi.tuna.tsinghua.edu.cn/simple/ apt install sublist3r apt install gobuster @@ -22,6 +23,8 @@ apt install jq apt install libpq-dev python3-dev apt install sshpass +sudo apt install ncat + #smuggler git clone https://github.com/defparam/smuggler.git diff --git a/run.py b/run.py index 23a613a..e6b6a2f 100644 --- a/run.py +++ b/run.py @@ -11,6 +11,12 @@ async def run_quart_app(): config.bind = ["0.0.0.0:5001"] config.use_reloader = True # 启用热重载 config.reload_include_patterns = ["*.py", "templates/*", "static/*"] # 监控模板和静态文件 + + # Enable HTTPS + # config.certfile = "cert.pem" # Path to your certificate file + # config.keyfile = "key.pem" # Path to your private key file + # config.alpn_protocols = ["http/1.1"] + await serve(app, config) diff --git a/test.py b/test.py index 94e329f..f919f67 100644 --- a/test.py +++ b/test.py @@ -34,47 +34,27 @@ class Mytest: g_PKM.WriteData(attack_tree, attack_index) + def dynamic_fun(self): + import socket try: - # 尝试无密码连接VNC - result = subprocess.run( - ['vncviewer', '-passwd', '/dev/null', '192.168.204.137:5900', '-geometry', '1x1'], - timeout=15, capture_output=True, text=True - ) - if 'Authentication failure' in result.stderr: - # 尝试常见弱口令组合 - credentials = [ - ('admin', 'admin'), - ('root', 'root'), - ('vnc', 'vnc'), - ('user', 'password') - ] - for user, pwd in credentials: - cmd = f'vncauth {user} {pwd}' - auth_test = subprocess.run(cmd, shell=True, capture_output=True) - if auth_test.returncode == 0: - return (True, f'Valid credentials found: {user}/{pwd}') - return (False, 'No weak credentials found') - elif 'Connected' in result.stdout: - return (True, 'VNC access without authentication') - except subprocess.TimeoutExpired: - return (False, 'Connection timeout') + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(20) # 增加超时时间 + s.connect(("192.168.3.105", 52989)) + + # 基于返回的 "99 -1 45973" 字符串构造特殊payload + special_cmd = b'99\\x01\\x00\\x00\\x00' # 模拟协议头 + s.sendall(special_cmd) + + response = s.recv(2048) + s.close() + + return (True, f"SpecialCmd Response: {response.hex()}") except Exception as e: - return (False, f'Error: {str(e)}') + return (False, str(e)) def do_test(self): - import mysql.connector - - cnx = mysql.connector.connect( - host="192.168.204.137", - user="root", - password="", - ssl_disabled=True - ) - cur = cnx.cursor() - cur.execute("SHOW VARIABLES LIKE 'character_set_client'") - print(cur.fetchall()) # 应该显示 ('character_set_client', 'utf8') - cnx.close() + pass def tmp_test(self): list_a = [0,1,2,3,4,5,6,7,8,9] @@ -90,91 +70,84 @@ if __name__ == "__main__": # 示例使用 mytest = Mytest() LLM = LLMManager(1) - PythonM = PythonTManager(myCongif.get_data("Python_max_procs")) current_path = os.path.dirname(os.path.realpath(__file__)) print(current_path) test_type = 1 - task_id = 16 - task_Object = TaskObject("test_target","cookie_info",1,1,1,"local_ip",None) + task_id = 49 + task_Object = TaskObject("test_target","cookie_info",1,1,1,"local_ip","",None) if test_type == 0: mytest.dynamic_fun() elif test_type == 1: # # 获取所有自定义函数详情 HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen'] - str_instr = '''python-code - -import ssl -from socket import create_connection - + instruction = '''python-code def dynamic_fun(): + import socket + try: - # 强制使用CBC模式弱加密套件 - context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - context.set_ciphers('AES128-SHA') - - # 构造异常填充测试数据 - sock = create_connection(('58.216.217.70', 443)) - ssock = context.wrap_socket(sock, server_hostname='58.216.217.70') - - # 发送包含异常填充的测试请求 - ssock.send(b"GET / HTTP/1.1\\r\\nHost: 58.216.217.70\\r\\n" - b"Cookie: test=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\\r\\n\\r\\n") - response = ssock.recv(2048) - - # 检测异常响应模式 - if b"HTTP/1.1 200 OK" in response: - return (True, "服务器接受异常填充数据") - return (False, "未检测到典型漏洞特征") - - except ssl.SSLError as e: - return (False, f"加密错误: {repr(e)}") - except Exception as e: - return (False, f"验证失败: {str(e)}") -''' - #str_instr = str_instr.strip() + " --max-time 10" - dedented_code = textwrap.dedent(str_instr.strip()) - #对多shell指令的情况进行处理--也有风险 - if "python-code" not in dedented_code: - if "&&" in dedented_code: - dedented_code = task_Object.mill_instr_preprocess(dedented_code, "&&") - elif "||" in dedented_code: - dedented_code = task_Object.mill_instr_preprocess(dedented_code, "||") - instr, reslut, source_result, ext_params = g_instrM.execute_instruction(dedented_code) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(20) # 设置超时时间为20秒 + s.connect(("192.168.3.105", 11200)) + + # 发送畸形RTSP请求探测边界条件 + payload = "DESCRIBE rtsp://192.168.3.105/../../../../etc/passwd RTSP/1.0\\\\r\\\\n" + payload += "CSeq: 6\\\\r\\\\n\\\\r\\\\n" + + s.send(payload.encode()) + response = s.recv(4096).decode() + + s.close() + + if "404" in response: + return (False, "存在输入过滤机制") + elif "root:" in response: + return (True, "成功读取敏感文件") else: - instr, reslut, source_result, ext_params = PythonM.execute_instruction(dedented_code) + return (False, f"未知响应:{response}") - # 只取结果的5000长度 - reslut = task_Object.smart_truncate(reslut) + except Exception as e: + return (False, f"连接异常:{str(e)}") +''' + task_Object.PythonM.start_pool() #开个子进程池就行 + start_time, end_time, bsuccess, instr, reslut, source_result, ext_params = task_Object.do_instruction(instruction) + # 暂存结果 oneres = {'执行指令': instr, '结果': reslut} print("----执行结果----") print(reslut) elif test_type == 2: #给节点添加指令 + node_path = "目标系统->192.168.3.105->52989端口" + instr_id = 3233 g_TaskM.load_tasks() task = g_TaskM.tasks[task_id] nodes = task.attack_tree.traverse_dfs() - cur_node = nodes[78] - commands = [ - ] - for cmd in commands: - cur_node.add_instr(cmd) + cur_node = None + for node in nodes: + if node.path == node_path: + cur_node = node + break + if cur_node: + str_instr = app_DBM.get_one_instr(instr_id) + if "import" in str_instr: + str_instr = "python-code " + str_instr + cur_node.test_add_instr(str_instr) cur_node.update_work_status(1) - #保存数据 - g_PKM.WriteData(task.attack_tree,str(task.task_id)) + #保存数据 + g_PKM.WriteData(task.attack_tree,str(task.task_id)) + else: + print("没找到节点!") elif test_type ==3: #测试指令入节点 strinstr = ''' -) + ''' strNodes = "执行系统命令探测,权限提升尝试,横向移动测试" nodes = strNodes.split(', ') unique_names = list(set(nodes)) # 去重 for node_name in unique_names: print(node_name) - elif test_type == 4: # 修改Messages attact_tree = g_PKM.ReadData("27") # 创建一个新的节点 from mycode.AttackMap import TreeNode - testnode = TreeNode("test", 0) LLM.build_initial_prompt(testnode) # 新的Message systems = testnode.parent_messages[0]["content"] @@ -186,7 +159,7 @@ def dynamic_fun(): g_PKM.WriteData(attact_tree, "27") print("完成Messgae更新") elif test_type ==5: - mytest.do_test() + mytest.dynamic_fun() elif test_type == 6: mytest.tmp_test() else: diff --git a/tools/CurlTool.py b/tools/CurlTool.py index 5a2a03b..ea47632 100644 --- a/tools/CurlTool.py +++ b/tools/CurlTool.py @@ -14,7 +14,7 @@ class CurlTool(ToolBase): # self.verify_ssl = True def get_time_out(self): - return 60 + return 61*2 def validate_instruction(self, instruction_old): #instruction = instruction_old @@ -97,7 +97,7 @@ class CurlTool(ToolBase): return error except Exception as e: - return (False, f"命令执行失败: {str(e)}") + return (f"命令执行失败: {str(e)}") def execute_instruction(self, instruction_old): ext_params = self.create_extparams() diff --git a/tools/EchoTool.py b/tools/EchoTool.py index 82570fb..569bd76 100644 --- a/tools/EchoTool.py +++ b/tools/EchoTool.py @@ -1,13 +1,49 @@ from tools.ToolBase import ToolBase +import pexpect class EchoTool(ToolBase): def validate_instruction(self, instruction): #指令过滤 - timeout = 0 - if " nc " in instruction: - timeout = 60 + timeout = 60*5 return instruction,timeout + def do_worker_pexpect(self, str_instruction, timeout, ext_params): + try: + result = "" + exc_do = pexpect.spawn('bash', ['-c', str_instruction], timeout=timeout, + encoding='utf-8') # spawn 第一个参数是可执行文件 + index = exc_do.expect([ + pexpect.TIMEOUT, + pexpect.EOF + ]) + result += str(exc_do.before) + if index == 0: + result += f"\n执行超时{timeout}秒" + elif index == 1: + pass + else: + print("遇到其他输出!") + pass + return result + except Exception as e: + return f"执行错误: {str(e)}" + + def execute_instruction(self, instruction_old): + ext_params = self.create_extparams() + # 第一步:验证指令合法性 + instruction,time_out = self.validate_instruction(instruction_old) + if not instruction: + return False, instruction_old, "该指令暂不执行!","",ext_params + # 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#? + + # 第二步:执行指令---需要对ftp指令进行区分判断 + output = self.do_worker_pexpect(instruction, time_out, ext_params) + + # 第三步:分析执行结果 + analysis = self.analyze_result(output,instruction,"","") + + return True, instruction, analysis,output,ext_params + def analyze_result(self, result,instruction,stderr,stdout): #指令结果分析 if "GET / HTTP/1.1" in result and "X-Original-URL: /proc/self/environ" in result: diff --git a/tools/ToolBase.py b/tools/ToolBase.py index 923565d..bd4c6c8 100644 --- a/tools/ToolBase.py +++ b/tools/ToolBase.py @@ -152,7 +152,7 @@ class ToolBase(abc.ABC): stderr = "" try: if timeout == 0: - result = subprocess.run(instruction, shell=True, capture_output=True, text=True) + result = subprocess.run(instruction, shell=True, capture_output=True, text=True,timeout=60*30) elif timeout >0: result = subprocess.run(instruction, shell=True, capture_output=True, text=True, timeout=timeout) else: diff --git a/web/API/task.py b/web/API/task.py index efbb73d..94342c9 100644 --- a/web/API/task.py +++ b/web/API/task.py @@ -19,27 +19,24 @@ def is_valid_target(test_target: str) -> bool: @login_required async def start_task(): #开始任务 data = await request.get_json() - test_target = data.get("testTarget") - cookie_info = data.get("cookieInfo") + test_target = data.get("testTarget") #调整为多目标 llm_type = data.get("curmodel") # //0-腾讯云,1-DS,2-2233.ai,3-GPT 目前只有1-2,2025-4-4 - work_type = data.get("workType") #0-人工,1-自动 + work_type = 0 #data.get("workType") #0-人工,1-自动 if llm_type == 2: return jsonify({"error": "O3余额不足,请更换模型!"}), 400 - #新增任务处理 - bok,_,_ = g_TM.validate_and_extract(test_target) - if not bok: - # 返回错误信息,状态码 400 表示请求错误 - return jsonify({"error": "测试目标验证失败,请检查输入内容!"}), 400 + # #新增任务处理 + # bok,_,_ = g_TM.validate_and_extract(test_target) + # if not bok: + # # 返回错误信息,状态码 400 表示请求错误 + # return jsonify({"error": "测试目标验证失败,请检查输入内容!"}), 400 #开始任务 try: - b_success = g_TaskM.create_task(test_target,cookie_info,llm_type,work_type) - #再启动 - if not b_success: - return jsonify({"error": "检测任务创建失败,请联系管理员!"}), 500 + fail_list = g_TaskM.create_task(test_target,llm_type,work_type) + return jsonify({"fail_list":fail_list}) except: - return jsonify({"error": "该目标已经在测试中,请检查!"}), 400 + return jsonify({"error": "创建任务异常,前反馈给技术人员!"}), 400 #跳转到任务管理页面 - return redirect(url_for('main.get_html', html='task_manager.html')) + # return redirect(url_for('main.get_html', html='task_manager.html')) @api.route('/task/taskover',methods=['POST']) @login_required @@ -61,7 +58,6 @@ async def del_task(): bsuccess,error = g_TaskM.del_task(task_id) return jsonify({"bsuccess": bsuccess, "error": error}) - @api.route('/task/getlist',methods=['GET']) @login_required async def get_task_list(): @@ -160,6 +156,7 @@ async def node_one_step(): @api.route('/task/taskworktype',methods=['POST']) @login_required async def task_work_type_control(): + return jsonify({'error': '开发阶段不允许修改测试模式'}), 400 data = await request.get_json() task_id = data.get("cur_task_id") newwork_type = data.get("mode") diff --git a/web/API/wsm.py b/web/API/wsm.py index 73551c1..036343c 100644 --- a/web/API/wsm.py +++ b/web/API/wsm.py @@ -1,19 +1,13 @@ import json +import socket from . import api from quart import Quart, websocket, jsonify from mycode.WebSocketManager import g_WSM -from web.common.utils import login_required +from web.common.utils import login_required # WebSocket 路由,端口默认与 HTTP 同端口,例如 5000(开发时) @api.websocket("/ws") -@login_required async def ws(): - """ - WebSocket 连接入口: - 1. 客户端连接成功后,首先应发送登录数据包,例如 {"user_id": 1} - 2. 后端解析登录数据包,将 user_id 与 websocket 绑定(注册) - 3. 后续进入消息接收循环,根据数据协议(TFTF+头+体格式)处理数据 - """ # 接收登录数据包(假设为纯 JSON 包,非二进制格式) login_msg = await websocket.receive() try: diff --git a/web/__init__.py b/web/__init__.py index a3fa12d..6dc727a 100644 --- a/web/__init__.py +++ b/web/__init__.py @@ -5,13 +5,9 @@ from quart_cors import cors from pymemcache.client import base from .main import main from .API import api +from quart import redirect, request from functools import wraps from myutils.ConfigManager import myCongif -# from quart_sqlalchemy import SQLAlchemy -# from flask_migrate import Migrate - -#app.config['SECRET_KEY'] = 'mysecret' #密钥 --需要放配置文件 -#socketio = SocketIO(app) # Create the custom backend for quart-session class MemcachedSessionInterface: #只是能用,不明所以 @@ -35,8 +31,6 @@ def create_app(): app = Quart(__name__) app.config['SECRET_KEY'] = 'zfxxkj_2024_!@#' app.config["TEMPLATES_AUTO_RELOAD"] = True #动态加载模板文件 - #app.config['SESSION_FILE_DIR'] = './sessions' # session保存路径 - #app.config['SESSION_MEMCACHED'] = base.Client(('localhost', 11211)) app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。 app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密 app.config['SESSION_TYPE'] = 'redis' # session类型 diff --git a/web/common/utils.py b/web/common/utils.py index 96103a5..b18a4ac 100644 --- a/web/common/utils.py +++ b/web/common/utils.py @@ -3,7 +3,7 @@ import random import string import io from functools import wraps -from quart import session, redirect, url_for, flash,current_app +from quart import session, redirect, url_for, flash,current_app,request def generate_captcha(): characters = string.ascii_uppercase + string.digits @@ -33,6 +33,15 @@ def generate_captcha(): def verify_captcha(user_input, actual_captcha): return user_input == actual_captcha +def require_https(f): + @wraps(f) + async def decorated_function(*args, **kwargs): + if request.scheme == "http": + https_url = request.url.replace("http://", "https://", 1) + return redirect(https_url, code=301) + return await f(*args, **kwargs) + return decorated_function + def login_required(f): @wraps(f) @@ -42,6 +51,7 @@ def login_required(f): if not username or not token: await flash('未登录,请重新登录', 'error') return redirect(url_for('main.login')) + #从redis取最新的token redis_key = f"user_token:{username}" server_token = await current_app.redis.get(redis_key) diff --git a/web/main/static/resources/scripts/task_modal.js b/web/main/static/resources/scripts/his_task_modal.js similarity index 82% rename from web/main/static/resources/scripts/task_modal.js rename to web/main/static/resources/scripts/his_task_modal.js index 105b517..28e01b3 100644 --- a/web/main/static/resources/scripts/task_modal.js +++ b/web/main/static/resources/scripts/his_task_modal.js @@ -11,10 +11,11 @@ * "node_bwork":node.bwork, * "node_vultype":node.vul_type, * "node_vulgrade":node.vul_grade, + * "node_vulinfo":node.vul_info * children: [ { ... }, { ... } ] * } */ - function generateTreeHTML(nodeData) { + function his_generateTreeHTML(nodeData) { const li = document.createElement("li"); const nodeSpan = document.createElement("span"); nodeSpan.className = "tree-node"; @@ -25,6 +26,7 @@ nodeSpan.setAttribute("data-node_bwork", nodeData.node_bwork); nodeSpan.setAttribute("data-node_vultype", nodeData.node_vultype); nodeSpan.setAttribute("data-node_vulgrade", nodeData.node_vulgrade || ""); + nodeSpan.setAttribute("data-node_vulinfo", nodeData.node_vulinfo); nodeSpan.setAttribute("data-node_workstatus",nodeData.node_workstatus); if(nodeData.node_workstatus ===0){ nodeSpan.classList.add("no-work"); @@ -63,7 +65,7 @@ if (nodeData.children && nodeData.children.length > 0) { const ul = document.createElement("ul"); nodeData.children.forEach((child) => { - ul.appendChild(generateTreeHTML(child)); + ul.appendChild(his_generateTreeHTML(child)); }); li.appendChild(ul); } @@ -89,6 +91,7 @@ const nodebwork = el.getAttribute("data-node_bwork"); const vulType = el.getAttribute("data-node_vultype"); const vulLevel = el.getAttribute("data-node_vulgrade"); + const vulInfo = el.getAttribute("data-node_vulinfo"); const workstatus = el.getAttribute("data-node_workstatus"); //selectedNodeData = { nodeName, status, vulType, vulLevel,nodepath,nodebwork }; // 示例中默认填充 @@ -99,10 +102,11 @@ node_bwork: nodebwork, vul_type: vulType, vul_grade: vulLevel || "-", + vul_info:vulInfo, workstatus: workstatus }; //刷新界面内容 - update_select_node_data_show(nodeName,status,vulType,vulLevel,workstatus,nodebwork) + update_select_node_data_show(nodeName,status,vulType,vulLevel,vulInfo,workstatus,nodebwork) }); // 双击事件:展开/收缩子节点区域 el.addEventListener("dblclick", (event) => { @@ -126,11 +130,11 @@ } // 动态加载节点树数据 - async function loadNodeTree(task_id) { + async function his_loadNodeTree(task_id) { // 清空选中状态 selectedNodeData = null; //刷新界面内容 - update_select_node_data_show("-","-","-","-","-",false) + update_select_node_data_show("-","-","-","-","-","-",false) try { const res = await fetch("/api/task/gethistree", { method: "POST", @@ -152,7 +156,7 @@ // 创建一个