# Big-Link-Man/scripts/discover_s3_buckets.py
"""
S3 Bucket Discovery and Registration Script
Discovers all AWS S3 buckets and allows interactive selection to register them
as SiteDeployment records for use in the site assignment pool.
"""
import os
import sys
import hashlib
import logging
from typing import List, Dict, Optional
from datetime import datetime
import boto3
import click
from botocore.exceptions import ClientError, BotoCoreError, NoCredentialsError
from dotenv import load_dotenv
# Load .env file
load_dotenv()
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
from src.deployment.s3_storage import map_aws_region_to_short_code
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BucketInfo:
    """Lightweight record describing one discovered S3 bucket."""

    def __init__(self, name: str, region: str, creation_date: Optional[datetime] = None):
        # Metadata reported by the AWS API.
        self.name = name
        self.region = region
        self.creation_date = creation_date
        # Flipped to True once the bucket is matched to an existing
        # deployment record (updated while rendering the bucket table).
        self.is_registered = False

    def __repr__(self):
        return "BucketInfo(name={}, region={})".format(self.name, self.region)
def get_s3_client():
    """
    Build and return a boto3 S3 client.

    Credentials are expected in the AWS_ACCESS_KEY_ID /
    AWS_SECRET_ACCESS_KEY environment variables (boto3 reads them itself;
    they are only checked here so the user gets a clear message).

    Raises:
        SystemExit: If AWS credentials are not found or the client
            cannot be created.
    """
    key_id = os.getenv('AWS_ACCESS_KEY_ID')
    secret = os.getenv('AWS_SECRET_ACCESS_KEY')
    # Guard clause: fail fast with a readable message instead of a
    # botocore traceback later on.
    if not (key_id and secret):
        click.echo("Error: AWS credentials not found.", err=True)
        click.echo("Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.", err=True)
        sys.exit(1)
    try:
        return boto3.client('s3')
    except Exception as exc:
        click.echo(f"Error creating S3 client: {exc}", err=True)
        sys.exit(1)
def list_all_buckets(s3_client) -> List[BucketInfo]:
    """
    List every S3 bucket in the account together with its region.

    Args:
        s3_client: boto3 S3 client

    Returns:
        List of BucketInfo objects

    Raises:
        SystemExit: If buckets cannot be listed (bad credentials,
            insufficient permissions, or any unexpected failure).
    """
    def resolve_region(bucket_name: str) -> str:
        # get_bucket_location reports us-east-1 as None/'' — normalize it.
        try:
            location = s3_client.get_bucket_location(Bucket=bucket_name)
            constraint = location.get('LocationConstraint', 'us-east-1')
            return constraint if constraint else 'us-east-1'
        except ClientError as err:
            code = err.response.get('Error', {}).get('Code', '')
            if code == 'AccessDenied':
                logger.warning(f"Access denied to get region for bucket {bucket_name}, using default")
            else:
                logger.warning(f"Could not get region for bucket {bucket_name}: {err}, using default")
            return 'us-east-1'

    try:
        listing = s3_client.list_buckets()
        return [
            BucketInfo(
                name=entry['Name'],
                region=resolve_region(entry['Name']),
                creation_date=entry.get('CreationDate'),
            )
            for entry in listing.get('Buckets', [])
        ]
    except NoCredentialsError:
        click.echo("Error: AWS credentials not found or invalid.", err=True)
        click.echo("Please configure AWS credentials using:", err=True)
        click.echo(" - Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY", err=True)
        click.echo(" - AWS credentials file: ~/.aws/credentials", err=True)
        click.echo(" - IAM role (if running on EC2)", err=True)
        sys.exit(1)
    except ClientError as exc:
        detail = exc.response.get('Error', {})
        code = detail.get('Code', '')
        message = detail.get('Message', str(exc))
        click.echo(f"Error listing buckets: {code} - {message}", err=True)
        if code == 'AccessDenied':
            click.echo("Insufficient permissions. Ensure your AWS credentials have s3:ListAllMyBuckets permission.", err=True)
        sys.exit(1)
    except Exception as exc:
        click.echo(f"Unexpected error listing buckets: {exc}", err=True)
        sys.exit(1)
