You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

345 lines
12 KiB

#python代码动态执行
import queue
import ast
import subprocess
import json
import builtins
import re
import os
import paramiko
import impacket
import psycopg2
import socket
import struct
import sys
import requests
import ssl
import mysql.connector
import telnetlib
import time
import uuid
import base64
import itertools
import random
import tempfile
import textwrap
import smb
import pexpect
import smbclient
import binascii
from mysql.connector import Error
from impacket.smbconnection import SMBConnection
from itertools import product
from socket import create_connection
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from pymetasploit3.msfrpc import MsfRpcClient
from time import sleep
from base64 import b64encode
from datetime import datetime,timedelta
from mycode.Result_merge import my_merge
from ftplib import FTP
from requests.auth import HTTPBasicAuth
from urllib.parse import quote
from myutils.ReturnParams import ReturnParams
from concurrent.futures import ProcessPoolExecutor, TimeoutError
# --------------------------------------------
# 1) 全局 helper:放在模块顶层,才能被子进程 picklable 调用
# --------------------------------------------
def _execute_dynamic(instruction_str):
"""
在子进程中执行 instruction_str 所描述的 dynamic_fun,
并返回 (status: bool, output: str)。
"""
# 允许的内置函数白名单
allowed_builtins = {
'__name__': __name__,
'__import__': builtins.__import__,
'abs': abs, 'all': all, 'any': any, 'bool': bool,
'chr': chr, 'dict': dict, 'enumerate': enumerate,
'float': float, 'int': int, 'len': len, 'list': list,
'max': max, 'min': min, 'print': print, 'range': range,
'set': set, 'str': str, 'sum': sum, 'type': type,
'open': open, 'Exception': Exception, 'locals': locals,
'ConnectionResetError':ConnectionResetError,'BrokenPipeError':BrokenPipeError,
'bytes':bytes,'tuple':tuple,'format':format
}
# 构造安全的 globals
safe_globals = {
'__builtins__': allowed_builtins,
'subprocess': subprocess,
'json': json,
're': re,
'paramiko': paramiko,
'impacket': impacket,
'psycopg2': psycopg2,
'socket': socket,
'mysql': mysql,
'mysql.connector': mysql.connector,
'struct': struct,
'sys': sys,
'requests': requests,
'ssl': ssl,
'FTP': FTP,
'HTTPBasicAuth': HTTPBasicAuth,
'telnetlib': telnetlib,
'time': time,
'uuid': uuid,
'quote': quote,
'base64': base64,
'itertools': itertools,
'random':random,
'tempfile':tempfile,
'os':os,
'datetime':datetime,
'timedelta':timedelta,
'b64encode':b64encode,
'smb':smb,
'pexpect':pexpect,
'sleep':sleep,
'MsfRpcClient':MsfRpcClient,
'x509':x509,
'default_backend':default_backend,
'product':product,
'create_connection':create_connection,
'smbclient':smbclient,
'binascii':binascii,
'Error':Error,
'SMBConnection':SMBConnection
}
safe_locals = {}
try:
# 编译并执行用户提供的 code 字符串
compiled = compile(instruction_str, '<dynamic>', 'exec')
exec(compiled, safe_globals, safe_locals)
# dynamic_fun 必须存在
if 'dynamic_fun' not in safe_locals:
return False, "Function dynamic_fun() 未定义"
# 调用它并返回结果
res = safe_locals['dynamic_fun']()
if not (isinstance(res, tuple) and len(res) == 2 and isinstance(res[0], bool)):
str_res = str(res)
return False, f"dynamic_fun 返回值格式不对:{str_res}"
return res
except MemoryError:
return False, "内存溢出"
except RecursionError:
return False, "递归深度过深"
except Exception as e:
return False, f"子进程执行出错: {e}"
class PythoncodeTool():
def __init__(self,max_num):
self.max_workers = max_num
self.proc_pool = None
self.pool_active = False
#self.proc_pool = ProcessPoolExecutor(max_workers=max_num)
def start_pool(self):
if self.proc_pool is not None and self.pool_active:
print("进程池已经在运行中")
return False
print("启动进程池...")
self.proc_pool = ProcessPoolExecutor(max_workers=self.max_workers)
self.pool_active = True
return True
def shutdown_pool(self):
if self.proc_pool is not None and self.pool_active:
print("关闭进程池...")
self.proc_pool.shutdown(wait=False) #wait=True 是阻塞执行,False立即返回
self.pool_active = False
self.proc_pool = None
else:
print("进程池已经是关闭状态")
def fix_code(self,code: str) -> str:
"""
将 code 中所有单引号或双引号字符串里意外的真实换行符替换为 '\\n'
使代码可被 ast.parse 正确解析。
"""
# 匹配单引号字符串中跨行的情况
single_pat = re.compile(
r"(')" # 起始单引号
r"([^']*?)" # 引号内部,尽可能少地匹配除单引号外的字符
r"\n" # 真正的换行
r"(.*?)" # 换行之后继续匹配内部字符
r"(\1)", # 结尾单引号,与起始保持一致
flags=re.DOTALL
)
# 匹配双引号字符串中跨行的情况
double_pat = re.compile(
r'(")' # 起始双引号
r'([^"]*?)' # 内部内容
r'\n' # 真实换行
r'(.*?)' # 后续内容
r'(\1)', # 结尾双引号
flags=re.DOTALL
)
prev = None
# 反复替换,直到不再有跨行字符串
while code != prev:
prev = code
# 处理单引号内的真实换行
code = single_pat.sub(
lambda m: (
m.group(1)
+ m.group(2).replace('\n', '\\n')
+ '\\n'
+ m.group(3)
+ m.group(4)
),
code
)
# 处理双引号内的真实换行
code = double_pat.sub(
lambda m: (
m.group(1)
+ m.group(2).replace('\n', '\\n')
+ '\\n'
+ m.group(3)
+ m.group(4)
),
code
)
return code
def preprocess(self,code: str) -> str:
# 去掉最外层空行
code = code.strip('\n')
# 去除多余缩进
return textwrap.dedent(code)
def is_safe_code(self,code):
# List of high-risk functions to block (can be adjusted based on requirements)
# 只屏蔽这些“完整”函数调用
HIGH_RISK = {
'eval', # eval(...)
'exec', # exec(...)
'subprocess.call', # subprocess.call(...)
}
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Call):
fn = node.func
# 1) 裸 exec/eval
if isinstance(fn, ast.Name):
if fn.id in ('exec', 'eval'):
return False,"有高风险函数,暂不执行!"
# 2) 模块级别的 os.system、subprocess.call、subprocess.Popen
elif isinstance(fn, ast.Attribute):
# value 必须是 Name,才算"模块.方法"
if isinstance(fn.value, ast.Name):
fullname = f"{fn.value.id}.{fn.attr}"
if fullname in HIGH_RISK:
return False,"有高风险函数,暂不执行!"
return True,""
except SyntaxError as se:
# 语法都不通过,也算不安全
print("解析失败!", se, "", se.lineno, "")
print("出错的那行是:", code.splitlines()[se.lineno - 1])
return False,str(se)
def validate_instruction(self, instruction):
#指令过滤
timeout = 60*15
instr = instruction.replace("python_code ","")
instr = instr.replace("python-code ", "")
#instr = self.preprocess(instr)
#instr = self.fix_code(instr)
# Safety check
bsafe,error = self.is_safe_code((instr))
if not bsafe:
return "", timeout,error
return instr,timeout,""
def safe_import(self,name,*args,**kwargs):
ALLOWED_MODULES = ['subprocess', 'json','re']
if name not in ALLOWED_MODULES:
raise ImportError(f"Import of '{name}' is not allowed")
return builtins.__import__(name, *args, **kwargs)
def _run_dynamic(self, safe_locals, q):
"""子进程执行 dynamic_fun 并把结果放入队列"""
try:
fn = safe_locals['dynamic_fun']
res = fn()
q.put(res)
except Exception as e:
q.put((False, f"执行出错: {e}"))
def execute_instruction(self, instruction_old):
'''
执行指令:验证合法性 -> 执行 -> 分析结果
:param instruction_old:
:return:
bool:true-正常返回给大模型,false-结果不返回给大模型
str:执行的指令
str:执行指令的结果
'''
ext_params = ReturnParams()
ext_params["is_user"] = False # 是否要提交用户确认 -- 默认False
ext_params["is_vulnerability"] = False # 是否是脆弱点
if not self.pool_active:
return False,instruction_old,"本地程序出行错误,请结束该节点的测试!","",ext_params
# 第一步:验证指令合法性
instruction,time_out,error = self.validate_instruction(instruction_old)
if not instruction:
return True, instruction_old, error,"",ext_params
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
# 第二步:执行指令
future = self.proc_pool.submit(_execute_dynamic, instruction)
try:
# 在主进程中等待结果,超时则抛 TimeoutError
status, tmpout = future.result(timeout=time_out) #这里是阻塞的
except TimeoutError:
# 超时处理
future.cancel()
status, tmpout = False, f"执行超时({time_out} 秒)"
except Exception as e:
# 其他异常
status, tmpout = False, f"提交子进程运行出错: {e}"
output = f"status:{status},output:{tmpout}"
# 第三步:分析执行结果
analysis = self.analyze_result(output, instruction,"","")
return True, instruction, analysis,"",ext_params
def analyze_result(self, result,instruction,stderr,stdout):
#指令结果分析 --- 要不要限定一个max_len?
if "enum4linux " in instruction: #存在指令包装成Python代码返回的情况
result = my_merge("enum4linux",result)
else:
if len(result) > 3000: #超过2000长度时,尝试去重重复行
lines = result.splitlines()
seen = set()
unique_lines = []
for line in lines:
if line not in seen:
seen.add(line)
unique_lines.append(line)
return "\n".join(unique_lines)
return result
#关闭进程池
def shutdown(self):
self.proc_pool.shutdown(wait=True)
if __name__ == "__main__":
llm_code = """
def run_test():
return 'Penetration test executed successfully!'
"""