"""
Match domains from registrar DNS exports to S3 buckets

Analyzes DNS records to find domains pointing to:
- Direct S3 website endpoints (*.s3-website-*.amazonaws.com)
- CloudFront distributions (which may front S3 buckets)
- S3 bucket names that match domain names
"""

import csv
import re
import sys
import os
from typing import Dict, List, Set, Optional, Tuple
from collections import defaultdict

import boto3
import click
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv

# Load .env file
load_dotenv()

# Virtual-hosted S3 hostnames; the bucket name is everything in front of the
# endpoint label. The bucket group is greedy on purpose: bucket names may
# contain dots (and even ".s3"), so the *last* endpoint label wins.
# Shapes covered:
#   bucket.s3-website-us-east-1.amazonaws.com     website, dash style
#   bucket.s3-website.eu-central-1.amazonaws.com  website, dot style
#   bucket.s3.us-east-1.amazonaws.com             REST, regional
#   bucket.s3.dualstack.us-east-1.amazonaws.com   REST, dual-stack
#   bucket.s3-us-west-2.amazonaws.com             REST, legacy dash style
#   bucket.s3.amazonaws.com                       REST, legacy global
_S3_HOST_RE = re.compile(
    r'^(?P<bucket>[^/\s]+)'
    r'\.s3(?:-website)?'
    r'(?:[.-][a-z0-9-]+){0,2}'
    r'\.amazonaws\.com(?![a-z0-9-])'
)


def _normalize_bucket_region(location_constraint: Optional[str]) -> str:
    """Translate a GetBucketLocation ``LocationConstraint`` into a region name"""
    # The API reports us-east-1 as null/empty and uses the legacy value "EU"
    # for eu-west-1.
    if not location_constraint:
        return 'us-east-1'
    if location_constraint == 'EU':
        return 'eu-west-1'
    return location_constraint


def get_s3_buckets() -> List[Dict[str, str]]:
    """
    Get all S3 buckets with their regions

    Returns:
        List of ``{'name': ..., 'region': ...}`` dicts. Empty when credentials
        are missing or the listing fails (a warning is printed to stderr).
    """
    try:
        s3_client = boto3.client('s3')
        response = s3_client.list_buckets()

        buckets = []
        for bucket in response.get('Buckets', []):
            bucket_name = bucket['Name']

            # Get bucket region
            try:
                region_response = s3_client.get_bucket_location(Bucket=bucket_name)
                region = _normalize_bucket_region(
                    region_response.get('LocationConstraint')
                )
            except ClientError as e:
                # Best effort: keep the bucket, but say that the region is a guess.
                click.echo(
                    f"Warning: Could not determine region for {bucket_name}: {e}",
                    err=True,
                )
                region = 'us-east-1'

            buckets.append({
                'name': bucket_name,
                'region': region
            })

        return buckets
    except NoCredentialsError:
        click.echo("Warning: AWS credentials not found. Cannot list S3 buckets.", err=True)
        click.echo("Will only match based on DNS records and name matching.", err=True)
        return []
    except Exception as e:
        click.echo(f"Warning: Error listing buckets: {e}", err=True)
        click.echo("Will only match based on DNS records and name matching.", err=True)
        return []


def get_cloudfront_distributions() -> Dict[str, Dict]:
    """
    Get all CloudFront distributions and their origins

    Returns:
        Dictionary mapping distribution domain name -> dict with keys
        ``id``, ``s3_buckets`` (bucket names parsed from S3 origins) and
        ``aliases``. Empty when the listing fails (a warning is printed).
    """
    try:
        cf_client = boto3.client('cloudfront')
        distributions = {}

        paginator = cf_client.get_paginator('list_distributions')
        for page in paginator.paginate():
            for dist in page.get('DistributionList', {}).get('Items', []):
                dist_id = dist['Id']
                domain_name = dist['DomainName']

                # Get origin (S3 bucket if applicable). Uses the same hostname
                # parser as the DNS matching so both sides agree on what an
                # S3 endpoint looks like.
                s3_origins = []
                for origin in dist.get('Origins', {}).get('Items', []):
                    bucket_name = extract_s3_bucket_from_target(
                        origin.get('DomainName', '')
                    )
                    if bucket_name:
                        s3_origins.append(bucket_name)

                distributions[domain_name] = {
                    'id': dist_id,
                    's3_buckets': s3_origins,
                    'aliases': dist.get('Aliases', {}).get('Items', [])
                }

        return distributions
    except Exception as e:
        click.echo(f"Warning: Could not list CloudFront distributions: {e}", err=True)
        return {}