def load_excluded_buckets(exclusion_file: str = "s3_bucket_exclusions.txt") -> set:
    """
    Read bucket names that should never be offered for registration.

    The file is resolved relative to the project root (the parent of this
    script's directory). Blank lines and lines starting with '#' are
    skipped. A missing or unreadable file simply yields no exclusions.

    Args:
        exclusion_file: Path to exclusion file (relative to project root)

    Returns:
        Set of bucket names to exclude
    """
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    exclusion_path = os.path.join(project_root, exclusion_file)
    excluded: set = set()
    if not os.path.exists(exclusion_path):
        logger.debug(f"Exclusion file not found: {exclusion_path}, skipping exclusions")
        return excluded
    try:
        with open(exclusion_path, 'r', encoding='utf-8') as handle:
            for raw in handle:
                entry = raw.strip()
                # Ignore comments and empty lines.
                if entry and not entry.startswith('#'):
                    excluded.add(entry)
        if excluded:
            logger.info(f"Loaded {len(excluded)} excluded bucket(s) from {exclusion_file}")
    except Exception as e:
        # Best-effort: a broken exclusion file must not stop discovery.
        logger.warning(f"Error loading exclusion file {exclusion_path}: {e}")
    return excluded
def check_existing_deployments(site_repo: SiteDeploymentRepository, bucket_names: List[str]) -> Dict[str, bool]:
    """
    Determine which buckets already have a SiteDeployment record.

    A bucket counts as registered when any existing site stores its name
    in ``s3_bucket_name`` and uses an S3-backed storage provider
    ('s3' or 's3_compatible').

    Args:
        site_repo: SiteDeploymentRepository instance
        bucket_names: List of bucket names to check

    Returns:
        Dictionary mapping bucket names to True when already registered
    """
    s3_providers = ('s3', 's3_compatible')
    # Collect registered bucket names once for O(1) membership tests.
    registered = {
        site.s3_bucket_name
        for site in site_repo.get_all()
        if site.s3_bucket_name and site.storage_provider in s3_providers
    }
    return {name: name in registered for name in bucket_names}
def generate_unique_hostname(bucket_name: str, site_repo: SiteDeploymentRepository, attempt: int = 0) -> str:
    """
    Generate a unique hostname for the pull_zone_bcdn_hostname field.

    Candidates are tried in order: "s3-<bucket>.b-cdn.net", then
    "s3-<bucket>-1.b-cdn.net", "s3-<bucket>-2.b-cdn.net", ... until the
    repository reports no existing deployment with that hostname.

    Args:
        bucket_name: S3 bucket name used as the hostname stem
        site_repo: SiteDeploymentRepository to check for existing hostnames
        attempt: Starting suffix number (0 tries the bare hostname first)

    Returns:
        Unique hostname string
    """
    # Iterate instead of recursing (the original recursed once per
    # collision) so a long run of taken hostnames cannot exhaust the
    # interpreter's recursion limit.
    suffix = attempt
    while True:
        if suffix == 0:
            candidate = f"s3-{bucket_name}.b-cdn.net"
        else:
            candidate = f"s3-{bucket_name}-{suffix}.b-cdn.net"
        if site_repo.get_by_bcdn_hostname(candidate) is None:
            return candidate
        suffix += 1
def generate_bucket_hash(bucket_name: str) -> int:
    """
    Derive a stable numeric placeholder ID from a bucket name.

    The MD5 digest of the name is interpreted as an integer and reduced
    modulo 1,000,000. MD5 is used here purely as a stable hash for
    placeholder IDs, not for anything security-sensitive.

    Args:
        bucket_name: S3 bucket name

    Returns:
        Integer in the range [0, 999999]
    """
    digest = hashlib.md5(bucket_name.encode()).hexdigest()
    # int(hexdigest, 16) is non-negative, so the modulo result already is too.
    return int(digest, 16) % 1000000
