# Big-Link-Man/scripts/check_domains_for_aws.py
"""
Check a list of domains to see if they're AWS-related (S3 buckets, CloudFront, etc.)
Takes a list of domains and checks:
1. If domain name matches an S3 bucket name
2. DNS records pointing to CloudFront or S3
3. ACM validation records (indicates AWS usage)
"""
import os
import sys
import socket
import re
from typing import List, Dict, Optional, Set
from collections import defaultdict
import boto3
import click
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv
# Load .env file
load_dotenv()
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.deployment.s3_storage import map_aws_region_to_short_code
try:
import dns.resolver
DNS_AVAILABLE = True
except ImportError:
DNS_AVAILABLE = False
click.echo("Warning: dnspython not available. Install with: pip install dnspython", err=True)
click.echo("Will use basic socket resolution only.", err=True)
def get_s3_buckets() -> List[Dict[str, str]]:
    """Return every S3 bucket visible to the current AWS credentials.

    Each entry is a dict with 'name' and 'region' keys. A bucket whose
    LocationConstraint is None/empty (the legacy us-east-1 encoding) or
    whose region lookup fails is reported as 'us-east-1'.

    Exits the process with status 1 when credentials are missing or the
    listing fails entirely.
    """
    try:
        client = boto3.client('s3')
        listing = client.list_buckets()
        inventory = []
        for entry in listing.get('Buckets', []):
            name = entry['Name']
            # Per-bucket region lookup; ClientError (e.g. access denied)
            # falls back to the default region rather than aborting.
            try:
                location = client.get_bucket_location(Bucket=name)
                region = location.get('LocationConstraint', 'us-east-1')
                if not region:
                    region = 'us-east-1'
            except ClientError:
                region = 'us-east-1'
            inventory.append({'name': name, 'region': region})
        return inventory
    except NoCredentialsError:
        click.echo("Error: AWS credentials not found.", err=True)
        sys.exit(1)
    except Exception as e:
        click.echo(f"Error listing buckets: {e}", err=True)
        sys.exit(1)
def resolve_dns(domain: str) -> Dict[str, List[str]]:
    """
    Resolve DNS records for a domain.

    Args:
        domain: Hostname to query.

    Returns:
        Dictionary with 'A', 'CNAME', 'TXT' record lists. A list stays
        empty when that record type is absent or its lookup fails.
    """
    records: Dict[str, List[str]] = {'A': [], 'CNAME': [], 'TXT': []}

    if not DNS_AVAILABLE:
        # Fallback: plain socket resolution can only yield one A record.
        try:
            records['A'] = [socket.gethostbyname(domain)]
        except socket.gaierror:
            pass
        return records

    # Each record type is queried independently so that an unexpected
    # error on one lookup (timeout, NoNameservers, ...) does not abort
    # the remaining lookups — previously a single outer try wrapped all
    # three queries and the first unexpected error skipped the rest.
    for record_type in ('CNAME', 'A', 'TXT'):
        try:
            answers = dns.resolver.resolve(domain, record_type)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            # Record type simply absent — not an error worth reporting.
            continue
        except Exception as e:
            click.echo(f" Warning: DNS lookup failed for {domain}: {e}", err=True)
            continue
        if record_type == 'CNAME':
            records[record_type] = [str(r.target) for r in answers]
        else:
            records[record_type] = [str(r) for r in answers]
    return records
def check_aws_indicators(domain: str, dns_records: Dict[str, List[str]], buckets: List[Dict[str, str]]) -> Dict:
    """
    Check if a domain shows AWS-related indicators.

    Args:
        domain: Domain name being checked.
        dns_records: Mapping of record type ('A'/'CNAME'/'TXT') to value lists.
        buckets: S3 bucket dicts with 'name' and 'region' keys.

    Returns:
        Dictionary with match information: matched bucket name and region,
        booleans for CloudFront / S3 / ACM indicators, a 'confidence' level
        ('high', 'medium' or 'none') and a human-readable 'indicators' list.
    """
    # name -> region lookup built once; also serves as the membership set.
    bucket_regions = {b['name']: b['region'] for b in buckets}

    domain_lower = domain.lower()
    # Strip only a *leading* 'www.' — the previous str.replace('www.', '')
    # also mangled domains merely containing 'www.' (e.g. 'mywww.example.com').
    if domain_lower.startswith('www.'):
        domain_lower = domain_lower[4:]

    result = {
        'domain': domain,
        'bucket_match': None,
        'bucket_region': None,
        'dns_cloudfront': False,
        'dns_s3': False,
        'acm_validation': False,
        'confidence': 'none',
        'indicators': []
    }

    # 1. Exact bucket-name match is the strongest signal.
    if domain_lower in bucket_regions:
        result['bucket_match'] = domain_lower
        result['bucket_region'] = bucket_regions[domain_lower]
        result['confidence'] = 'high'
        result['indicators'].append('Bucket name match')

    # 2. Scan every DNS value (A/CNAME/TXT alike) for AWS endpoints.
    all_targets = [value for values in dns_records.values() for value in values]
    for target in all_targets:
        target_lower = target.lower()

        # CloudFront distribution hostname.
        if 'cloudfront.net' in target_lower:
            result['dns_cloudfront'] = True
            result['indicators'].append(f'CloudFront: {target}')
            if result['confidence'] == 'none':
                result['confidence'] = 'high'

        # S3 website or REST endpoint.
        if 's3-website' in target_lower or '.s3.' in target_lower:
            result['dns_s3'] = True
            result['indicators'].append(f'S3 endpoint: {target}')
            if result['confidence'] == 'none':
                result['confidence'] = 'high'
            # Try to recover the bucket name from the endpoint hostname.
            match = re.search(r'([^/\.]+)\.s3-website-', target_lower)
            if not match:
                match = re.search(r'([^/\.]+)\.s3\.', target_lower)
            if match:
                extracted_bucket = match.group(1)
                if extracted_bucket in bucket_regions:
                    result['bucket_match'] = extracted_bucket
                    # Fill in the region too (previously left None on this path).
                    result['bucket_region'] = bucket_regions[extracted_bucket]
                    result['confidence'] = 'high'

        # ACM DNS-validation CNAME — proves AWS usage, but not hosting.
        if 'acm-validations.aws' in target_lower:
            result['acm_validation'] = True
            result['indicators'].append('ACM validation record')
            if result['confidence'] == 'none':
                result['confidence'] = 'medium'

    return result
