''' 对目标资产的管理,包括信息的更新维护等 ''' import re import socket import ipaddress import geoip2.database import ipwhois import requests import whois import dns.resolver import ssl import random from urllib.parse import urlparse from datetime import datetime #pattern = r'^(https?://)?((?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}|(?:\d{1,3}\.){3}\d{1,3})(:\d+)?(/.*)?$' pattern = r'^(https?://)?((?:[0-9]{1,3}\.){3}[0-9]{1,3}|(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})(:\d+)?(/.*)?$' class TargetManager: def __init__(self): pass def extract_and_store_ips(self,str_target: str): # 正则匹配IP地址(包含IPv4、IPv6及带端口的情况) ip_pattern = r''' (?P\[?([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\]?| # 完整IPv6 ::([0-9a-fA-F]{1,4}:){0,6}[0-9a-fA-F]{1,4}| # 缩写IPv6 (?P(\d{1,3}\.){3}\d{1,3})(?::\d+)? # IPv4及端口 ''' candidates = re.finditer(ip_pattern, str_target, re.VERBOSE) valid_ips = [] for match in candidates: raw_ip = match.group().lstrip('[').rstrip(']') # 处理IPv6方括号 # 分离IP和端口(如192.168.1.1:8080) if ':' in raw_ip and not raw_ip.count(':') > 1: # 排除IPv6的冒号 ip_part = raw_ip.split(':')[0] else: ip_part = raw_ip # 验证IP有效性并分类 try: ip_obj = ipaddress.ip_address(ip_part) ip_type = 'v6' if ip_obj.version == 6 else 'v4' valid_ips.append({ 'binary_ip': ip_obj.packed, 'ip_type': ip_type, 'original': ip_part }) except ValueError: continue def get_fake_target(self,type): if type ==1: fake_target = "192.168.3." + str(random.randint(2, 254)) else: fake_target = "czzfkj" + chr(random.randint(97, 122)) + chr(random.randint(97, 122)) return fake_target #验证目标格式的合法性,并提取域名或IP def validate_and_extract(self,input_str): ''' :param input_str: :return: bool,real_target,int(1-IP,2-domain),fake_target ''' type = None fake_target = "" real_target = "" target_type,target = self.is_valid_target(input_str) if not target_type: #非法目标 return False,input_str,type,fake_target if target_type =="IPv4" or target_type=="IPv6": type = 1 #IP elif target_type == "URL": type = 2 #domain else: #目标不合法 return False,real_target,type,fake_target real_target = target fake_target = self.get_fake_target(type) return True,real_target,type,fake_target #验证目标是否合法 def is_valid_target(self,target): ''' 检查目标的合法性,并对于URL地址,提取域名部分,若是ip的URL,提取IP :param target: :return: target_type new_target ''' # Check if target is a valid IP address (IPv4 or IPv6) try: ip = ipaddress.ip_address(target) if ip.version == 4: return 'IPv4',target elif ip.version == 6: return 'IPv6',target except ValueError: pass # Check if target is a valid URL # 检查是否为有效的 URL try: # 解析 URL result = urlparse(target) # 确保 URL 具有协议(http 或 https) if not result.scheme or result.scheme not in ['http', 'https']: # 如果没有协议,尝试添加 'http://' 并重新解析 result = urlparse('http://' + target) if not result.netloc: return None, None else: # 如果有协议,确保 netloc 不为空 if not result.netloc: return None, None netloc = result.netloc # 处理 URL 中的 IPv6 地址(用方括号括起来) if netloc.startswith('[') and netloc.endswith(']'): ip_str = netloc[1:-1] try: ipaddress.IPv6Address(ip_str) return 'IPv6', ip_str except ValueError: return None, None # 处理可能的 IPv4 地址 elif self._is_valid_ipv4(netloc): try: ipaddress.IPv4Address(netloc) return 'IPv4', netloc except ValueError: return None, None # 检查 netloc 是否为有效的域名 elif self._is_valid_domain(netloc): return 'URL', netloc else: return None, None except ValueError: return None, None def _is_valid_ipv4(self, ip): ''' 检查字符串是否为有效的 IPv4 地址格式 :param ip: 输入的字符串 :return: True 如果是有效的 IPv4 地址,否则 False ''' parts = ip.split('.') if len(parts) != 4: return False for part in parts: if not part.isdigit() or not 0 <= int(part) <= 255: return False return True def _is_valid_domain(self, domain): ''' 检查字符串是否为有效的域名格式 :param domain: 输入的字符串 :return: True 如果是有效的域名,否则 False ''' # 使用正则表达式验证域名格式 domain_regex = re.compile( r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' ) return bool(domain_regex.match(domain)) def collect_ip_info(self,ip): info = {} try: # 首先尝试 RDAP 查询 obj = ipwhois.IPWhois(ip) whois_info = obj.lookup_rdap() info['asn'] = whois_info.get('asn') # 获取 ASN info['isp'] = whois_info.get('network', {}).get('name') # 获取 ISP except (ipwhois.exceptions.IPDefinedError, ipwhois.exceptions.ASNRegistryError, requests.exceptions.RequestException) as e: # 如果 RDAP 失败,回退到 WHOIS 查询 try: whois_info = obj.lookup_whois() info['asn'] = whois_info.get('asn') # 获取 ASN if whois_info.get('nets'): # 从 WHOIS 的 'nets' 中提取 ISP(通常在 description 字段) info['isp'] = whois_info['nets'][0].get('description') except Exception as e: info['whois_error'] = str(e) # 记录错误信息 return info def collect_domain_info(self,domain): info = {} try: w = whois.whois(domain) info['registrar'] = w.registrar # 处理 creation_date if isinstance(w.creation_date, list): info['creation_date'] = [dt.strftime('%Y-%m-%d %H:%M:%S') if isinstance(dt, datetime) else str(dt) for dt in w.creation_date] elif isinstance(w.creation_date, datetime): info['creation_date'] = w.creation_date.strftime('%Y-%m-%d %H:%M:%S') else: info['creation_date'] = str(w.creation_date) # 处理 expiration_date if isinstance(w.expiration_date, list): info['expiration_date'] = [dt.strftime('%Y-%m-%d %H:%M:%S') if isinstance(dt, datetime) else str(dt) for dt in w.expiration_date] elif isinstance(w.expiration_date, datetime): info['expiration_date'] = w.expiration_date.strftime('%Y-%m-%d %H:%M:%S') else: info['expiration_date'] = str(w.expiration_date) info['user_name'] = str(w.name) info['emails'] = str(w.emails) info['status'] = str(w.status) except Exception as e: info['whois_error'] = str(e) try: answers = dns.resolver.resolve(domain, 'A') info['A_records'] = [r.to_text() for r in answers] except Exception as e: info['dns_error'] = str(e) return info def test(self,str_target): bok, target, type, fake_target = self.validate_and_extract(str_target) if not bok: print(f"{str_target}目标不合法{target}") else: print(f"{str_target}目标合法{target} ---- {fake_target}") g_TM = TargetManager() if __name__ == "__main__": #tm = TargetManager() #示例测试 # test_cases = [ # "256.254.1111.23", # "8.8.8.8", # "2001:db8::1", # "http://www.crnn.cc/", # "https://www.crnn.cn", # "http://www.crnn.cc/product_category/network-security-services", # "192.168.1.1:80", # "example.com/path/to/resource", # "www.crnn.cn", # "oa.crnn.cn", # "ftp://invalid.com", # 不合规 # "http://300.400.500.600" # 不合规 # ] test_cases = [ "4545.234", "http://www.crnn.cc/product_category/network-security-services", "not a url", "192.15.2.3", "12314.5123.45123" ] #tm.test("https://www.crnn.cn") for case in test_cases: g_TM.test(case)