def register_bucket(
    bucket_info: BucketInfo,
    site_repo: SiteDeploymentRepository,
    site_name: Optional[str] = None,
    custom_domain: Optional[str] = None
) -> bool:
    """
    Register an S3 bucket as a SiteDeployment record.

    Bunny.net-specific columns (storage zone / pull zone) are filled with
    deterministic placeholder values derived from the bucket name so the
    record satisfies the schema without a real CDN deployment.

    Args:
        bucket_info: BucketInfo object with bucket details
        site_repo: SiteDeploymentRepository instance
        site_name: Optional site name (defaults to bucket name)
        custom_domain: Optional custom domain for S3

    Returns:
        True if successful, False otherwise (already registered or error)
    """
    bucket_name = bucket_info.name
    bucket_region = bucket_info.region
    # Skip if already registered. FIX: also treat 's3_compatible' as
    # registered, consistent with check_existing_deployments(); the old
    # '== s3' check could create a duplicate record for such sites.
    for site in site_repo.get_all():
        if site.s3_bucket_name == bucket_name and site.storage_provider in ('s3', 's3_compatible'):
            click.echo(f" [SKIP] Bucket '{bucket_name}' is already registered (site_id={site.id})")
            return False
    # Generate placeholder values for Bunny.net fields
    bucket_hash = generate_bucket_hash(bucket_name)
    short_region = map_aws_region_to_short_code(bucket_region)
    unique_hostname = generate_unique_hostname(bucket_name, site_repo)
    # Use provided site_name or default to bucket name
    final_site_name = site_name or bucket_name
    try:
        deployment = site_repo.create(
            site_name=final_site_name,
            storage_provider='s3',
            storage_zone_id=bucket_hash,
            storage_zone_name=f"s3-{bucket_name}",
            storage_zone_password="s3-placeholder",
            storage_zone_region=short_region,
            pull_zone_id=bucket_hash,
            pull_zone_bcdn_hostname=unique_hostname,
            custom_hostname=None,
            s3_bucket_name=bucket_name,
            s3_bucket_region=bucket_region,
            s3_custom_domain=custom_domain,
            s3_endpoint_url=None
        )
        click.echo(f" [OK] Registered bucket '{bucket_name}' as site_id={deployment.id}")
        return True
    except ValueError as e:
        click.echo(f" [ERROR] Failed to register bucket '{bucket_name}': {e}", err=True)
        return False
    except Exception as e:
        click.echo(f" [ERROR] Unexpected error registering bucket '{bucket_name}': {e}", err=True)
        return False
def display_buckets(buckets: List[BucketInfo], existing_map: Dict[str, bool], excluded_buckets: set = None):
    """
    Print a formatted table of buckets with their registration status.

    Side effect: each BucketInfo's ``is_registered`` flag is refreshed
    from ``existing_map`` while the table is rendered.

    Args:
        buckets: List of BucketInfo objects
        existing_map: Dictionary mapping bucket names to registration status
        excluded_buckets: Set of excluded bucket names (optional)
    """
    excluded = excluded_buckets if excluded_buckets is not None else set()
    rule = "=" * 80
    click.echo("\n" + rule)
    click.echo("Available S3 Buckets")
    click.echo(rule)
    click.echo(f"{'#':<4} {'Bucket Name':<40} {'Region':<15} {'Status':<15}")
    click.echo("-" * 80)
    for position, bucket in enumerate(buckets, 1):
        bucket.is_registered = existing_map.get(bucket.name, False)
        # Exclusion wins over registration status in the display.
        if bucket.name in excluded:
            label = "[EXCLUDED]"
        elif bucket.is_registered:
            label = "[REGISTERED]"
        else:
            label = "[AVAILABLE]"
        click.echo(f"{position:<4} {bucket.name:<40} {bucket.region:<15} {label:<15}")
    click.echo(rule)
@click.command()
@click.option('--auto-import-all', is_flag=True, default=False,
              help='Automatically import all unregistered buckets as bucket-only sites (no custom domain)')
