# Big-Link-Man/scripts/match_domains_to_s3_buckets.py

"""
Match domains from registrar DNS exports to S3 buckets
Analyzes DNS records to find domains pointing to:
- Direct S3 website endpoints (*.s3-website-*.amazonaws.com)
- CloudFront distributions (which may front S3 buckets)
- S3 bucket names that match domain names
"""
import csv
import re
import sys
import os
from typing import Dict, List, Set, Optional, Tuple
from collections import defaultdict
import boto3
import click
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv
# Pull AWS credentials / configuration from a local .env file, if one exists.
load_dotenv()
def get_s3_buckets() -> List[Dict[str, str]]:
    """Return every S3 bucket in the account as {'name': ..., 'region': ...} dicts.

    On missing credentials or any listing error, emits a warning and returns
    an empty list so callers can fall back to DNS/name-based matching only.
    """
    try:
        client = boto3.client('s3')
        inventory = []
        for entry in client.list_buckets().get('Buckets', []):
            name = entry['Name']
            # LocationConstraint is None/'' for us-east-1 buckets, and the
            # call itself can fail on access-denied — default in both cases.
            try:
                location = client.get_bucket_location(Bucket=name).get(
                    'LocationConstraint', 'us-east-1')
                if not location:
                    location = 'us-east-1'
            except ClientError:
                location = 'us-east-1'
            inventory.append({'name': name, 'region': location})
        return inventory
    except NoCredentialsError:
        click.echo("Warning: AWS credentials not found. Cannot list S3 buckets.", err=True)
        click.echo("Will only match based on DNS records and name matching.", err=True)
        return []
    except Exception as e:
        click.echo(f"Warning: Error listing buckets: {e}", err=True)
        click.echo("Will only match based on DNS records and name matching.", err=True)
        return []
def get_cloudfront_distributions() -> Dict[str, Dict]:
    """Return CloudFront domain -> {'id', 's3_buckets', 'aliases'} for all distributions.

    's3_buckets' holds bucket names parsed out of any S3-looking origin
    domains; on any API error a warning is emitted and {} is returned.
    """
    try:
        client = boto3.client('cloudfront')
        result: Dict[str, Dict] = {}
        for page in client.get_paginator('list_distributions').paginate():
            for item in page.get('DistributionList', {}).get('Items', []):
                origin_buckets = []
                for origin in item.get('Origins', {}).get('Items', []):
                    origin_domain = origin.get('DomainName', '')
                    if '.s3.' in origin_domain or 's3-website' in origin_domain:
                        # Bucket name is everything before the ".s3." / ".s3-" marker.
                        found = re.search(r'([^/]+)\.s3[\.-]', origin_domain)
                        if found:
                            origin_buckets.append(found.group(1))
                result[item['DomainName']] = {
                    'id': item['Id'],
                    's3_buckets': origin_buckets,
                    'aliases': item.get('Aliases', {}).get('Items', []),
                }
        return result
    except Exception as e:
        click.echo(f"Warning: Could not list CloudFront distributions: {e}", err=True)
        return {}
def parse_dns_csv(csv_path: str) -> Dict[str, List[Dict]]:
    """Parse a registrar DNS-export CSV and keep only AWS-related records.

    Expects columns: Domain, Record Type, Record, Details. A single row can
    produce multiple entries if its Details match more than one category.

    Returns:
        Dictionary mapping domain -> list of relevant DNS record dicts
        (keys: type, record_type, record, target, domain).
    """
    grouped: Dict[str, List[Dict]] = defaultdict(list)
    with open(csv_path, 'r', encoding='utf-8') as handle:
        for row in csv.DictReader(handle):
            domain = row.get('Domain', '').strip()
            if not domain:
                continue
            record_type = row.get('Record Type', '').strip()
            details = row.get('Details', '').strip()
            record = row.get('Record', '').strip()
            details_lc = details.lower()

            # Categorize by substring tests on the record target.
            categories = []
            if 's3-website' in details_lc or 's3.amazonaws.com' in details_lc:
                categories.append('S3_DIRECT')          # direct S3 website endpoint
            if 'cloudfront.net' in details_lc:
                categories.append('CLOUDFRONT')         # CloudFront distribution
            if 'acm-validations.aws' in details_lc:
                categories.append('ACM_VALIDATION')     # ACM cert validation => AWS usage

            for category in categories:
                grouped[domain].append({
                    'type': category,
                    'record_type': record_type,
                    'record': record,
                    'target': details,
                    'domain': domain,
                })
    return dict(grouped)
