345 lines
13 KiB
Python
345 lines
13 KiB
Python
"""
|
|
Match domains from registrar DNS exports to S3 buckets
|
|
|
|
Analyzes DNS records to find domains pointing to:
|
|
- Direct S3 website endpoints (*.s3-website-*.amazonaws.com)
|
|
- CloudFront distributions (which may front S3 buckets)
|
|
- S3 bucket names that match domain names
|
|
"""
|
|
|
|
import csv
|
|
import re
|
|
import sys
|
|
import os
|
|
from typing import Dict, List, Set, Optional, Tuple
|
|
from collections import defaultdict
|
|
|
|
import boto3
|
|
import click
|
|
from botocore.exceptions import ClientError, NoCredentialsError
|
|
from dotenv import load_dotenv
|
|
|
|
# Load .env file
|
|
load_dotenv()
|
|
|
|
|
|
def get_s3_buckets() -> List[Dict[str, str]]:
    """Return all S3 buckets in the account as dicts with 'name' and 'region'.

    Falls back to an empty list (with a warning on stderr) when credentials
    are missing or the ListBuckets call fails, so the caller can still do
    DNS-only and name-based matching.
    """
    try:
        s3_client = boto3.client('s3')
        response = s3_client.list_buckets()

        buckets = []
        for bucket in response.get('Buckets', []):
            bucket_name = bucket['Name']

            # GetBucketLocation returns None/'' for us-east-1 and the legacy
            # value 'EU' for eu-west-1; normalize both to real region names.
            try:
                region_response = s3_client.get_bucket_location(Bucket=bucket_name)
                region = region_response.get('LocationConstraint') or 'us-east-1'
                if region == 'EU':
                    region = 'eu-west-1'
            except ClientError:
                # e.g. access denied on this one bucket; assume the default region
                region = 'us-east-1'

            buckets.append({
                'name': bucket_name,
                'region': region
            })

        return buckets

    except NoCredentialsError:
        click.echo("Warning: AWS credentials not found. Cannot list S3 buckets.", err=True)
        click.echo("Will only match based on DNS records and name matching.", err=True)
        return []
    except Exception as e:
        click.echo(f"Warning: Error listing buckets: {e}", err=True)
        click.echo("Will only match based on DNS records and name matching.", err=True)
        return []
|
|
|
|
|
|
def get_cloudfront_distributions() -> Dict[str, Dict]:
    """Return CloudFront distributions keyed by their *.cloudfront.net domain.

    Each value holds the distribution id, any S3 bucket names parsed out of
    its origins, and its alternate domain name (alias) list.
    """
    distributions: Dict[str, Dict] = {}
    try:
        cf_client = boto3.client('cloudfront')
        paginator = cf_client.get_paginator('list_distributions')

        for page in paginator.paginate():
            items = page.get('DistributionList', {}).get('Items', [])
            for dist in items:
                # Collect bucket names from any S3-backed origins.
                s3_origins = []
                for origin in dist.get('Origins', {}).get('Items', []):
                    origin_domain = origin.get('DomainName', '')
                    if '.s3.' not in origin_domain and 's3-website' not in origin_domain:
                        continue
                    bucket_match = re.search(r'([^/]+)\.s3[\.-]', origin_domain)
                    if bucket_match:
                        s3_origins.append(bucket_match.group(1))

                distributions[dist['DomainName']] = {
                    'id': dist['Id'],
                    's3_buckets': s3_origins,
                    'aliases': dist.get('Aliases', {}).get('Items', []),
                }

        return distributions

    except Exception as e:
        click.echo(f"Warning: Could not list CloudFront distributions: {e}", err=True)
        return {}