def main(auto_import_all: bool):
    """Main entry point for the discovery script.

    Flow: connect to S3, list buckets, subtract the exclusion list and any
    buckets already registered in the database, then either auto-register
    everything remaining (--auto-import-all) or interactively prompt for a
    selection, a site name, and an optional custom domain per bucket.
    """
    click.echo("S3 Bucket Discovery and Registration")
    click.echo("=" * 80)
    # Initialize database
    try:
        db_manager.initialize()
    except Exception as e:
        click.echo(f"Error initializing database: {e}", err=True)
        sys.exit(1)
    # Session is released in the finally block at the bottom.
    session = db_manager.get_session()
    site_repo = SiteDeploymentRepository(session)
    try:
        # Get S3 client
        click.echo("\nConnecting to AWS S3...")
        s3_client = get_s3_client()
        # List all buckets
        click.echo("Discovering S3 buckets...")
        buckets = list_all_buckets(s3_client)
        if not buckets:
            click.echo("No S3 buckets found in your AWS account.")
            return
        # Load excluded buckets
        excluded_buckets = load_excluded_buckets()
        # Check which buckets are already registered
        bucket_names = [b.name for b in buckets]
        existing_map = check_existing_deployments(site_repo, bucket_names)
        # Mark excluded buckets in existing_map
        for bucket_name in excluded_buckets:
            if bucket_name in existing_map:
                existing_map[bucket_name] = True  # Treat excluded as "registered" to skip
        # Display buckets
        display_buckets(buckets, existing_map, excluded_buckets)
        # Filter out already registered buckets and excluded buckets
        available_buckets = [
            b for b in buckets
            if not existing_map.get(b.name, False) and b.name not in excluded_buckets
        ]
        if excluded_buckets:
            excluded_count = sum(1 for b in buckets if b.name in excluded_buckets)
            if excluded_count > 0:
                click.echo(f"\nNote: {excluded_count} bucket(s) excluded by exclusion list")
        if not available_buckets:
            click.echo("\nAll buckets are already registered.")
            return
        # Auto-import mode: register all available buckets as bucket-only sites
        if auto_import_all:
            click.echo(f"\nAuto-import mode: Registering {len(available_buckets)} unregistered bucket(s) as bucket-only sites...")
            success_count = 0
            error_count = 0
            for bucket_info in available_buckets:
                # Register as bucket-only (no custom domain, site_name = bucket_name)
                if register_bucket(bucket_info, site_repo, site_name=None, custom_domain=None):
                    success_count += 1
                else:
                    error_count += 1
            click.echo(f"\n{'=' * 80}")
            click.echo(f"Auto-import complete: {success_count} bucket(s) registered, {error_count} failed.")
            click.echo("=" * 80)
            return
        # Interactive mode: prompt for bucket selection
        click.echo(f"\nFound {len(available_buckets)} available bucket(s) to register.")
        click.echo("Enter bucket numbers to register (comma-separated, e.g., 1,3,5):")
        click.echo("Or press Enter to skip registration.")
        try:
            selection_input = click.prompt("Selection", default="", type=str).strip()
        except click.Abort:
            click.echo("\nOperation cancelled.")
            return
        if not selection_input:
            click.echo("No buckets selected. Exiting.")
            return
        # Parse selection
        try:
            # Table numbers are 1-based; convert to 0-based indices here.
            selected_indices = [int(x.strip()) - 1 for x in selection_input.split(',')]
        except ValueError:
            click.echo("Error: Invalid selection format. Use comma-separated numbers (e.g., 1,3,5)", err=True)
            return
        # Validate indices
        valid_selections = []
        for idx in selected_indices:
            if 0 <= idx < len(buckets):
                # Indices refer to the full displayed list, so re-check that
                # the chosen bucket is still in the available subset.
                if buckets[idx].name in [b.name for b in available_buckets]:
                    valid_selections.append(buckets[idx])
                else:
                    click.echo(f"Warning: Bucket #{idx + 1} is already registered, skipping.", err=True)
            else:
                click.echo(f"Warning: Invalid bucket number {idx + 1}, skipping.", err=True)
        if not valid_selections:
            click.echo("No valid buckets selected.")
            return
        # Register selected buckets
        click.echo(f"\nRegistering {len(valid_selections)} bucket(s)...")
        success_count = 0
        for bucket_info in valid_selections:
            click.echo(f"\nRegistering bucket: {bucket_info.name}")
            # Prompt for site name
            default_site_name = bucket_info.name
            try:
                site_name = click.prompt("Site name", default=default_site_name, type=str).strip()
            except click.Abort:
                click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
                continue
            if not site_name:
                site_name = default_site_name
            # Prompt for custom domain (optional)
            try:
                custom_domain = click.prompt(
                    "Custom domain (optional, press Enter to skip)",
                    default="",
                    type=str
                ).strip()
            except click.Abort:
                click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
                continue
            if not custom_domain:
                custom_domain = None
            # Confirm registration
            try:
                if click.confirm(f"Register '{bucket_info.name}' as '{site_name}'?"):
                    if register_bucket(bucket_info, site_repo, site_name, custom_domain):
                        success_count += 1
                else:
                    # User declined the confirmation prompt.
                    click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
            except click.Abort:
                click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
        click.echo(f"\n{'=' * 80}")
        click.echo(f"Registration complete: {success_count}/{len(valid_selections)} bucket(s) registered.")
        click.echo("=" * 80)
    except KeyboardInterrupt:
        click.echo("\n\nOperation cancelled by user.")
        sys.exit(0)
    except Exception as e:
        click.echo(f"\nUnexpected error: {e}", err=True)
        logger.exception("Unexpected error in bucket discovery")
        sys.exit(1)
    finally:
        # Always release the DB session, even on error or abort.
        session.close()
# Script entry point; click's command decorator handles argument parsing.
if __name__ == "__main__":
    main()