You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
109 lines
4.2 KiB
109 lines
4.2 KiB
from tools.ToolBase import ToolBase
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.x509 import NameOID
|
|
import re
|
|
import json
|
|
import pexpect
|
|
|
|
class OpensslTool(ToolBase):
|
|
def validate_instruction(self, instruction):
|
|
#指令过滤
|
|
timeout = 60*5
|
|
return instruction,timeout
|
|
|
|
def do_worker_pexpect(self, str_instruction, timeout, ext_params):
|
|
try:
|
|
result = ""
|
|
exc_do = pexpect.spawn('bash', ['-c', str_instruction], timeout=timeout,
|
|
encoding='utf-8') # spawn 第一个参数是可执行文件
|
|
index = exc_do.expect([
|
|
pexpect.TIMEOUT,
|
|
pexpect.EOF
|
|
])
|
|
result += str(exc_do.before)
|
|
if index == 0:
|
|
result += f"\n执行超时{timeout}秒"
|
|
elif index == 1:
|
|
pass
|
|
else:
|
|
print("遇到其他输出!")
|
|
pass
|
|
return result
|
|
except Exception as e:
|
|
return f"执行错误: {str(e)}"
|
|
|
|
def execute_instruction(self, instruction_old):
|
|
ext_params = self.create_extparams()
|
|
# 第一步:验证指令合法性
|
|
instruction,time_out = self.validate_instruction(instruction_old)
|
|
if not instruction:
|
|
return False, instruction_old, "该指令暂不执行!","",ext_params
|
|
# 过滤修改后的指令是否需要判重?同样指令再执行结果一致?待定---#?
|
|
|
|
# 第二步:执行指令---需要对ftp指令进行区分判断
|
|
#output = self.do_worker_script(instruction, time_out, ext_params)
|
|
output = self.do_worker_pexpect(instruction, time_out, ext_params)
|
|
|
|
# 第三步:分析执行结果
|
|
analysis = self.analyze_result(output,instruction,"","")
|
|
|
|
return True, instruction, analysis,output,ext_params
|
|
|
|
|
|
def parse_name(self,name):
|
|
"""解析X509名称对象为结构化字典"""
|
|
return {
|
|
NameOID.COUNTRY_NAME: name.get_attributes_for_oid(NameOID.COUNTRY_NAME),
|
|
NameOID.STATE_OR_PROVINCE_NAME: name.get_attributes_for_oid(NameOID.STATE_OR_PROVINCE_NAME),
|
|
NameOID.LOCALITY_NAME: name.get_attributes_for_oid(NameOID.LOCALITY_NAME),
|
|
NameOID.ORGANIZATION_NAME: name.get_attributes_for_oid(NameOID.ORGANIZATION_NAME),
|
|
NameOID.COMMON_NAME: name.get_attributes_for_oid(NameOID.COMMON_NAME),
|
|
NameOID.ORGANIZATIONAL_UNIT_NAME: name.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME),
|
|
}
|
|
|
|
def parse_ssl_info(self,output):
|
|
# 提取证书内容
|
|
certs = re.findall(
|
|
r'-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----',
|
|
output,
|
|
re.DOTALL
|
|
)
|
|
|
|
results = []
|
|
cert_obj = None
|
|
for cert in certs:
|
|
cert_data = "-----BEGIN CERTIFICATE-----" + cert + "-----END CERTIFICATE-----"
|
|
try:
|
|
cert_obj = x509.load_pem_x509_certificate(cert_data.encode(), default_backend())
|
|
except ValueError as e:
|
|
print(f"证书加载失败:{str(e)}")
|
|
continue
|
|
|
|
san_list = []
|
|
try:
|
|
san_ext = cert_obj.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
|
san_list = san_ext.value.get_values_for_type(x509.DNSName)
|
|
except x509.ExtensionNotFound:
|
|
pass
|
|
|
|
if cert_obj:
|
|
results.append({
|
|
'subject': str(cert_obj.subject),
|
|
'issuer': str(cert_obj.issuer),
|
|
'san': str(san_list),
|
|
'validity': {
|
|
'start': str(cert_obj.not_valid_before),
|
|
'end': str(cert_obj.not_valid_after)
|
|
},
|
|
'signature_algorithm': str(cert_obj.signature_algorithm_oid._name)
|
|
})
|
|
|
|
return results
|
|
|
|
def analyze_result(self, result,instruction,stderr,stdout):
|
|
#指令结果分析
|
|
if len(result) > 3000:
|
|
result = self.parse_ssl_info(stdout)
|
|
result = json.dumps(result,ensure_ascii=False)
|
|
return result
|