|
|
|
|
|
|
def parse_dns_csv(csv_path: str) -> Dict[str, List[Dict]]:
    """
    Parse DNS records CSV and extract domains with S3/CloudFront records.

    Expects columns: Domain, Record Type, Record, Details. A row is kept
    when its Details field contains an AWS-related marker substring; a row
    matching markers of different classifications yields one entry per
    classification (but at most one per classification, matching the
    original or-combined S3 check).

    Returns:
        Dictionary mapping domain -> list of relevant DNS records
    """
    # (substring to look for in Details, classification used downstream)
    markers = [
        ('s3-website', 'S3_DIRECT'),            # S3 static-website endpoint
        ('s3.amazonaws.com', 'S3_DIRECT'),      # S3 REST endpoint
        ('cloudfront.net', 'CLOUDFRONT'),       # CloudFront distribution
        ('acm-validations.aws', 'ACM_VALIDATION'),  # indicates AWS usage
    ]

    domains = defaultdict(list)

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)

        for row in reader:
            domain = row.get('Domain', '').strip()
            if not domain:
                continue

            record_type = row.get('Record Type', '').strip()
            details = row.get('Details', '').strip()
            record = row.get('Record', '').strip()
            details_lower = details.lower()

            # Dedup so a Details value hitting both S3 markers still
            # produces a single S3_DIRECT entry.
            seen_classes = set()
            for marker, record_class in markers:
                if marker in details_lower and record_class not in seen_classes:
                    seen_classes.add(record_class)
                    domains[domain].append({
                        'type': record_class,
                        'record_type': record_type,
                        'record': record,
                        'target': details,
                        'domain': domain
                    })

    return dict(domains)
|
|
|
|
|
|
def extract_s3_bucket_from_target(target: str) -> Optional[str]:
    """Extract the S3 bucket name from a DNS target, or None if not an S3 endpoint.

    For S3 static-website hosting the bucket name is the full host label
    (dots and any 'www.' prefix included), so everything before the
    '.s3-website' / '.s3.' marker is captured.

    Note: the original had a second 'www.'-stripping pattern that was
    unreachable (the greedy first pattern always matched first); it has
    been removed with no behavior change.
    """
    # bucket-name.s3-website-region.amazonaws.com (dash separator) and
    # bucket-name.s3-website.region.amazonaws.com (dot separator, used by
    # newer regions such as eu-central-1).
    match = re.search(r'^([^/]+)\.s3-website[\.-]([^\.]+)\.amazonaws\.com', target)
    if match:
        return match.group(1)

    # bucket-name.s3.region.amazonaws.com (regional REST endpoint)
    match = re.search(r'^([^/]+)\.s3\.([^\.]+)\.amazonaws\.com', target)
    if match:
        return match.group(1)

    # bucket-name.s3.amazonaws.com (legacy global REST endpoint)
    match = re.search(r'^([^/]+)\.s3\.amazonaws\.com', target)
    if match:
        return match.group(1)

    return None
|
|
|
|
|
|
def match_domains_to_buckets(
    domains: Dict[str, List[Dict]],
    buckets: List[Dict[str, str]],
    cloudfront: Dict[str, Dict]
) -> List[Dict]:
    """
    Match domains to S3 buckets.

    Args:
        domains: domain -> relevant DNS records (from parse_dns_csv)
        buckets: account buckets as {'name', 'region'} dicts (from get_s3_buckets)
        cloudfront: CloudFront domain -> info dicts (from get_cloudfront_distributions)

    Returns:
        List of matches with domain, bucket, match_type, etc.
    """
    matches = []
    bucket_names = {b['name'] for b in buckets}
    # Build the name->region map once: O(1) region lookups instead of
    # re-scanning the bucket list for every match.
    bucket_regions = {b['name']: b['region'] for b in buckets}

    # Create CloudFront (alias or *.cloudfront.net) domain -> S3-origin mapping
    cf_domain_to_bucket = {}
    for cf_domain, cf_info in cloudfront.items():
        for alias in cf_info.get('aliases', []):
            cf_domain_to_bucket[alias.lower()] = cf_info['s3_buckets']
        cf_domain_to_bucket[cf_domain.lower()] = cf_info['s3_buckets']

    for domain, records in domains.items():
        domain_lower = domain.lower()
        # Strip '@' apex markers and only a *leading* 'www.' —
        # str.replace('www.', '') would also mangle 'www.' mid-domain.
        domain_base = domain_lower.replace('@', '')
        if domain_base.startswith('www.'):
            domain_base = domain_base[len('www.'):]

        # Check each DNS record
        for record in records:
            match_info = {
                'domain': domain,
                'dns_record': record['record'],
                'dns_target': record['target'],
                'match_type': None,
                'bucket_name': None,
                'bucket_region': None,
                'confidence': 'low'
            }

            if record['type'] == 'S3_DIRECT':
                # Extract bucket from the S3 endpoint; only report when that
                # bucket actually exists in this account's bucket list.
                bucket_name = extract_s3_bucket_from_target(record['target'])
                if bucket_name and bucket_name in bucket_names:
                    match_info['bucket_name'] = bucket_name
                    match_info['match_type'] = 'S3_DIRECT_DNS'
                    match_info['confidence'] = 'high'
                    match_info['bucket_region'] = bucket_regions.get(bucket_name)
                    matches.append(match_info)

            elif record['type'] == 'CLOUDFRONT':
                # Trace the CloudFront target back to an S3 origin if known.
                cf_target = record['target'].lower()
                if cf_target in cf_domain_to_bucket:
                    s3_buckets = cf_domain_to_bucket[cf_target]
                    if s3_buckets:
                        match_info['bucket_name'] = s3_buckets[0]
                        match_info['match_type'] = 'CLOUDFRONT_TO_S3'
                        match_info['confidence'] = 'high'
                        match_info['bucket_region'] = bucket_regions.get(s3_buckets[0])
                        matches.append(match_info)
                    else:
                        # CloudFront but no S3 origin found
                        match_info['match_type'] = 'CLOUDFRONT_UNKNOWN_ORIGIN'
                        match_info['confidence'] = 'medium'
                        matches.append(match_info)

            # ACM_VALIDATION records produce no match on their own.

        # Also check if domain name matches bucket name (name-based match)
        if domain_base in bucket_names:
            already_matched = any(
                m['domain'] == domain and m['bucket_name'] == domain_base
                for m in matches
            )
            if not already_matched:
                matches.append({
                    'domain': domain,
                    'dns_record': 'N/A',
                    'dns_target': 'Name match',
                    'match_type': 'NAME_MATCH',
                    'bucket_name': domain_base,
                    'bucket_region': bucket_regions.get(domain_base),
                    'confidence': 'medium'
                })

    return matches
