You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

261 lines
9.2 KiB

'''
对目标资产的管理,包括信息的更新维护等
'''
import re
import socket
import ipaddress
import geoip2.database
import ipwhois
import requests
import whois
import dns.resolver
import ssl
from urllib.parse import urlparse
from datetime import datetime
# Earlier variant, kept for reference: it listed the hostname alternative
# before the dotted-quad alternative.
#pattern = r'^(https?://)?((?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}|(?:\d{1,3}\.){3}\d{1,3})(:\d+)?(/.*)?$'
# Loose target matcher: optional http(s):// prefix, then either four
# dot-separated groups of 1-3 digits or a dotted hostname ending in an
# alphabetic label of at least two letters, then an optional ":port" and an
# optional "/path". The digit groups are not range-checked here.
# NOTE(review): not referenced by the code visible in this part of the file;
# confirm external users before changing or removing it.
pattern = r'^(https?://)?((?:[0-9]{1,3}\.){3}[0-9]{1,3}|(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})(:\d+)?(/.*)?$'
class TargetManager:
    """Validate scan targets and collect public metadata about them.

    A target may be a bare IPv4/IPv6 address, a domain name, or an
    ``http``/``https`` URL (scheme optional) whose host is one of those.
    """

    # Candidate matcher for extract_and_store_ips. It is deliberately loose:
    # every candidate is validated with the ``ipaddress`` module afterwards.
    _IP_CANDIDATE_RE = re.compile(
        r'''
        (?<![0-9])(?P<ipv4>(?:[0-9]{1,3}\.){3}[0-9]{1,3})(?![0-9])  # dotted quad
        (?::[0-9]+)?                                                # optional :port
        |
        \[?(?<![0-9a-fA-F:])                                        # optional opening bracket
        (?P<ipv6>
            (?:[0-9a-fA-F]{1,4})?(?::[0-9a-fA-F]{0,4}){2,8}         # full or compressed groups
            (?:(?:\.[0-9]{1,3}){3})?                                # embedded IPv4 tail
        )
        \]?                                                         # optional closing bracket
        ''',
        re.VERBOSE,
    )

    # Dotted hostname: labels of 1-63 letters/digits/hyphens (no leading or
    # trailing hyphen) followed by an alphabetic TLD of at least two letters.
    _DOMAIN_RE = re.compile(
        r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}'
    )

    # ASCII-only digit run; str.isdigit() also accepts characters such as
    # superscript digits that int() cannot convert.
    _DIGITS_RE = re.compile(r'[0-9]+')

    _WHOIS_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Fixed placeholder values returned as ``fake_target``.
    # NOTE(review): presumably used to mask the real target downstream;
    # confirm with the callers before changing them.
    _FAKE_IP = "192.168.3.107"
    _FAKE_DOMAIN = "czzfkjxx"

    def __init__(self):
        pass

    def extract_and_store_ips(self, str_target: str):
        """Find every valid IPv4/IPv6 address embedded in a piece of text.

        Ports (``1.2.3.4:80``) and IPv6 brackets (``[::1]``) are stripped.

        :param str_target: free-form text to scan
        :return: list of dicts with keys ``binary_ip`` (packed bytes),
                 ``ip_type`` (``'v4'`` or ``'v6'``) and ``original`` (the
                 address text), in order of appearance
        """
        valid_ips = []
        if not str_target:
            return valid_ips
        for match in self._IP_CANDIDATE_RE.finditer(str_target):
            ip_part = match.group('ipv4') or match.group('ipv6')
            try:
                ip_obj = ipaddress.ip_address(ip_part)
            except ValueError:
                # The regex only proposes candidates (e.g. "999.1.1.1" or
                # "12:30:45"); ipaddress has the final say.
                continue
            valid_ips.append({
                'binary_ip': ip_obj.packed,
                'ip_type': 'v6' if ip_obj.version == 6 else 'v4',
                'original': ip_part,
            })
        return valid_ips

    def validate_and_extract(self, input_str):
        """Validate a target and extract its host (IP or domain).

        :param input_str: raw target text
        :return: tuple ``(ok, real_target, kind, fake_target)`` where ``kind``
                 is 1 for an IP, 2 for a domain and ``None`` when invalid.
                 For an invalid target ``real_target`` is ``input_str``
                 unchanged and ``fake_target`` is ``""``.
        """
        target_type, target = self.is_valid_target(input_str)
        if target_type in ("IPv4", "IPv6"):
            return True, target, 1, self._FAKE_IP
        if target_type == "URL":
            return True, target, 2, self._FAKE_DOMAIN
        return False, input_str, None, ""

    def is_valid_target(self, target):
        """Classify a target and reduce it to its host part.

        Accepts a bare IP address, or a URL / ``host[:port][/path]`` string.
        Only ``http`` and ``https`` URLs are accepted; a missing scheme is
        treated as ``http``. Surrounding whitespace is ignored.

        :param target: raw target text
        :return: ``(target_type, host)`` with ``target_type`` one of
                 ``'IPv4'``, ``'IPv6'``, ``'URL'`` (a domain name), or
                 ``(None, None)`` when the target is not valid
        """
        if isinstance(target, str):
            target = target.strip()

        # 1. Bare IP literal.
        try:
            ip = ipaddress.ip_address(target)
        except ValueError:
            ip = None
        if ip is not None:
            return ('IPv4' if ip.version == 4 else 'IPv6'), target

        if not isinstance(target, str) or not target:
            return None, None

        # 2. URL, or host[:port][/path] without a scheme.
        try:
            result = urlparse(target)
            if result.scheme not in ('http', 'https'):
                if result.scheme and result.netloc:
                    # A real "scheme://host" URL with an unsupported scheme.
                    return None, None
                # No scheme, or "host:port" mis-read as "scheme:path".
                result = urlparse('http://' + target)
        except ValueError:
            # urlparse rejects malformed bracketed hosts.
            return None, None

        split = self._split_netloc(result.netloc)
        if split is None:
            return None, None
        host, bracketed = split

        if bracketed:
            try:
                ipaddress.IPv6Address(host)
            except ValueError:
                return None, None
            return 'IPv6', host

        if self._is_valid_ipv4(host):
            try:
                ipaddress.IPv4Address(host)
            except ValueError:
                return None, None
            return 'IPv4', host

        if self._is_valid_domain(host):
            return 'URL', host
        return None, None

    @classmethod
    def _split_netloc(cls, netloc):
        """Split a URL netloc into ``(host, is_bracketed)``.

        Returns ``None`` when the netloc is empty, carries userinfo
        (``user@host``), or has a malformed port or bracket.
        """
        if not netloc or '@' in netloc:
            return None
        if netloc.startswith('['):
            host, closing, rest = netloc[1:].partition(']')
            if not closing or (rest and not rest.startswith(':')):
                return None
            has_port, port, bracketed = bool(rest), rest[1:], True
        else:
            host, colon, port = netloc.partition(':')
            has_port, bracketed = bool(colon), False
        if has_port and not cls._is_valid_port(port):
            return None
        return host, bracketed

    @classmethod
    def _is_valid_port(cls, port):
        """Return True if ``port`` is 1-5 ASCII digits not exceeding 65535."""
        return (
            cls._DIGITS_RE.fullmatch(port) is not None
            and len(port) <= 5
            and int(port) <= 65535
        )

    def _is_valid_ipv4(self, ip):
        """Check whether a string has IPv4 dotted-quad shape.

        :param ip: text to check
        :return: True if it is four dot-separated ASCII-digit groups, each
                 with a value in 0-255; otherwise False
        """
        parts = ip.split('.')
        if len(parts) != 4:
            return False
        for part in parts:
            if not TargetManager._DIGITS_RE.fullmatch(part):
                return False
            # Judge the zero-stripped text so that absurdly long digit runs
            # are rejected without being converted by int().
            significant = part.lstrip('0')
            if len(significant) > 3 or int(significant or '0') > 255:
                return False
        return True

    def _is_valid_domain(self, domain):
        """Check whether a string is a syntactically valid domain name.

        :param domain: text to check
        :return: True if it is a dotted hostname of at most 253 characters
                 with an alphabetic TLD; otherwise False
        """
        if len(domain) > 253:
            return False
        return TargetManager._DOMAIN_RE.fullmatch(domain) is not None

    def collect_ip_info(self, ip):
        """Look up ASN and ISP information for an IP address.

        RDAP is tried first, with a fallback to legacy WHOIS when it fails.

        :param ip: IP address text
        :return: dict that may contain ``asn``, ``isp`` and, on failure,
                 ``whois_error``
        """
        info = {}
        try:
            # Built outside the lookup ``try`` so the WHOIS fallback below
            # never runs with ``obj`` unbound.
            obj = ipwhois.IPWhois(ip)
        except ipwhois.exceptions.IPDefinedError as e:
            # Private/reserved addresses have no registry data to query.
            info['whois_error'] = str(e)
            return info

        try:
            rdap_info = obj.lookup_rdap()
            info['asn'] = rdap_info.get('asn')
            info['isp'] = (rdap_info.get('network') or {}).get('name')
        except (ipwhois.exceptions.ASNRegistryError,
                ipwhois.exceptions.HTTPLookupError,
                requests.exceptions.RequestException):
            try:
                whois_info = obj.lookup_whois()
                info['asn'] = whois_info.get('asn')
                if whois_info.get('nets'):
                    # WHOIS keeps the ISP in the first net's description.
                    info['isp'] = whois_info['nets'][0].get('description')
            except Exception as e:
                # Best-effort collector: record the failure, do not raise.
                info['whois_error'] = str(e)
        return info

    @classmethod
    def _format_whois_date(cls, value):
        """Render a WHOIS date field (datetime, list of them, or other) as text."""
        def render(item):
            if isinstance(item, datetime):
                return item.strftime(cls._WHOIS_DATE_FORMAT)
            return str(item)

        if isinstance(value, list):
            return [render(item) for item in value]
        return render(value)

    def collect_domain_info(self, domain):
        """Collect WHOIS registration data and DNS A records for a domain.

        Both lookups are best-effort: a failure is recorded under
        ``whois_error`` / ``dns_error`` instead of being raised.

        :param domain: domain name
        :return: dict with the collected fields
        """
        info = {}
        try:
            w = whois.whois(domain)
            info['registrar'] = w.registrar
            info['creation_date'] = self._format_whois_date(w.creation_date)
            info['expiration_date'] = self._format_whois_date(w.expiration_date)
            info['user_name'] = str(w.name)
            info['emails'] = str(w.emails)
            info['status'] = str(w.status)
        except Exception as e:
            info['whois_error'] = str(e)
        try:
            answers = dns.resolver.resolve(domain, 'A')
            info['A_records'] = [r.to_text() for r in answers]
        except Exception as e:
            info['dns_error'] = str(e)
        return info

    def test(self, str_target):
        """Print whether ``str_target`` is a valid target (manual smoke test)."""
        bok, target, _kind, fake_target = self.validate_and_extract(str_target)
        if not bok:
            print(f"{str_target}目标不合法{target}")
        else:
            print(f"{str_target}目标合法{target} ---- {fake_target}")
# Shared module-level instance, created when the module is imported.
g_TM = TargetManager()

if __name__ == "__main__":
    # Manual smoke test: print the validation verdict for each sample input.
    #
    # Inputs used in earlier runs, kept for reference:
    #   "256.254.1111.23", "8.8.8.8", "2001:db8::1",
    #   "http://www.crnn.cc/", "https://www.crnn.cn",
    #   "http://www.crnn.cc/product_category/network-security-services",
    #   "192.168.1.1:80", "example.com/path/to/resource",
    #   "www.crnn.cn", "oa.crnn.cn",
    #   "ftp://invalid.com"        (expected: invalid)
    #   "http://300.400.500.600"   (expected: invalid)
    test_cases = (
        "4545.234",
        "http://www.crnn.cc/product_category/network-security-services",
        "not a url",
        "192.15.2.3",
        "12314.5123.45123",
    )
    for sample in test_cases:
        g_TM.test(sample)