def extract_s3_bucket_from_target(target: str) -> Optional[str]:
    """Extract the S3 bucket name from a DNS target, or None if not an S3 endpoint.

    Handles:
      - www.bucket-name.s3-website-region.amazonaws.com  -> bucket-name
      - bucket-name.s3-website-region.amazonaws.com      -> bucket-name
      - bucket-name.s3-website.region.amazonaws.com      -> bucket-name
        (AWS uses BOTH dash- and dot-separated website endpoint formats)
      - bucket-name.s3.region.amazonaws.com              -> bucket-name

    Bucket names may contain dots, so we capture everything up to the
    ".s3-website" / ".s3." marker.

    BUGFIX: the www-stripping pattern must be tried FIRST — previously the
    generic pattern matched "www.bucket.s3-website-..." and wrongly returned
    "www.bucket" as the bucket name, making the www case unreachable.
    """
    # Pattern: www.bucket-name.s3-website[-.]region.amazonaws.com
    match = re.search(r'^www\.([^/]+)\.s3-website[.-]([^.]+)\.amazonaws\.com', target)
    if match:
        return match.group(1)
    # Pattern: bucket-name.s3-website[-.]region.amazonaws.com
    match = re.search(r'^([^/]+)\.s3-website[.-]([^.]+)\.amazonaws\.com', target)
    if match:
        return match.group(1)
    # Pattern: bucket-name.s3.region.amazonaws.com (REST endpoint)
    match = re.search(r'^([^/]+)\.s3\.([^.]+)\.amazonaws\.com', target)
    if match:
        return match.group(1)
    return None
def match_domains_to_buckets(
    domains: Dict[str, List[Dict]],
    buckets: List[Dict[str, str]],
    cloudfront: Dict[str, Dict]
) -> List[Dict]:
    """
    Match domains to S3 buckets.

    Args:
        domains: domain -> relevant DNS records (from parse_dns_csv).
        buckets: account buckets as {'name', 'region'} dicts (from get_s3_buckets).
        cloudfront: CloudFront domain -> distribution info
            (from get_cloudfront_distributions).

    Returns:
        List of match dicts with keys: domain, dns_record, dns_target,
        match_type, bucket_name, bucket_region, confidence.
    """
    matches: List[Dict] = []
    bucket_names = {b['name'] for b in buckets}
    # O(1) region lookups instead of rescanning the bucket list per match.
    region_by_bucket = {b['name']: b['region'] for b in buckets}

    # Map every CloudFront domain AND alias (lowercased) to its S3 origin buckets.
    cf_domain_to_bucket = {}
    for cf_domain, cf_info in cloudfront.items():
        for alias in cf_info.get('aliases', []):
            cf_domain_to_bucket[alias.lower()] = cf_info['s3_buckets']
        cf_domain_to_bucket[cf_domain.lower()] = cf_info['s3_buckets']

    for domain, records in domains.items():
        domain_lower = domain.lower()
        # Strip only a LEADING "www." — str.replace('www.', '') would also
        # mangle interior occurrences (e.g. "foo.www.bar"). '@' placeholder
        # markers are removed wherever they appear, as before.
        if domain_lower.startswith('www.'):
            domain_base = domain_lower[len('www.'):]
        else:
            domain_base = domain_lower
        domain_base = domain_base.replace('@', '')

        # Check each DNS record for a direct or CloudFront-mediated S3 link.
        for record in records:
            match_info = {
                'domain': domain,
                'dns_record': record['record'],
                'dns_target': record['target'],
                'match_type': None,
                'bucket_name': None,
                'bucket_region': None,
                'confidence': 'low',
            }

            if record['type'] == 'S3_DIRECT':
                # DNS points straight at an S3 (website) endpoint.
                bucket_name = extract_s3_bucket_from_target(record['target'])
                if bucket_name and bucket_name in bucket_names:
                    match_info['bucket_name'] = bucket_name
                    match_info['match_type'] = 'S3_DIRECT_DNS'
                    match_info['confidence'] = 'high'
                    match_info['bucket_region'] = region_by_bucket.get(bucket_name)
                    matches.append(match_info)

            elif record['type'] == 'CLOUDFRONT':
                # DNS points at CloudFront; try to resolve its S3 origin.
                cf_target = record['target'].lower()
                s3_buckets = cf_domain_to_bucket.get(cf_target, [])
                if s3_buckets:
                    match_info['bucket_name'] = s3_buckets[0]
                    match_info['match_type'] = 'CLOUDFRONT_TO_S3'
                    match_info['confidence'] = 'high'
                    match_info['bucket_region'] = region_by_bucket.get(
                        match_info['bucket_name'])
                else:
                    # CloudFront but no S3 origin found — covers both an
                    # unknown distribution and a known one with no S3 origin
                    # (the latter was previously dropped silently).
                    match_info['match_type'] = 'CLOUDFRONT_UNKNOWN_ORIGIN'
                    match_info['confidence'] = 'medium'
                matches.append(match_info)

        # Also check if the domain name itself matches a bucket name.
        if domain_base in bucket_names:
            already_matched = any(
                m['domain'] == domain and m['bucket_name'] == domain_base
                for m in matches
            )
            if not already_matched:
                matches.append({
                    'domain': domain,
                    'dns_record': 'N/A',
                    'dns_target': 'Name match',
                    'match_type': 'NAME_MATCH',
                    'bucket_name': domain_base,
                    'bucket_region': region_by_bucket.get(domain_base),
                    'confidence': 'medium',
                })
    return matches
