You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

155 lines
6.4 KiB

import re
import subprocess
import tempfile
import os
import pexpect
import struct
import sys
import mysql.connector
import requests
from mycode.LLMManager import LLMManager
from mycode.TaskObject import TaskObject
from myutils.PickleManager import g_PKM
from mycode.InstructionManager import g_instrM
from mycode.TaskManager import g_TaskM
from mycode.PythonTManager import PythonTManager
from myutils.ConfigManager import myCongif
from mycode.DBManager import app_DBM
import textwrap
class Mytest:
def update_node_inter(self,attack_index):
attack_tree = g_PKM.ReadData(attack_index)
nodes = attack_tree.traverse_dfs()
# 06-0=>p
instr = nodes[6].get_instr_user().pop(0)
nodes[6].parent.get_instr_user().append(instr)
# 39-1
instr = nodes[39].get_instr_user().pop(1)
nodes[39].parent.get_instr_user().append(instr)
# 49-0
instr = nodes[49].get_instr_user().pop(0)
nodes[49].parent.get_instr_user().append(instr)
g_PKM.WriteData(attack_tree, attack_index)
def dynamic_fun(self):
try:
# 尝试无密码连接VNC
result = subprocess.run(
['vncviewer', '-passwd', '/dev/null', '192.168.204.137:5900', '-geometry', '1x1'],
timeout=15, capture_output=True, text=True
)
if 'Authentication failure' in result.stderr:
# 尝试常见弱口令组合
credentials = [
('admin', 'admin'),
('root', 'root'),
('vnc', 'vnc'),
('user', 'password')
]
for user, pwd in credentials:
cmd = f'vncauth {user} {pwd}'
auth_test = subprocess.run(cmd, shell=True, capture_output=True)
if auth_test.returncode == 0:
return (True, f'Valid credentials found: {user}/{pwd}')
return (False, 'No weak credentials found')
elif 'Connected' in result.stdout:
return (True, 'VNC access without authentication')
except subprocess.TimeoutExpired:
return (False, 'Connection timeout')
except Exception as e:
return (False, f'Error: {str(e)}')
def do_test(self):
import mysql.connector
cnx = mysql.connector.connect(
host="192.168.204.137",
user="root",
password="",
ssl_disabled=True
)
cur = cnx.cursor()
cur.execute("SHOW VARIABLES LIKE 'character_set_client'")
print(cur.fetchall()) # 应该显示 ('character_set_client', 'utf8')
cnx.close()
if __name__ == "__main__":
# 示例使用
mytest = Mytest()
LLM = LLMManager(1)
PythonM = PythonTManager(myCongif.get_data("Python_max_procs"))
current_path = os.path.dirname(os.path.realpath(__file__))
print(current_path)
test_type = 1
task_id = 16
task_Object = TaskObject("test_target","cookie_info",1,1,1,"local_ip",None)
if test_type == 0:
mytest.dynamic_fun()
elif test_type == 1:
# # 获取所有自定义函数详情 HIGH_RISK_FUNCTIONS = ['eval', 'exec', 'os.system', 'subprocess.call', 'subprocess.Popen']
str_instr = '''
hydra -L /mnt/zfsafe/tools/../payload/users -P /mnt/zfsafe/tools/../payload/passwords -t 6 -f -I -s 5900 -e ns 192.168.204.137 vnc -o hydra_result.txt
'''
#str_instr = str_instr.strip() + " --max-time 10"
dedented_code = textwrap.dedent(str_instr.strip())
#对多shell指令的情况进行处理--也有风险
if "python-code" not in dedented_code:
if "&&" in dedented_code:
dedented_code = task_Object.mill_instr_preprocess(dedented_code, "&&")
elif "||" in dedented_code:
dedented_code = task_Object.mill_instr_preprocess(dedented_code, "||")
instr, reslut, source_result, ext_params = g_instrM.execute_instruction(dedented_code)
else:
instr, reslut, source_result, ext_params = PythonM.execute_instruction(dedented_code)
# 只取结果的5000长度
reslut = task_Object.smart_truncate(reslut)
oneres = {'执行指令': instr, '结果': reslut}
print("----执行结果----")
print(reslut)
elif test_type == 2: #给节点添加指令
g_TaskM.load_tasks()
task = g_TaskM.tasks[task_id]
nodes = task.attack_tree.traverse_dfs()
cur_node = nodes[78]
commands = [
]
for cmd in commands:
cur_node.add_instr(cmd)
cur_node.update_work_status(1)
#保存数据
g_PKM.WriteData(task.attack_tree,str(task.task_id))
elif test_type ==3: #测试指令入节点
strinstr = '''
```python-[目标系统->192.168.204.137->端口扫描->80-HTTP服务检测->目录扫描->phpMyAdmin访问测试->默认凭证登录尝试->常用凭证爆破->字典暴力破解->版本漏洞检测] import requests def dynamic_fun(): try: # 检测phpMyAdmin版本信息 r = requests.get('http://192.168.204.137/phpMyAdmin/README', timeout=5) if 'phpMyAdmin' in r.text and 'Version' in r.text: return (True, f"版本信息泄露:{r.text.split('Version')[1].split('\\n')[0].strip()}") # 检测ChangeLog文件泄露 r = requests.get('http://192.168.204.137/phpMyAdmin/ChangeLog', timeout=5) if 'phpMyAdmin ChangeLog' in r.text: return (True, "存在ChangeLog文件泄露风险") return (True, "未获取到有效版本信息") except Exception as e: return (False, f"版本检测异常:{str(e)}") ```
'''
strNodes = "执行系统命令探测,权限提升尝试,横向移动测试"
nodes = strNodes.split(', ')
unique_names = list(set(nodes)) # 去重
for node_name in unique_names:
print(node_name)
elif test_type == 4: # 修改Messages
attact_tree = g_PKM.ReadData("6")
# 创建一个新的节点
from mycode.AttackMap import TreeNode
testnode = TreeNode("test", 0)
LLM.build_initial_prompt(testnode) # 新的Message
systems = testnode.messages[0]["content"]
# print(systems)
# 遍历node,查看有instr的ndoe
nodes = attact_tree.traverse_bfs()
for node in nodes:
node.messages[0]["content"] = systems
g_PKM.WriteData(attact_tree, "6")
print("完成Messgae更新")
elif test_type ==5:
mytest.do_test()
else:
pass