|
|
|
#mysql
|
|
|
|
#pip install mysql-connector-python
|
|
|
|
import pexpect
|
|
|
|
import shlex
|
|
|
|
import re
|
|
|
|
import mysql.connector
|
|
|
|
from mysql.connector import Error
|
|
|
|
from tools.ToolBase import ToolBase
|
|
|
|
|
|
|
|
class MysqlTool(ToolBase):
|
|
|
|
|
|
|
|
def test_empty_password_mysql_connection(self,host, username='root'):
|
|
|
|
"""
|
|
|
|
测试使用空密码连接到指定 MySQL 服务器。
|
|
|
|
|
|
|
|
参数:
|
|
|
|
host (str): MySQL 服务器的主机地址,例如 'haitutech.cn'
|
|
|
|
username (str): MySQL 用户名,默认值为 'root'
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
# 尝试使用空密码连接 MySQL
|
|
|
|
connection = mysql.connector.connect(
|
|
|
|
host=host, # 主机地址
|
|
|
|
user=username, # 用户名
|
|
|
|
password='', # 空密码
|
|
|
|
connection_timeout=10 # 设置10秒连接超时
|
|
|
|
)
|
|
|
|
if connection.is_connected():
|
|
|
|
res = f"成功连接到 {host},用户 {username} 使用空密码"
|
|
|
|
connection.close() # 关闭连接以释放资源
|
|
|
|
except Error as e:
|
|
|
|
# 捕获并打印连接错误
|
|
|
|
res = f"连接失败: {host} - {e}"
|
|
|
|
return res
|
|
|
|
|
|
|
|
def extract_mysql_password(self,command_line: str) -> str:
|
|
|
|
"""
|
|
|
|
从 MySQL 命令行中提取 -p 参数后的密码值
|
|
|
|
|
|
|
|
支持场景:
|
|
|
|
-p'' → 返回空字符串
|
|
|
|
-p'password' → 返回 'password'
|
|
|
|
-p"password" → 返回 'password'
|
|
|
|
-ppassword → 返回 'password'
|
|
|
|
-p password → 返回 'password'
|
|
|
|
"""
|
|
|
|
# 正则匹配模式
|
|
|
|
pattern = r'''
|
|
|
|
-p # 匹配 -p 参数
|
|
|
|
(?: # 非捕获分组
|
|
|
|
\s* # 允许空格(如 -p password)
|
|
|
|
(["']?) # 捕获引号类型(单引号/双引号/无引号)
|
|
|
|
(.*?) # 捕获密码内容
|
|
|
|
\1 # 闭合引号(与开头引号一致)
|
|
|
|
| # 或
|
|
|
|
(\S+) # 直接捕获无空格的密码(如 -ppassword)
|
|
|
|
)
|
|
|
|
'''
|
|
|
|
matches = re.search(pattern, command_line, re.VERBOSE)
|
|
|
|
|
|
|
|
if not matches:
|
|
|
|
return ""
|
|
|
|
|
|
|
|
# 提取密码(处理三种情况:引号包裹、无引号、直接拼接)
|
|
|
|
password = matches.group(2) or matches.group(3) or ""
|
|
|
|
return password
|
|
|
|
|
|
|
|
def validate_instruction(self, instruction):
|
|
|
|
timeout = 60
|
|
|
|
#modified_code = "mysql空密码登录测试"
|
|
|
|
instr = instruction.replace("--ssl-mode=DISABLED","--ssl=0") #mariaDB 没有ssl-mode参数
|
|
|
|
# if "--ssl=0" not in instr:
|
|
|
|
# instr = instr + " --ssl=0"
|
|
|
|
return instr,timeout
|
|
|
|
|
|
|
|
def do_worker_pexpect(self,str_instruction,timeout,ext_params):
|
|
|
|
try:
|
|
|
|
#safe_command = shlex.quote(str_instruction)
|
|
|
|
#cmd = f"bash -c {safe_command}"
|
|
|
|
strpwsd = self.extract_mysql_password(str_instruction)
|
|
|
|
|
|
|
|
result = ""
|
|
|
|
exc_do = pexpect.spawn('bash',['-c',str_instruction],timeout=timeout,encoding='utf-8')#spawn 第一个参数是可执行文件
|
|
|
|
index = exc_do.expect([
|
|
|
|
pexpect.TIMEOUT,
|
|
|
|
pexpect.EOF,
|
|
|
|
'Enter password:'
|
|
|
|
])
|
|
|
|
result += str(exc_do.before)
|
|
|
|
if index == 0 or index ==1:
|
|
|
|
return result
|
|
|
|
elif index==2:#针对要输入密码的情况,暂时智能输入个空字符
|
|
|
|
result += "Enter password:\n"
|
|
|
|
exc_do.sendline(strpwsd) # 输入空密码后不知道会有多少种情况,密码不对,密码对
|
|
|
|
index = exc_do.expect([pexpect.TIMEOUT, pexpect.EOF])
|
|
|
|
result += str(exc_do.before)
|
|
|
|
else:
|
|
|
|
print("遇到其他输出!")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
|
|
return f"执行错误: {str(e)}"
|
|
|
|
|
|
|
|
#对于非sh命令调用的工具,自己实现命令执行的内容 --#2025-3-24暂时不使用
|
|
|
|
def execute_instruction(self, instruction_old):
|
|
|
|
ext_params = self.create_extparams()
|
|
|
|
# 第一步:验证指令合法性
|
|
|
|
instruction, timeout = self.validate_instruction(instruction_old)
|
|
|
|
if not instruction:
|
|
|
|
ext_params.is_user = True
|
|
|
|
return False, instruction_old, "该指令暂不执行!由用户确认是否要兼容支持", "", ext_params # 未
|
|
|
|
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
|
|
|
|
|
|
|
|
# 第二步:执行指令
|
|
|
|
output = self.do_worker_pexpect(instruction, timeout, ext_params)
|
|
|
|
|
|
|
|
# 第三步:分析执行结果
|
|
|
|
if isinstance(output, bytes): # 若是bytes则转成str
|
|
|
|
output = output.decode('utf-8', errors='ignore')
|
|
|
|
|
|
|
|
analysis = self.analyze_result(output, instruction, "", "")
|
|
|
|
|
|
|
|
if not analysis: # analysis为“” 不提交LLM
|
|
|
|
ext_params.is_user = True
|
|
|
|
return False, instruction, analysis, output, ext_params
|
|
|
|
return True, instruction, analysis, output, ext_params
|
|
|
|
|
|
|
|
def analyze_result(self, result,instruction,stderr,stdout):
|
|
|
|
#如果有--------,把这些-去除掉
|
|
|
|
if result:
|
|
|
|
result = result.replace("--------------","")
|
|
|
|
else:
|
|
|
|
result = ""
|
|
|
|
return result
|