@click.command()
@click.argument('domains_file', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for results')
@click.option('--skip-dns', is_flag=True, help='Skip DNS lookups (faster, name matching only)')
def main(domains_file: str, output: Optional[str], skip_dns: bool):
    """Check domains from a file to see if they're AWS-related"""
    click.echo("Checking domains for AWS indicators...")
    click.echo("=" * 80)

    # 1. Read the domain list: one domain per line, '#' lines are comments.
    click.echo(f"\n1. Reading domains from {domains_file}...")
    domains = []
    with open(domains_file, 'r', encoding='utf-8') as f:
        for line in f:
            domain = line.strip()
            if domain and not domain.startswith('#'):
                # Strip only a *leading* 'www.' — the previous
                # str.replace('www.', '') also corrupted domains that merely
                # contain 'www.' elsewhere in the name.
                if domain.startswith('www.'):
                    domain = domain[4:].strip()
                if domain:
                    domains.append(domain)
    click.echo(f" Found {len(domains)} domains")

    # 2. Inventory of S3 buckets for name matching.
    click.echo("\n2. Fetching S3 buckets...")
    buckets = get_s3_buckets()
    click.echo(f" Found {len(buckets)} S3 buckets")

    # 3. Evaluate each domain (optionally with DNS evidence).
    click.echo(f"\n3. Checking domains{' (DNS lookups enabled)' if not skip_dns else ' (name matching only)'}...")
    results = []
    for idx, domain in enumerate(domains, 1):
        click.echo(f" [{idx}/{len(domains)}] Checking {domain}...", nl=False)
        dns_records = {}
        if not skip_dns:
            dns_records = resolve_dns(domain)
            # Also check the www subdomain and merge its records in, since
            # sites often CNAME only the www host to CloudFront/S3.
            www_records = resolve_dns(f"www.{domain}")
            for record_type in ['A', 'CNAME', 'TXT']:
                dns_records[record_type] = dns_records.get(record_type, []) + www_records.get(record_type, [])
        result = check_aws_indicators(domain, dns_records, buckets)
        results.append(result)
        if result['confidence'] != 'none':
            click.echo(f" [MATCH - {result['confidence']}]")
        else:
            click.echo(" [No match]")

    # Display results grouped by confidence level.
    click.echo("\n" + "=" * 80)
    click.echo("RESULTS")
    click.echo("=" * 80)
    high_confidence = [r for r in results if r['confidence'] == 'high']
    medium_confidence = [r for r in results if r['confidence'] == 'medium']
    no_match = [r for r in results if r['confidence'] == 'none']

    if high_confidence:
        click.echo(f"\nHIGH CONFIDENCE ({len(high_confidence)} domains):")
        click.echo("-" * 80)
        for result in sorted(high_confidence, key=lambda x: x['domain']):
            bucket_info = f" -> {result['bucket_match']}" if result['bucket_match'] else ""
            region_info = f" ({result['bucket_region']})" if result['bucket_region'] else ""
            click.echo(f" [OK] {result['domain']:<40}{bucket_info}{region_info}")
            if result['indicators']:
                for indicator in result['indicators']:
                    click.echo(f" - {indicator}")

    if medium_confidence:
        click.echo(f"\nMEDIUM CONFIDENCE ({len(medium_confidence)} domains):")
        click.echo("-" * 80)
        for result in sorted(medium_confidence, key=lambda x: x['domain']):
            click.echo(f" [?] {result['domain']:<40}")
            if result['indicators']:
                for indicator in result['indicators']:
                    click.echo(f" - {indicator}")

    if no_match:
        click.echo(f"\nNO MATCH ({len(no_match)} domains):")
        click.echo("-" * 80)
        for result in sorted(no_match, key=lambda x: x['domain']):
            click.echo(f" [ ] {result['domain']}")

    # 4. Optional CSV export of every result row.
    if output:
        click.echo(f"\n4. Saving results to {output}...")
        import csv
        with open(output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=[
                'domain', 'bucket_match', 'bucket_region', 'confidence',
                'dns_cloudfront', 'dns_s3', 'acm_validation', 'indicators'
            ])
            writer.writeheader()
            for result in results:
                row = result.copy()
                # Flatten the indicators list into a single CSV cell.
                row['indicators'] = '; '.join(row['indicators'])
                writer.writerow(row)
        click.echo(f" Saved {len(results)} results to {output}")

    click.echo("\n" + "=" * 80)
    click.echo("Summary:")
    click.echo(f" Total domains checked: {len(domains)}")
    click.echo(f" High confidence matches: {len(high_confidence)}")
    click.echo(f" Medium confidence matches: {len(medium_confidence)}")
    click.echo(f" No matches: {len(no_match)}")
    click.echo("=" * 80)


if __name__ == "__main__":
    main()