def parse_dns_csv(csv_path: str) -> Dict[str, List[Dict]]:
    """
    Parse DNS records CSV and extract domains with S3/CloudFront records

    Reads the columns ``Domain``, ``Record Type``, ``Details`` and ``Record``.
    A single row may yield more than one entry if its details match several
    checks.

    Returns:
        Dictionary mapping domain -> list of relevant DNS records
    """
    domains = defaultdict(list)

    def field(row: Dict, key: str) -> str:
        # DictReader fills missing trailing columns with None, so guard
        # before stripping.
        return (row.get(key) or '').strip()

    # utf-8-sig also reads plain UTF-8 and drops a BOM that would otherwise
    # end up inside the first header name; newline='' is what csv expects.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.DictReader(f)

        for row in reader:
            domain = field(row, 'Domain')
            record_type = field(row, 'Record Type')
            details = field(row, 'Details')
            record = field(row, 'Record')

            if not domain:
                continue

            details_lower = details.lower()

            def entry(kind: str) -> Dict:
                return {
                    'type': kind,
                    'record_type': record_type,
                    'record': record,
                    'target': details,
                    'domain': domain
                }

            # Check for S3 endpoints (website or REST, any hostname style the
            # extractor understands)
            if ('s3-website' in details_lower
                    or 's3.amazonaws.com' in details_lower
                    or extract_s3_bucket_from_target(details) is not None):
                domains[domain].append(entry('S3_DIRECT'))

            # Check for CloudFront
            if 'cloudfront.net' in details_lower:
                domains[domain].append(entry('CLOUDFRONT'))

            # Check for ACM validation (indicates AWS usage)
            if 'acm-validations.aws' in details_lower:
                domains[domain].append(entry('ACM_VALIDATION'))

    return dict(domains)


def extract_s3_bucket_from_target(target: str) -> Optional[str]:
    """
    Extract S3 bucket name from DNS target

    Surrounding whitespace is ignored and matching is case-insensitive (DNS
    names are); the bucket name is returned lower-cased. A trailing root dot
    on the hostname is accepted.

    Returns:
        The bucket name, or None if the target is not a virtual-hosted S3
        hostname.
    """
    if not target:
        return None

    match = _S3_HOST_RE.search(target.strip().lower())
    if match:
        return match.group('bucket')

    return None


def match_domains_to_buckets(
    domains: Dict[str, List[Dict]],
    buckets: List[Dict[str, str]],
    cloudfront: Dict[str, Dict]
) -> List[Dict]:
    """
    Match domains to S3 buckets

    Args:
        domains: Output of parse_dns_csv (domain -> DNS records)
        buckets: Output of get_s3_buckets
        cloudfront: Output of get_cloudfront_distributions

    Returns:
        List of matches with domain, bucket, match_type, etc.
    """
    matches = []
    # name -> region; the first occurrence wins, like the linear scan it replaces
    bucket_regions: Dict[str, str] = {}
    for bucket in buckets:
        bucket_regions.setdefault(bucket['name'], bucket['region'])

    # (domain, bucket_name) pairs already reported, to avoid a duplicate
    # name-based match
    reported = set()

    def add_match(match_info: Dict) -> None:
        matches.append(match_info)
        reported.add((match_info['domain'], match_info['bucket_name']))

    # Create CloudFront domain -> bucket mapping
    cf_domain_to_bucket = {}
    for cf_domain, cf_info in cloudfront.items():
        for alias in cf_info.get('aliases', []):
            cf_domain_to_bucket[alias.lower()] = cf_info['s3_buckets']
        cf_domain_to_bucket[cf_domain.lower()] = cf_info['s3_buckets']

    for domain, records in domains.items():
        # Strip the apex marker and a trailing root dot, then only a
        # *leading* "www." (a plain replace would also mangle names such as
        # "awww.example.com").
        domain_base = domain.lower().replace('@', '').rstrip('.')
        if domain_base.startswith('www.'):
            domain_base = domain_base[len('www.'):]

        # Check each DNS record
        for record in records:
            match_info = {
                'domain': domain,
                'dns_record': record['record'],
                'dns_target': record['target'],
                'match_type': None,
                'bucket_name': None,
                'bucket_region': None,
                'confidence': 'low'
            }

            if record['type'] == 'S3_DIRECT':
                # Extract bucket from S3 endpoint; only buckets present in
                # the account listing are reported
                bucket_name = extract_s3_bucket_from_target(record['target'])
                if bucket_name and bucket_name in bucket_regions:
                    match_info['bucket_name'] = bucket_name
                    match_info['match_type'] = 'S3_DIRECT_DNS'
                    match_info['confidence'] = 'high'
                    match_info['bucket_region'] = bucket_regions[bucket_name]
                    add_match(match_info)

            elif record['type'] == 'CLOUDFRONT':
                # Check if CloudFront points to S3. Zone exports often write
                # the target as an FQDN with a trailing dot.
                cf_target = record['target'].strip().lower().rstrip('.')
                if cf_target in cf_domain_to_bucket:
                    s3_buckets = cf_domain_to_bucket[cf_target]
                    if s3_buckets:
                        match_info['bucket_name'] = s3_buckets[0]
                        match_info['match_type'] = 'CLOUDFRONT_TO_S3'
                        match_info['confidence'] = 'high'
                        # Region stays None if the origin bucket is not in
                        # the account listing
                        match_info['bucket_region'] = bucket_regions.get(s3_buckets[0])
                        add_match(match_info)
                    else:
                        # CloudFront but no S3 origin found
                        match_info['match_type'] = 'CLOUDFRONT_UNKNOWN_ORIGIN'
                        match_info['confidence'] = 'medium'
                        add_match(match_info)

        # Also check if domain name matches bucket name (name-based match),
        # unless this domain/bucket pair was already reported
        if domain_base in bucket_regions and (domain, domain_base) not in reported:
            add_match({
                'domain': domain,
                'dns_record': 'N/A',
                'dns_target': 'Name match',
                'match_type': 'NAME_MATCH',
                'bucket_name': domain_base,
                'bucket_region': bucket_regions[domain_base],
                'confidence': 'medium'
            })

    return matches