@click.command()
@click.argument('dns_csv', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for matches')
def main(dns_csv: str, output: Optional[str]):
    """Match domains from DNS CSV to S3 buckets"""
    separator = "=" * 80
    click.echo("Matching domains to S3 buckets...")
    click.echo(separator)

    # Step 1-2: inventory AWS resources (each degrades gracefully on error).
    click.echo("\n1. Fetching S3 buckets...")
    buckets = get_s3_buckets()
    click.echo(f" Found {len(buckets)} S3 buckets")

    click.echo("\n2. Fetching CloudFront distributions...")
    cloudfront = get_cloudfront_distributions()
    click.echo(f" Found {len(cloudfront)} CloudFront distributions")

    # Step 3: parse the registrar DNS export.
    click.echo(f"\n3. Parsing DNS records from {dns_csv}...")
    domains = parse_dns_csv(dns_csv)
    click.echo(f" Found {len(domains)} domains with AWS-related DNS records")

    # Step 4: correlate domains with buckets.
    click.echo("\n4. Matching domains to buckets...")
    matches = match_domains_to_buckets(domains, buckets, cloudfront)
    click.echo(f" Found {len(matches)} matches")

    click.echo("\n" + separator)
    click.echo("MATCH RESULTS")
    click.echo(separator)
    if not matches:
        click.echo("No matches found.")
        return

    # Group and display by match type, domains sorted within each group.
    by_type = defaultdict(list)
    for entry in matches:
        by_type[entry['match_type']].append(entry)

    for match_type, type_matches in by_type.items():
        click.echo(f"\n{match_type} ({len(type_matches)} matches):")
        click.echo("-" * 80)
        for entry in sorted(type_matches, key=lambda x: x['domain']):
            confidence_icon = "[OK]" if entry['confidence'] == 'high' else "[?]"
            click.echo(f" {confidence_icon} {entry['domain']:<40} -> {entry['bucket_name'] or 'N/A':<40} "
                       f"({entry['bucket_region'] or 'N/A'})")

    # Optional step 5: persist matches to CSV.
    if output:
        click.echo(f"\n5. Saving results to {output}...")
        fieldnames = ['domain', 'bucket_name', 'bucket_region', 'match_type',
                      'confidence', 'dns_record', 'dns_target']
        with open(output, 'w', newline='', encoding='utf-8') as handle:
            writer = csv.DictWriter(handle, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(matches)
        click.echo(f" Saved {len(matches)} matches to {output}")

    click.echo("\n" + separator)
    click.echo("Summary:")
    click.echo(f" Total domains analyzed: {len(domains)}")
    click.echo(f" Total matches found: {len(matches)}")
    click.echo(f" High confidence: {len([m for m in matches if m['confidence'] == 'high'])}")
    click.echo(f" Medium confidence: {len([m for m in matches if m['confidence'] == 'medium'])}")
    click.echo(separator)
if __name__ == "__main__":
    main()