You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

230 lines
8.1 KiB

'''
对目标资产的管理,包括信息的更新维护等
'''
import re
import socket
import ipaddress
import geoip2.database
import ipwhois
import requests
import whois
import dns.resolver
import ssl
from urllib.parse import urlparse
from datetime import datetime
#pattern = r'^(https?://)?((?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}|(?:\d{1,3}\.){3}\d{1,3})(:\d+)?(/.*)?$'
pattern = r'^(https?://)?((?:[0-9]{1,3}\.){3}[0-9]{1,3}|(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})(:\d+)?(/.*)?$'
class TargetManager:
def __init__(self):
pass
def extract_and_store_ips(self,str_target: str):
# 正则匹配IP地址(包含IPv4、IPv6及带端口的情况)
ip_pattern = r'''
(?P<ipv6>\[?([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\]?| # 完整IPv6
::([0-9a-fA-F]{1,4}:){0,6}[0-9a-fA-F]{1,4}| # 缩写IPv6
(?P<ipv4>(\d{1,3}\.){3}\d{1,3})(?::\d+)? # IPv4及端口
'''
candidates = re.finditer(ip_pattern, str_target, re.VERBOSE)
valid_ips = []
for match in candidates:
raw_ip = match.group().lstrip('[').rstrip(']') # 处理IPv6方括号
# 分离IP和端口(如192.168.1.1:8080)
if ':' in raw_ip and not raw_ip.count(':') > 1: # 排除IPv6的冒号
ip_part = raw_ip.split(':')[0]
else:
ip_part = raw_ip
# 验证IP有效性并分类
try:
ip_obj = ipaddress.ip_address(ip_part)
ip_type = 'v6' if ip_obj.version == 6 else 'v4'
valid_ips.append({
'binary_ip': ip_obj.packed,
'ip_type': ip_type,
'original': ip_part
})
except ValueError:
continue
# 辅助函数:验证IPv4地址的有效性
def _is_valid_ipv4(self,ip):
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
return True
#验证目标格式的合法性,并提取域名或IP
def validate_and_extract(self,input_str):
'''
:param input_str:
:return: bool,str,int(1-IP,2-domain)
'''
regex_match = re.fullmatch(pattern, input_str)
type = None
fake_target = ""
if regex_match:
domain_or_ip = regex_match.group(2)
# 仅对 IPv4 格式的字符串进行有效性验证
if re.fullmatch(r'\d{1,3}(\.\d{1,3}){3}', domain_or_ip):
if not self._is_valid_ipv4(domain_or_ip):
return False, None,type,fake_target
else:
type = 1 #IP
fake_target = "192.168.3.107"
else:
type = 2 #domain
fake_target = "www.czzfxxkj.com"
return True, domain_or_ip,type,fake_target
else:
return False, None,type,fake_target
#验证目标是否合法
def is_valid_target(self,target):
# Check if target is a valid IP address (IPv4 or IPv6)
try:
ip = ipaddress.ip_address(target)
if ip.version == 4:
return 'IPv4'
elif ip.version == 6:
return 'IPv6'
except ValueError:
pass
# Check if target is a valid URL
try:
result = urlparse(target)
# Only allow http or https schemes
if result.scheme not in ['http', 'https']:
return None
netloc = result.netloc
if not netloc:
return None
# Handle IPv6 addresses in URLs (enclosed in brackets)
if netloc.startswith('[') and netloc.endswith(']'):
ip_str = netloc[1:-1]
try:
ipaddress.IPv6Address(ip_str)
return 'URL'
except ValueError:
return None
# Handle potential IPv4 addresses
elif self._is_valid_ipv4(netloc):
try:
ipaddress.IPv4Address(netloc)
return 'URL'
except ValueError:
return None
# If not an IP-like string, assume it's a domain name and accept
return 'URL'
except ValueError:
return None
def collect_ip_info(self,ip):
info = {}
try:
# 首先尝试 RDAP 查询
obj = ipwhois.IPWhois(ip)
whois_info = obj.lookup_rdap()
info['asn'] = whois_info.get('asn') # 获取 ASN
info['isp'] = whois_info.get('network', {}).get('name') # 获取 ISP
except (ipwhois.exceptions.IPDefinedError, ipwhois.exceptions.ASNRegistryError,
requests.exceptions.RequestException) as e:
# 如果 RDAP 失败,回退到 WHOIS 查询
try:
whois_info = obj.lookup_whois()
info['asn'] = whois_info.get('asn') # 获取 ASN
if whois_info.get('nets'):
# 从 WHOIS 的 'nets' 中提取 ISP(通常在 description 字段)
info['isp'] = whois_info['nets'][0].get('description')
except Exception as e:
info['whois_error'] = str(e) # 记录错误信息
return info
def collect_domain_info(self,domain):
info = {}
try:
w = whois.whois(domain)
info['registrar'] = w.registrar
# 处理 creation_date
if isinstance(w.creation_date, list):
info['creation_date'] = [dt.strftime('%Y-%m-%d %H:%M:%S') if isinstance(dt, datetime) else str(dt) for
dt in w.creation_date]
elif isinstance(w.creation_date, datetime):
info['creation_date'] = w.creation_date.strftime('%Y-%m-%d %H:%M:%S')
else:
info['creation_date'] = str(w.creation_date)
# 处理 expiration_date
if isinstance(w.expiration_date, list):
info['expiration_date'] = [dt.strftime('%Y-%m-%d %H:%M:%S') if isinstance(dt, datetime) else str(dt) for
dt in w.expiration_date]
elif isinstance(w.expiration_date, datetime):
info['expiration_date'] = w.expiration_date.strftime('%Y-%m-%d %H:%M:%S')
else:
info['expiration_date'] = str(w.expiration_date)
info['user_name'] = str(w.name)
info['emails'] = str(w.emails)
info['status'] = str(w.status)
except Exception as e:
info['whois_error'] = str(e)
try:
answers = dns.resolver.resolve(domain, 'A')
info['A_records'] = [r.to_text() for r in answers]
except Exception as e:
info['dns_error'] = str(e)
return info
def test(self,str_target):
target_type = self.is_valid_target(str_target)
if not target_type:
print(f"Invalid target: {str_target}")
return
if target_type == 'IPv4' or target_type == "IPv6":
#info = self.collect_ip_info(str_target)
info = "IP"
elif target_type == 'URL':
domain = urlparse(str_target).netloc
info = self.collect_domain_info(domain)
print(f"Collected info for {str_target}: {info}")
g_TM = TargetManager()
if __name__ == "__main__":
tm = TargetManager()
# 示例测试
# test_cases = [
# "256.254.1111.23",
# "8.8.8.8",
# "2001:db8::1",
# "http://www.crnn.cc/",
# "https://www.crnn.cn",
# "http://www.crnn.cc/product_category/network-security-services",
# "192.168.1.1:80",
# "example.com/path/to/resource",
# "ftp://invalid.com", # 不合规
# "http://300.400.500.600" # 不合规
# ]
test_cases = [
"http://www.crnn.cc/",
"http://www.crnn.cc/product_category/network-security-services"
]
#tm.test("https://www.crnn.cn")
for case in test_cases:
tm.test(case)