|
|
|
|
|
|
@click.command()
@click.argument('dns_csv', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for matches')
def main(dns_csv: str, output: Optional[str]):
    """Match domains from DNS CSV to S3 buckets"""

    separator = "=" * 80

    click.echo("Matching domains to S3 buckets...")
    click.echo(separator)

    # Step 1: inventory the account's buckets (may be empty without creds).
    click.echo("\n1. Fetching S3 buckets...")
    account_buckets = get_s3_buckets()
    click.echo(f" Found {len(account_buckets)} S3 buckets")

    # Step 2: inventory CloudFront so CNAMEs can be traced to S3 origins.
    click.echo("\n2. Fetching CloudFront distributions...")
    cf_distributions = get_cloudfront_distributions()
    click.echo(f" Found {len(cf_distributions)} CloudFront distributions")

    # Step 3: pull the AWS-related records out of the registrar export.
    click.echo(f"\n3. Parsing DNS records from {dns_csv}...")
    dns_domains = parse_dns_csv(dns_csv)
    click.echo(f" Found {len(dns_domains)} domains with AWS-related DNS records")

    # Step 4: correlate DNS records, buckets, and distributions.
    click.echo("\n4. Matching domains to buckets...")
    results = match_domains_to_buckets(dns_domains, account_buckets, cf_distributions)
    click.echo(f" Found {len(results)} matches")

    # Display results
    click.echo("\n" + separator)
    click.echo("MATCH RESULTS")
    click.echo(separator)

    if not results:
        click.echo("No matches found.")
        return

    # Group by match type for display.
    grouped = defaultdict(list)
    for result in results:
        grouped[result['match_type']].append(result)

    for match_type, type_matches in grouped.items():
        click.echo(f"\n{match_type} ({len(type_matches)} matches):")
        click.echo("-" * 80)
        for match in sorted(type_matches, key=lambda item: item['domain']):
            confidence_icon = "[OK]" if match['confidence'] == 'high' else "[?]"
            click.echo(f" {confidence_icon} {match['domain']:<40} -> {match['bucket_name'] or 'N/A':<40} "
                       f"({match['bucket_region'] or 'N/A'})")

    # Optional CSV export of every match.
    if output:
        click.echo(f"\n5. Saving results to {output}...")
        fieldnames = [
            'domain', 'bucket_name', 'bucket_region', 'match_type',
            'confidence', 'dns_record', 'dns_target'
        ]
        with open(output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)
        click.echo(f" Saved {len(results)} matches to {output}")

    high_count = len([m for m in results if m['confidence'] == 'high'])
    medium_count = len([m for m in results if m['confidence'] == 'medium'])

    click.echo("\n" + separator)
    click.echo("Summary:")
    click.echo(f" Total domains analyzed: {len(dns_domains)}")
    click.echo(f" Total matches found: {len(results)}")
    click.echo(f" High confidence: {high_count}")
    click.echo(f" Medium confidence: {medium_count}")
    click.echo(separator)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|