309 lines
11 KiB
Python
309 lines
11 KiB
Python
"""
|
|
Check a list of domains to see if they're AWS-related (S3 buckets, CloudFront, etc.)
|
|
|
|
Takes a list of domains and checks:
|
|
1. If domain name matches an S3 bucket name
|
|
2. DNS records pointing to CloudFront or S3
|
|
3. ACM validation records (indicates AWS usage)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import socket
|
|
import re
|
|
from typing import List, Dict, Optional, Set
|
|
from collections import defaultdict
|
|
|
|
import boto3
|
|
import click
|
|
from botocore.exceptions import ClientError, NoCredentialsError
|
|
from dotenv import load_dotenv
|
|
|
|
# Load .env file
|
|
load_dotenv()
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from src.deployment.s3_storage import map_aws_region_to_short_code
|
|
|
|
try:
|
|
import dns.resolver
|
|
DNS_AVAILABLE = True
|
|
except ImportError:
|
|
DNS_AVAILABLE = False
|
|
click.echo("Warning: dnspython not available. Install with: pip install dnspython", err=True)
|
|
click.echo("Will use basic socket resolution only.", err=True)
|
|
|
|
|
|
def get_s3_buckets() -> List[Dict[str, str]]:
    """Return every S3 bucket in the account as a {'name', 'region'} dict.

    Exits the process (code 1) when credentials are missing or listing fails,
    since the rest of the script cannot proceed without the bucket inventory.
    """
    try:
        client = boto3.client('s3')
        listing = client.list_buckets()

        inventory: List[Dict[str, str]] = []
        for entry in listing.get('Buckets', []):
            name = entry['Name']

            # GetBucketLocation reports None/'' for us-east-1; a failed
            # lookup is treated the same way so one bad bucket doesn't
            # abort the whole listing.
            try:
                location = client.get_bucket_location(Bucket=name)
                region = location.get('LocationConstraint', 'us-east-1')
                if not region:
                    region = 'us-east-1'
            except ClientError:
                region = 'us-east-1'

            inventory.append({'name': name, 'region': region})

        return inventory

    except NoCredentialsError:
        click.echo("Error: AWS credentials not found.", err=True)
        sys.exit(1)
    except Exception as e:
        click.echo(f"Error listing buckets: {e}", err=True)
        sys.exit(1)
|
|
|
|
|
|
def resolve_dns(domain: str) -> Dict[str, List[str]]:
    """
    Resolve DNS records for a domain.

    Uses dnspython when available (CNAME, A and TXT lookups); otherwise
    falls back to a plain socket lookup that can only produce A records.

    Args:
        domain: Hostname to resolve.

    Returns:
        Dictionary with 'A', 'CNAME', 'TXT' record lists. A record type
        that is absent, or whose lookup fails, is left as an empty list.
    """
    records: Dict[str, List[str]] = {'A': [], 'CNAME': [], 'TXT': []}

    if DNS_AVAILABLE:
        try:
            # One loop instead of three copy-pasted try/except stanzas.
            # NXDOMAIN/NoAnswer just means "no records of this type" --
            # keep going; any other resolver error aborts to the outer
            # handler, matching the original flow.
            for record_type in ('CNAME', 'A', 'TXT'):
                try:
                    answers = dns.resolver.resolve(domain, record_type)
                except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
                    continue
                if record_type == 'CNAME':
                    # CNAME rdata carries the target as an attribute.
                    records['CNAME'] = [str(r.target) for r in answers]
                else:
                    records[record_type] = [str(r) for r in answers]
        except Exception as e:
            click.echo(f" Warning: DNS lookup failed for {domain}: {e}", err=True)
    else:
        # Fallback to basic socket resolution (A record only).
        try:
            records['A'] = [socket.gethostbyname(domain)]
        except socket.gaierror:
            pass

    return records
|
|
|
|
|
|
def check_aws_indicators(domain: str, dns_records: Dict[str, List[str]], buckets: List[Dict[str, str]]) -> Dict:
    """
    Check if a domain shows AWS-related indicators.

    Three signals are examined:
      1. The domain name (minus a leading 'www.') matching an S3 bucket name.
      2. DNS values pointing at CloudFront or an S3 (website) endpoint.
      3. DNS values referencing ACM certificate validation.

    Args:
        domain: Domain name to check.
        dns_records: Mapping of record type ('A'/'CNAME'/'TXT') to value lists.
        buckets: S3 buckets as dicts with 'name' and 'region' keys.

    Returns:
        Dict with keys: domain, bucket_match, bucket_region, dns_cloudfront,
        dns_s3, acm_validation, confidence ('high' / 'medium' / 'none') and
        indicators (human-readable reasons for each match).
    """
    # Single name -> region map replaces the separate name set plus a
    # linear scan for the region.
    bucket_regions = {b['name']: b['region'] for b in buckets}

    # Strip only a leading 'www.' -- str.replace() would also mangle a
    # 'www.' occurring in the middle of the name.
    domain_lower = domain.lower()
    if domain_lower.startswith('www.'):
        domain_lower = domain_lower[len('www.'):]

    result = {
        'domain': domain,
        'bucket_match': None,
        'bucket_region': None,
        'dns_cloudfront': False,
        'dns_s3': False,
        'acm_validation': False,
        'confidence': 'none',
        'indicators': []
    }

    # Signal 1: domain name matches a bucket name exactly.
    if domain_lower in bucket_regions:
        result['bucket_match'] = domain_lower
        result['bucket_region'] = bucket_regions[domain_lower]
        result['confidence'] = 'high'
        result['indicators'].append('Bucket name match')

    # Signals 2 and 3: scan every DNS value for AWS endpoints.
    all_targets = [value for values in dns_records.values() for value in values]

    for target in all_targets:
        target_lower = target.lower()

        # CloudFront distribution hostname.
        if 'cloudfront.net' in target_lower:
            result['dns_cloudfront'] = True
            result['indicators'].append(f'CloudFront: {target}')
            if result['confidence'] == 'none':
                result['confidence'] = 'high'

        # S3 website endpoint or virtual-hosted-style S3 hostname.
        if 's3-website' in target_lower or '.s3.' in target_lower:
            result['dns_s3'] = True
            result['indicators'].append(f'S3 endpoint: {target}')
            if result['confidence'] == 'none':
                result['confidence'] = 'high'

            # Try to recover the bucket name from the endpoint hostname.
            match = re.search(r'([^/\.]+)\.s3-website-', target_lower)
            if not match:
                match = re.search(r'([^/\.]+)\.s3\.', target_lower)
            if match:
                extracted_bucket = match.group(1)
                if extracted_bucket in bucket_regions:
                    result['bucket_match'] = extracted_bucket
                    # Also record the region (previously left unset on
                    # this path, unlike the name-match path above).
                    result['bucket_region'] = bucket_regions[extracted_bucket]
                    result['confidence'] = 'high'

        # ACM DNS validation record -- implies AWS usage, but weaker.
        if 'acm-validations.aws' in target_lower:
            result['acm_validation'] = True
            result['indicators'].append('ACM validation record')
            if result['confidence'] == 'none':
                result['confidence'] = 'medium'

    return result
|
|
|
|
|
|
def _read_domains(domains_file: str) -> List[str]:
    """Read one domain per line, skipping blanks and '#' comments.

    Strips only a *leading* 'www.' prefix -- str.replace() would also
    clobber a 'www.' occurring elsewhere in the name.
    """
    domains = []
    with open(domains_file, 'r', encoding='utf-8') as f:
        for line in f:
            domain = line.strip()
            if not domain or domain.startswith('#'):
                continue
            if domain.startswith('www.'):
                domain = domain[len('www.'):].strip()
            if domain:
                domains.append(domain)
    return domains


def _write_csv(output: str, results: List[Dict]) -> None:
    """Write check results to a CSV file, flattening the indicator list."""
    import csv
    with open(output, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=[
            'domain', 'bucket_match', 'bucket_region', 'confidence',
            'dns_cloudfront', 'dns_s3', 'acm_validation', 'indicators'
        ])
        writer.writeheader()
        for result in results:
            row = result.copy()
            row['indicators'] = '; '.join(row['indicators'])
            writer.writerow(row)


@click.command()
@click.argument('domains_file', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for results')
@click.option('--skip-dns', is_flag=True, help='Skip DNS lookups (faster, name matching only)')
def main(domains_file: str, output: Optional[str], skip_dns: bool):
    """Check domains from a file to see if they're AWS-related"""

    click.echo("Checking domains for AWS indicators...")
    click.echo("=" * 80)

    # Step 1: read the domain list.
    click.echo(f"\n1. Reading domains from {domains_file}...")
    domains = _read_domains(domains_file)
    click.echo(f" Found {len(domains)} domains")

    # Step 2: fetch the account's S3 bucket inventory.
    click.echo("\n2. Fetching S3 buckets...")
    buckets = get_s3_buckets()
    click.echo(f" Found {len(buckets)} S3 buckets")

    # Step 3: check each domain for AWS indicators.
    click.echo(f"\n3. Checking domains{' (DNS lookups enabled)' if not skip_dns else ' (name matching only)'}...")
    results = []

    for idx, domain in enumerate(domains, 1):
        click.echo(f" [{idx}/{len(domains)}] Checking {domain}...", nl=False)

        dns_records = {}
        if not skip_dns:
            dns_records = resolve_dns(domain)
            # Also resolve the www subdomain and merge its records in --
            # either the apex or www may be the name pointed at AWS.
            www_records = resolve_dns(f"www.{domain}")
            for record_type in ['A', 'CNAME', 'TXT']:
                dns_records[record_type] = dns_records.get(record_type, []) + www_records.get(record_type, [])

        result = check_aws_indicators(domain, dns_records, buckets)
        results.append(result)

        if result['confidence'] != 'none':
            click.echo(f" [MATCH - {result['confidence']}]")
        else:
            click.echo(" [No match]")

    # Display results grouped by confidence level.
    click.echo("\n" + "=" * 80)
    click.echo("RESULTS")
    click.echo("=" * 80)

    high_confidence = [r for r in results if r['confidence'] == 'high']
    medium_confidence = [r for r in results if r['confidence'] == 'medium']
    no_match = [r for r in results if r['confidence'] == 'none']

    if high_confidence:
        click.echo(f"\nHIGH CONFIDENCE ({len(high_confidence)} domains):")
        click.echo("-" * 80)
        for result in sorted(high_confidence, key=lambda x: x['domain']):
            bucket_info = f" -> {result['bucket_match']}" if result['bucket_match'] else ""
            region_info = f" ({result['bucket_region']})" if result['bucket_region'] else ""
            click.echo(f" [OK] {result['domain']:<40}{bucket_info}{region_info}")
            for indicator in result['indicators']:
                click.echo(f" - {indicator}")

    if medium_confidence:
        click.echo(f"\nMEDIUM CONFIDENCE ({len(medium_confidence)} domains):")
        click.echo("-" * 80)
        for result in sorted(medium_confidence, key=lambda x: x['domain']):
            click.echo(f" [?] {result['domain']:<40}")
            for indicator in result['indicators']:
                click.echo(f" - {indicator}")

    if no_match:
        click.echo(f"\nNO MATCH ({len(no_match)} domains):")
        click.echo("-" * 80)
        for result in sorted(no_match, key=lambda x: x['domain']):
            click.echo(f" [ ] {result['domain']}")

    # Step 4 (optional): persist results to CSV.
    if output:
        click.echo(f"\n4. Saving results to {output}...")
        _write_csv(output, results)
        click.echo(f" Saved {len(results)} results to {output}")

    click.echo("\n" + "=" * 80)
    click.echo("Summary:")
    click.echo(f" Total domains checked: {len(domains)}")
    click.echo(f" High confidence matches: {len(high_confidence)}")
    click.echo(f" Medium confidence matches: {len(medium_confidence)}")
    click.echo(f" No matches: {len(no_match)}")
    click.echo("=" * 80)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|