@click.command()
@click.argument('dns_csv', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for matches')
def main(dns_csv: str, output: Optional[str]):
    """Match domains from DNS CSV to S3 buckets"""
    click.echo("Matching domains to S3 buckets...")
    click.echo("=" * 80)

    # Get S3 buckets
    click.echo("\n1. Fetching S3 buckets...")
    buckets = get_s3_buckets()
    click.echo(f" Found {len(buckets)} S3 buckets")

    # Get CloudFront distributions
    click.echo("\n2. Fetching CloudFront distributions...")
    cloudfront = get_cloudfront_distributions()
    click.echo(f" Found {len(cloudfront)} CloudFront distributions")

    # Parse DNS CSV
    click.echo(f"\n3. Parsing DNS records from {dns_csv}...")
    domains = parse_dns_csv(dns_csv)
    click.echo(f" Found {len(domains)} domains with AWS-related DNS records")

    # Match domains to buckets
    click.echo("\n4. Matching domains to buckets...")
    matches = match_domains_to_buckets(domains, buckets, cloudfront)
    click.echo(f" Found {len(matches)} matches")

    # Display results
    click.echo("\n" + "=" * 80)
    click.echo("MATCH RESULTS")
    click.echo("=" * 80)

    if not matches:
        # Nothing to display or save
        click.echo("No matches found.")
        return

    # Group by match type
    by_type = defaultdict(list)
    for match in matches:
        by_type[match['match_type']].append(match)

    for match_type, type_matches in by_type.items():
        click.echo(f"\n{match_type} ({len(type_matches)} matches):")
        click.echo("-" * 80)
        for match in sorted(type_matches, key=lambda x: x['domain']):
            confidence_icon = "[OK]" if match['confidence'] == 'high' else "[?]"
            click.echo(f" {confidence_icon} {match['domain']:<40} -> {match['bucket_name'] or 'N/A':<40} "
                       f"({match['bucket_region'] or 'N/A'})")

    # Save to CSV if requested
    if output:
        click.echo(f"\n5. Saving results to {output}...")
        with open(output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=[
                'domain', 'bucket_name', 'bucket_region', 'match_type',
                'confidence', 'dns_record', 'dns_target'
            ])
            writer.writeheader()
            writer.writerows(matches)
        click.echo(f" Saved {len(matches)} matches to {output}")

    high_count = sum(1 for m in matches if m['confidence'] == 'high')
    medium_count = sum(1 for m in matches if m['confidence'] == 'medium')

    click.echo("\n" + "=" * 80)
    click.echo("Summary:")
    click.echo(f" Total domains analyzed: {len(domains)}")
    click.echo(f" Total matches found: {len(matches)}")
    click.echo(f" High confidence: {high_count}")
    click.echo(f" Medium confidence: {medium_count}")
    click.echo("=" * 80)


if __name__ == "__main__":
    main()