You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

313 lines
18 KiB

'''
实现对大模型调用的封装,隔离具体使用的LLM
pip install openai
export OPENAI_API_KEY="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"
'''
import openai
import json
import re
import os
from openai import OpenAI
from openai import OpenAIError, APIConnectionError, APITimeoutError
from myutils.ConfigManager import myCongif
from myutils.MyTime import get_local_timestr
from myutils.MyLogger_logger import LogHandler
from myutils.ContentManager import ContentManager
from mycode.CommandVerify import g_CV
class LLMManager:
def __init__(self,illm_type):
self.logger = LogHandler().get_logger("LLMManager")
self.api_key = None
self.api_url = None
self.ContM = ContentManager()
#temperature设置
if illm_type == 0: #腾讯云
self.api_key = "fGBYaQLHykBOQsFwVrQdIFTsYr8YDtDVDQWFU41mFsmvfNPc"
self.api_url = ""
elif illm_type == 1: #DS
self.api_key ="sk-10360148b465424288218f02c87b0e1b"
self.api_url ="https://api.deepseek.com/v1"
self.model = "deepseek-reasoner" #model=deepseek-reasoner -- R1 model=deepseek-chat --V3
elif illm_type == 2: #2233.ai
self.api_key = "sk-J3562ad9aece8fd2855bb495bfa1a852a4e8de8a2a1IOchD"
self.api_url = "https://api.gptsapi.net/v1"
self.model = "o3-mini-2025-01-31"
elif illm_type ==3: #GPT
# 定义代理服务器地址
proxy_url = "http://192.168.3.102:3128"
os.environ["HTTP_PROXY"] = proxy_url
os.environ["HTTPS_PROXY"] = proxy_url
self.api_key ="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"
self.api_url = "https://api.openai.com/v1"
self.model = "o3-mini-2025-01-31"
openai.proxy = proxy_url
openai.api_key = self.api_key
elif illm_type ==4:#通义Qwen3
self.api_key ="sk-48028b85e7604838b5be5bf6a90222cb"
self.api_url ="https://dashscope.aliyuncs.com/compatible-mode/v1"
self.model = "qwen3-235b-a22b"
else:
self.logger.error("模型参数选择异常!")
return
# 创建会话对象 -- 一个任务的LLM必须唯一
self.client = OpenAI(api_key=self.api_key, base_url=self.api_url)
'''
**决策原则**
- 根据节点类型和状态,优先执行基础测试(如端口扫描、服务扫描)。
- 仅在发现新信息或漏洞时新增子节点。
- 确保每个新增节点匹配测试指令。
'''
# 初始化messages
def build_initial_prompt(self,node,itype):
if not node:
return
#根节点初始化message----后续有可能需要为每个LLM生成不同的system msg
node.parent_messages = [{"role": "system",
"content":'''
你是一位资深的渗透测试专家,现在由你来指导针对一个目标的渗透测试工作,需要生成具体的指令交给本地程序执行,再根据本地程序提交的执行结果,规划下一步指令,直至全面完成渗透测试。
**总体要求**
1.以测试目标为根节点,以测试点作为子节点的形式来规划整个渗透测试方案;
2.测试点的规划需要基于执行结果:是测试目标涉及的,且是完整的,具体为:a.完成信息收集,根据信息收集到的内容,所有可能存在中高风险的测试点;b.漏洞验证成功,还能进一步利用的测试点;
3.新增测试点的约束:只有当当前节点提交了所有测试指令的执行结果,且没有新的测试指令需要验证时,再统一判断是否需要新增子节点进一步进行验证测试,若没有,则结束该路径的验证;
4.若一次性新增的子节点过多,无法为每个节点都匹配测试指令,请优先保障新增测试节点的全面;
5.生成的指令有两类:节点指令和测试指令,指令之间必须以空行间隔,不能包含注释和说明;
6.若无节点操作,节点指令可以不生成,若当前节点已完成测试,测试指令可以不生成;
7.只有当漏洞验证成功后,才能生成漏洞验证成功的指令,避免误报。
**节点指令格式**
- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\"};
- 漏洞验证成功:{\"action\": \"find_vul\", \"node\": \"节点\",\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}};
- 节点完成测试:{\"action\": \"end_work\", \"node\": \"节点\"};
**测试指令格式**
- dash指令:```dash-[节点路径]指令内容```包裹,若涉及到多步指令,请生成python指令;
- python指令:```python-[节点路径]指令内容```包裹,主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(status, output);
- [节点路径]为从根节点到目标节点的完整层级路径;
**测试指令生成准则**
1.可以是dash指令,或python指令,必须按格式要求生成;
2.必须对应已有节点,或同时生成对应新增节点指令;
3.优先使用覆盖面广成功率高的指令;不能同时生成重复或作用覆盖的指令;
4.若需要多条指令配合测试,请生成对应的python指令,完成闭环返回;
5.避免用户交互,必须要能返回,返回的结果需要能利于你规划下一步指令。
**核心要求**
- 指令之间必须要有一个空行;
- 需确保测试指令的节点路径和指令的目标节点一致,例如:针对子节点的测试指令,节点路径不能指向当前节点;
**响应示例**
{\"action\":\"add_node\", \"parent\": \"192.168.1.100\", \"nodes\": \"3306端口,22端口\"}
```dash-[目标系统->192.168.1.100->3306端口]
mysql -u root -p 192.168.1.100
```
'''}] # 一个messages
#---2025-5-15分阶段方案---
def build_init_info_prompt(self,node):
if not node:
return
# 根节点初始化message----后续有可能需要为每个LLM生成不同的system msg
node.parent_messages = [{"role": "system","content": '''
你是一位资深的渗透测试专家,现在由你来指导针对一个目标的渗透测试工作,需要生成具体的指令交给本地程序执行,再根据本地程序提交的执行结果,规划下一步指令,直至全面完成渗透测试。当前为信息收集阶段。
**总体要求**
1.请针对测试目标生成信息收集的测试指令,不能包含注释和说明,严格遵守格式要求,不要有无关的前缀后缀等内容;
2.每条指令必须有独特且明确的目的,避免功能重复或范围重叠,可以是dash指令或python指令;
3.收集的信息包括:域名的相关信息,IP地址信息,对应端口的相关信息等,参考**JSON结果格式**中的内容,不需要额外采集其它信息;
4.若目标是URL,除获取域名相关信息外,还需要获取域名指向的IP,再进一步解析获取这些IP的Por信息;
5.若目标是IP,则无需尝试获取对应的URL信息,只需解析获取该IP的Port信息;
6.完成URL和IP端口信息的收集后,请按照格式要求封装结果信息,并检查无遗漏无重复后,再返回;
7.在信息收集阶段,节点路径都为当前节点,包括测试目标是域名,需要对测试目标指向的IP进行信息收集时,IP节点由本地程序在渗透测试阶段统一创建。
**响应示例**
- [{dash指令},{python指令}]
- [{JSON结果}]
**测试指令格式**
- dash指令:{\"action\":\"dash\",\"path\":\"节点路径\",\"content\":\"指令内容\"}
- python指令:{\"action\":\"python\",\"path\":\"节点路径\",\"content\":\"指令内容\"}
* python主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(status, output)
- [节点路径]为从根节点到目标节点的完整层级路径,以"->"关联,如:目标系统->192.168.1.100
**JSON结果格式**
- JSON结果:{\"action\":\"asset\",\"URL\":{URL信息json},\"IPS\":[\"IP\":\"192.168.1.100\",\"IPtype\":\"IPv4/IPv6\",\"Ports\":[端口信息json]]}
* 端口信息json:{\"Port\":\"端口号\",\"Service\":\"服务名称\",\"Version\":\"版本号\",\"Protocol\":\"TCP/UDP\",\"Status\":\"open/closed/filtered\"}
* URL信息json:{\"Domain\":\"域名\",\"Subdomains\":\"[子域名1,子域名2]\",\"Registrant\":\"注册人\",\"Email\":\"注册邮箱\",\"Registrar\":\"注册商\",\"Creation_date\":\"创建日期\",\"Expiration_date\":\"到期日期\"}
- 若URL无效,JSON中URL字段置空,若无子域名,Subdomains置空数组,若无端口信息,Ports字段返回空数组
**核心要求**
- 返回内容必须严格遵守响应示例,不允许有其他内容或说明
- 测试指令和JSON结果必须严格遵守对应格式要求
'''}] # 一个messages
def build_init_attact_prompt(self,node):
if not node:
return
# 根节点初始化message----后续有可能需要为每个LLM生成不同的system msg
node.parent_messages = [{"role": "system","content": '''
你是一位资深的渗透测试专家,现在由你来指导针对一个目标的渗透测试工作,需要生成具体的指令交给本地程序执行,再根据本地程序提交的执行结果,规划下一步指令,直至全面完成渗透测试。
**总体要求**
1.以测试目标为根节点,以测试点作为子节点的形式来规划整个渗透测试方案;
2.测试点的规划需要基于执行结果:是测试目标涉及的,且是完整的,具体为:a.完成信息收集,根据信息收集到的内容,所有可能存在中高风险的测试点;b.漏洞验证成功,还能进一步利用的测试点;
3.新增测试点的约束:只有当当前节点提交了所有测试指令的执行结果,且没有新的测试指令需要验证时,再统一判断是否需要新增子节点进一步进行验证测试,若没有,则结束该路径的验证;
4.若一次性新增的子节点过多,无法为每个节点都匹配测试指令,请优先保障新增测试节点的完整;
5.生成的指令有两类:节点指令和测试指令,不能包含注释和说明,严格遵守格式要求,不要有无关的前缀后缀等内容;
6.若无节点操作,节点指令可以不生成,若当前节点已完成测试,测试指令可以不生成;
7.只有当漏洞验证成功后,才能生成漏洞验证成功的指令,避免误报。
**响应示例**
- [{节点指令},{测试指令}]
**节点指令格式**
- 新增节点:{\"action\":\"add_node\", \"parent\": \"父节点\", \"nodes\": \"节点1,节点2\"};
- 漏洞验证成功:{\"action\": \"find_vul\", \"node\": \"节点\",\"vulnerability\": {\"name\":\"漏洞名称\",\"risk\":\"风险等级(低危/中危/高危)\",\"info\":\"补充信息(没有可为空)\"}};
- 节点完成测试:{\"action\": \"end_work\", \"node\": \"节点\"};
**测试指令格式**
- dash指令:{\"action\":\"dash\",\"path\":\"节点路径\",\"content\":\"指令内容\"}
- python指令:{\"action\":\"python\",\"path\":\"节点路径\",\"content\":\"指令内容\"}
* python主函数名为dynamic_fun,需包含错误处理,必须返回一个tuple(bool, output)
- [节点路径]为从根节点到目标节点的完整层级路径,以"->"关联,如:目标系统->192.168.1.100
**测试指令生成准则**
1.可以是dash指令,或python指令,必须按格式要求生成;
2.必须对应已有节点,或同时生成对应新增节点指令;
3.必须有独特且明确的目的,避免功能重复或范围重叠,优先使用覆盖面广成功率高的指令;
4.若需要多条指令配合测试,请生成对应的python指令,完成闭环返回;
5.避免用户交互,必须要能返回,返回的结果需要能利于你规划下一步指令。
**核心要求**
- 返回内容必须严格遵守响应示例,不允许有其他内容或说明;
- 节点指令和测试指令必须严格遵守对应的格式要求;
- 需确保测试指令的节点路径和指令的目标节点一致,例如:针对子节点的测试指令,节点路径不能指向当前节点;
'''}] # 一个messages
# 调用LLM生成指令
def get_llm_instruction(self,sendmessage):
'''
:sendmessage :发送的message
:return: iresult,reasoning_content,content,error
'''
#提交LLM
#准备请求参数
params = {
"model": self.model,
"messages": sendmessage,
}
# 某些模型额外的参数
stream = False
if self.model == "o3-mini-2025-01-31":
params["reasoning_effort"] = "high"
elif self.model == "qwen3-235b-a22b":
stream = True
params["stream"] = stream
params["extra_body"] = {"enable_thinking": True,"thinking_budget": 3000}
try:
# 调用 API
response = self.client.chat.completions.create(**params)
except APITimeoutError:
self.logger.error("LLM API 请求超时")
return -1,"","", f"调用超时(model={self.model})"
except APIConnectionError as e:
self.logger.error(f"网络连接错误: {e}")
return -1,"", "", "网络连接错误"
except OpenAIError as e:
# 包括 400/401/403/500 等各种 API 错误
self.logger.error(f"LLM API 错误: {e}")
return -1,"", "", f"API错误: {e}"
except Exception as e:
# 兜底,防止意外
self.logger.exception("调用 LLM 时出现未预期异常")
return -1,"", "", f"未知错误: {e}"
reasoning_content = ""
content = ""
if stream: #流式模式
is_answering = False
for chunk in response:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
if hasattr(delta, "reasoning_content") and delta.reasoning_content is not None:
reasoning_content += delta.reasoning_content
# 收到content,开始进行回复
if hasattr(delta, "content") and delta.content:
if not is_answering:
is_answering = True
content += delta.content
else:
#LLM返回结果处理
choice = response.choices[0].message
#LLM返回处理
if self.model == "deepseek-reasoner":
reasoning_content = getattr(choice, "reasoning_content", "")
content = choice.content
elif self.model == "o3-mini-2025-01-31" or self.model == "qwen-max-latest":
content = choice.content
else:
self.logger.error("处理到未预设的模型!")
content = choice.content
return 1,reasoning_content,content,""
def node_cmd_repair(self,part):
'''
对节点指令的合法性修复
:param part:
:return:
'''
#遇到漏洞赋值的节点指令缺少一个大括号,目前策略自动补全
# {"action":"find_vul", "node": "8180端口-Tomcat","vulnerability": {"name":"Tomcat弱口令漏洞","risk":"高危","info":"默认凭证tomcat:tomcat可访问管理控制台"}
whole = self.ContM.extract_json(part)
if not whole: #加一次就应该很少见了,不补充多次,也暂时只针对}
part += "}"
return part
def fetch_instruction(self,response_text):
'''
*****该函数很重要,需要一定的容错能力,解析LLM返回内容*****
处理边界:只格式化分析LLM返回内容,指令和节点操作等交其他模块。
节点控制指令
渗透测试指令
提取命令列表,包括:
1. Python 代码块 python[](.*?)
2. Shell 命令``dash[](.*?)```
:param text: 输入文本
:return: node_cmds,python_blocks,shell_blocks
'''
node_cmds = []
commands = []
# 序列化为JSON
try:
json_datas = json.loads(response_text)
except json.JSONDecodeError as e:
self.logger.debug(f"返回的内容转化成JSON失败,不符合json格式--{response_text}")
#反馈给LLM,重新返回内容
return False,node_cmds,commands,f"返回的内容json.loads失败,{str(e)}"
#对返回内容的json关键字验证
bok, strerror = g_CV.verify_node_cmds(json_datas)
if not bok:
return False, node_cmds, commands, strerror
#通过合法性校验--进行后续处理--拆分node_cmd 和 commands
for part in json_datas:
action = part["action"]
if action == "dash" or action == "python":
commands.append(part)
else:
node_cmds.append(part)
return True,node_cmds,commands,""
def test_llm(self):
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "讲个笑话吧。"}
]
response = self.client.chat.completions.create(
model=self.model,
reasoning_effort="medium",
messages=messages
)
print(response)
if __name__ == "__main__":
llm = LLMManager(3)
strcontent = '''
```dash-[目标系统->192.168.3.107]nmap -sV -Pn -p- 192.168.3.107```
```dash-[目标系统->192.168.3.107]dig +short -x 192.168.3.107```
'''
node_cmds, commands = llm.fetch_instruction(strcontent)
print(node_cmds)
print(commands)