"""
S3 Bucket Discovery and Registration Script

Discovers all AWS S3 buckets and allows interactive selection to register them
as SiteDeployment records for use in the site assignment pool.
"""

import os
import sys
import hashlib
import logging
from typing import List, Dict, Optional
from datetime import datetime

import boto3
import click
from botocore.exceptions import ClientError, BotoCoreError, NoCredentialsError
from dotenv import load_dotenv

# Load .env file
load_dotenv()

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.database.session import db_manager  # noqa: E402
from src.database.repositories import SiteDeploymentRepository  # noqa: E402
from src.deployment.s3_storage import map_aws_region_to_short_code  # noqa: E402

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Region assumed when S3 reports no LocationConstraint or the lookup fails.
DEFAULT_REGION = 'us-east-1'


class BucketInfo:
    """Information about an S3 bucket"""

    def __init__(self, name: str, region: str, creation_date: Optional[datetime] = None):
        self.name = name
        self.region = region
        self.creation_date = creation_date
        # Filled in later by display_buckets() from the registration map.
        self.is_registered = False

    def __repr__(self):
        return f"BucketInfo(name={self.name}, region={self.region})"


def get_s3_client():
    """
    Create and return a boto3 S3 client

    Requires AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to be present in the
    environment (possibly loaded from the .env file at import time).

    Returns:
        boto3 S3 client

    Raises:
        SystemExit: If AWS credentials are not found or the client cannot be created
    """
    access_key = os.getenv('AWS_ACCESS_KEY_ID')
    secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

    if not access_key or not secret_key:
        click.echo("Error: AWS credentials not found.", err=True)
        click.echo("Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.", err=True)
        sys.exit(1)

    try:
        return boto3.client('s3')
    except Exception as e:
        click.echo(f"Error creating S3 client: {e}", err=True)
        sys.exit(1)


def _resolve_bucket_region(s3_client, bucket_name: str) -> str:
    """Return the AWS region of a bucket, falling back to DEFAULT_REGION on ClientError."""
    try:
        region_response = s3_client.get_bucket_location(Bucket=bucket_name)
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', '')
        if error_code == 'AccessDenied':
            logger.warning("Access denied to get region for bucket %s, using default", bucket_name)
        else:
            logger.warning("Could not get region for bucket %s: %s, using default", bucket_name, e)
        return DEFAULT_REGION

    region = region_response.get('LocationConstraint')

    # AWS returns None for us-east-1, so normalize it
    if not region:
        return DEFAULT_REGION

    # Legacy constraint value: buckets reported as "EU" reside in eu-west-1
    if region == 'EU':
        return 'eu-west-1'

    return region


def list_all_buckets(s3_client) -> List[BucketInfo]:
    """
    List all S3 buckets and retrieve their metadata

    Args:
        s3_client: boto3 S3 client

    Returns:
        List of BucketInfo objects

    Raises:
        SystemExit: If unable to list buckets
    """
    try:
        response = s3_client.list_buckets()
        return [
            BucketInfo(
                name=bucket['Name'],
                region=_resolve_bucket_region(s3_client, bucket['Name']),
                creation_date=bucket.get('CreationDate')
            )
            for bucket in response.get('Buckets', [])
        ]

    except NoCredentialsError:
        click.echo("Error: AWS credentials not found or invalid.", err=True)
        click.echo("Please configure AWS credentials using:", err=True)
        click.echo(" - Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY", err=True)
        click.echo(" - AWS credentials file: ~/.aws/credentials", err=True)
        click.echo(" - IAM role (if running on EC2)", err=True)
        sys.exit(1)
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', '')
        error_message = e.response.get('Error', {}).get('Message', str(e))
        click.echo(f"Error listing buckets: {error_code} - {error_message}", err=True)
        if error_code == 'AccessDenied':
            click.echo("Insufficient permissions. Ensure your AWS credentials have s3:ListAllMyBuckets permission.", err=True)
        sys.exit(1)
    except Exception as e:
        click.echo(f"Unexpected error listing buckets: {e}", err=True)
        sys.exit(1)


def load_excluded_buckets(exclusion_file: str = "s3_bucket_exclusions.txt") -> set:
    """
    Load excluded bucket names from a text file

    One bucket name per line; blank lines and lines starting with '#' are
    ignored. A missing or unreadable file yields whatever was loaded so far
    (an empty set if nothing was read) instead of an error.

    Args:
        exclusion_file: Path to exclusion file (relative to project root)

    Returns:
        Set of bucket names to exclude
    """
    excluded = set()
    exclusion_path = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        exclusion_file
    )

    if not os.path.exists(exclusion_path):
        logger.debug("Exclusion file not found: %s, skipping exclusions", exclusion_path)
        return excluded

    try:
        with open(exclusion_path, 'r', encoding='utf-8') as f:
            for line in f:
                # Strip whitespace and skip comments/empty lines
                line = line.strip()
                if line and not line.startswith('#'):
                    excluded.add(line)

        if excluded:
            logger.info("Loaded %s excluded bucket(s) from %s", len(excluded), exclusion_file)
    except Exception as e:
        # Best effort: exclusions are optional, so a read failure only warns.
        logger.warning("Error loading exclusion file %s: %s", exclusion_path, e)

    return excluded


def check_existing_deployments(site_repo: SiteDeploymentRepository, bucket_names: List[str]) -> Dict[str, bool]:
    """
    Check which buckets are already registered in the database

    Args:
        site_repo: SiteDeploymentRepository instance
        bucket_names: List of bucket names to check

    Returns:
        Dictionary mapping bucket names to boolean (True if registered)
    """
    registered_buckets = {
        site.s3_bucket_name
        for site in site_repo.get_all()
        if site.s3_bucket_name and site.storage_provider in ('s3', 's3_compatible')
    }

    return {bucket_name: bucket_name in registered_buckets for bucket_name in bucket_names}


def generate_unique_hostname(bucket_name: str, site_repo: SiteDeploymentRepository, attempt: int = 0) -> str:
    """
    Generate a unique hostname for the pull_zone_bcdn_hostname field

    Candidates are "s3-<bucket>.b-cdn.net" for attempt 0 and
    "s3-<bucket>-<attempt>.b-cdn.net" afterwards; the first candidate not
    found by site_repo.get_by_bcdn_hostname() is returned.

    Args:
        bucket_name: S3 bucket name
        site_repo: SiteDeploymentRepository to check for existing hostnames
        attempt: Starting attempt number (for appending suffix)

    Returns:
        Unique hostname string
    """
    # Iterative on purpose: many collisions must not grow the call stack.
    while True:
        if attempt == 0:
            candidate = f"s3-{bucket_name}.b-cdn.net"
        else:
            candidate = f"s3-{bucket_name}-{attempt}.b-cdn.net"

        if site_repo.get_by_bcdn_hostname(candidate) is None:
            return candidate

        attempt += 1


def generate_bucket_hash(bucket_name: str) -> int:
    """
    Generate a numeric hash from bucket name for placeholder IDs

    Args:
        bucket_name: S3 bucket name

    Returns:
        Integer in the range 0..999999, stable for a given bucket name
    """
    # MD5 is used only as a stable, non-cryptographic fingerprint here.
    digest = hashlib.md5(bucket_name.encode()).hexdigest()
    # The parsed digest is non-negative, so the modulo result is too.
    return int(digest, 16) % 1000000


def register_bucket(
    bucket_info: BucketInfo,
    site_repo: SiteDeploymentRepository,
    site_name: Optional[str] = None,
    custom_domain: Optional[str] = None
) -> bool:
    """
    Register an S3 bucket as a SiteDeployment record

    Args:
        bucket_info: BucketInfo object with bucket details
        site_repo: SiteDeploymentRepository instance
        site_name: Optional site name (defaults to bucket name)
        custom_domain: Optional custom domain for S3

    Returns:
        True if successful, False otherwise (already registered or creation failed)
    """
    bucket_name = bucket_info.name
    bucket_region = bucket_info.region

    # Check if already registered
    # NOTE(review): only provider 's3' counts as a duplicate here, whereas
    # check_existing_deployments() also counts 's3_compatible' - confirm intent.
    for site in site_repo.get_all():
        if site.s3_bucket_name == bucket_name and site.storage_provider == 's3':
            click.echo(f" [SKIP] Bucket '{bucket_name}' is already registered (site_id={site.id})")
            return False

    # Generate placeholder values for Bunny.net fields
    bucket_hash = generate_bucket_hash(bucket_name)
    short_region = map_aws_region_to_short_code(bucket_region)
    unique_hostname = generate_unique_hostname(bucket_name, site_repo)

    # Use provided site_name or default to bucket name
    final_site_name = site_name or bucket_name

    try:
        deployment = site_repo.create(
            site_name=final_site_name,
            storage_provider='s3',
            storage_zone_id=bucket_hash,
            storage_zone_name=f"s3-{bucket_name}",
            storage_zone_password="s3-placeholder",
            storage_zone_region=short_region,
            pull_zone_id=bucket_hash,
            pull_zone_bcdn_hostname=unique_hostname,
            custom_hostname=None,
            s3_bucket_name=bucket_name,
            s3_bucket_region=bucket_region,
            s3_custom_domain=custom_domain,
            s3_endpoint_url=None
        )
    except ValueError as e:
        click.echo(f" [ERROR] Failed to register bucket '{bucket_name}': {e}", err=True)
        return False
    except Exception as e:
        click.echo(f" [ERROR] Unexpected error registering bucket '{bucket_name}': {e}", err=True)
        return False

    click.echo(f" [OK] Registered bucket '{bucket_name}' as site_id={deployment.id}")
    return True


def display_buckets(buckets: List[BucketInfo], existing_map: Dict[str, bool], excluded_buckets: Optional[set] = None):
    """
    Display buckets in a formatted table

    Also updates each bucket's is_registered flag from existing_map.

    Args:
        buckets: List of BucketInfo objects
        existing_map: Dictionary mapping bucket names to registration status
        excluded_buckets: Set of excluded bucket names (optional)
    """
    if excluded_buckets is None:
        excluded_buckets = set()

    click.echo("\n" + "=" * 80)
    click.echo("Available S3 Buckets")
    click.echo("=" * 80)
    click.echo(f"{'#':<4} {'Bucket Name':<40} {'Region':<15} {'Status':<15}")
    click.echo("-" * 80)

    for idx, bucket in enumerate(buckets, 1):
        bucket.is_registered = existing_map.get(bucket.name, False)

        # Exclusion takes precedence over registration in the status column.
        if bucket.name in excluded_buckets:
            status = "[EXCLUDED]"
        elif bucket.is_registered:
            status = "[REGISTERED]"
        else:
            status = "[AVAILABLE]"

        click.echo(f"{idx:<4} {bucket.name:<40} {bucket.region:<15} {status:<15}")

    click.echo("=" * 80)


def _parse_selection(selection_input: str) -> List[int]:
    """Parse "1,3,5" into unique zero-based indices, preserving order; raises ValueError on non-numeric tokens."""
    tokens = (token.strip() for token in selection_input.split(','))
    # Empty tokens (e.g. from a trailing comma) are ignored rather than rejected.
    indices = [int(token) - 1 for token in tokens if token]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(indices))


def _validate_selection(
    selected_indices: List[int],
    buckets: List[BucketInfo],
    available_buckets: List[BucketInfo],
    excluded_buckets: set
) -> List[BucketInfo]:
    """Map zero-based indices into the displayed table to registrable buckets, warning about the rest."""
    # Built once; membership is tested for every selected index.
    available_names = {b.name for b in available_buckets}

    valid_selections = []
    for idx in selected_indices:
        if not 0 <= idx < len(buckets):
            click.echo(f"Warning: Invalid bucket number {idx + 1}, skipping.", err=True)
            continue

        bucket = buckets[idx]
        if bucket.name in available_names:
            valid_selections.append(bucket)
        elif bucket.name in excluded_buckets:
            click.echo(f"Warning: Bucket #{idx + 1} is excluded by exclusion list, skipping.", err=True)
        else:
            click.echo(f"Warning: Bucket #{idx + 1} is already registered, skipping.", err=True)

    return valid_selections


def _register_interactively(bucket_info: BucketInfo, site_repo: SiteDeploymentRepository) -> bool:
    """Prompt for site name, custom domain and confirmation, then register; True only if registered."""
    click.echo(f"\nRegistering bucket: {bucket_info.name}")

    try:
        # An empty answer falls back to the bucket name / no custom domain.
        site_name = click.prompt("Site name", default=bucket_info.name, type=str).strip() or bucket_info.name
        custom_domain = click.prompt(
            "Custom domain (optional, press Enter to skip)",
            default="",
            type=str
        ).strip() or None
        confirmed = click.confirm(f"Register '{bucket_info.name}' as '{site_name}'?")
    except click.Abort:
        # Aborting any prompt skips only this bucket.
        confirmed = False

    if not confirmed:
        click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
        return False

    return register_bucket(bucket_info, site_repo, site_name, custom_domain)


@click.command()
@click.option('--auto-import-all', is_flag=True, default=False,
              help='Automatically import all unregistered buckets as bucket-only sites (no custom domain)')
def main(auto_import_all: bool):
    """Main entry point for the discovery script"""
    # The docstring above is what click prints as the command's --help text,
    # so it is intentionally left short and unchanged.
    click.echo("S3 Bucket Discovery and Registration")
    click.echo("=" * 80)

    # Initialize database
    try:
        db_manager.initialize()
    except Exception as e:
        click.echo(f"Error initializing database: {e}", err=True)
        sys.exit(1)

    session = db_manager.get_session()
    site_repo = SiteDeploymentRepository(session)

    try:
        # Get S3 client
        click.echo("\nConnecting to AWS S3...")
        s3_client = get_s3_client()

        # List all buckets
        click.echo("Discovering S3 buckets...")
        buckets = list_all_buckets(s3_client)

        if not buckets:
            click.echo("No S3 buckets found in your AWS account.")
            return

        # Load excluded buckets
        excluded_buckets = load_excluded_buckets()

        # Check which buckets are already registered
        bucket_names = [b.name for b in buckets]
        existing_map = check_existing_deployments(site_repo, bucket_names)

        # Mark excluded buckets in existing_map
        for bucket_name in excluded_buckets:
            if bucket_name in existing_map:
                existing_map[bucket_name] = True  # Treat excluded as "registered" to skip

        # Display buckets
        display_buckets(buckets, existing_map, excluded_buckets)

        # Filter out already registered buckets and excluded buckets
        available_buckets = [
            b for b in buckets
            if not existing_map.get(b.name, False) and b.name not in excluded_buckets
        ]

        if excluded_buckets:
            excluded_count = sum(1 for b in buckets if b.name in excluded_buckets)
            if excluded_count > 0:
                click.echo(f"\nNote: {excluded_count} bucket(s) excluded by exclusion list")

        if not available_buckets:
            click.echo("\nAll buckets are already registered.")
            return

        # Auto-import mode: register all available buckets as bucket-only sites
        if auto_import_all:
            click.echo(f"\nAuto-import mode: Registering {len(available_buckets)} unregistered bucket(s) as bucket-only sites...")
            success_count = 0
            error_count = 0

            for bucket_info in available_buckets:
                # Register as bucket-only (no custom domain, site_name = bucket_name)
                if register_bucket(bucket_info, site_repo, site_name=None, custom_domain=None):
                    success_count += 1
                else:
                    error_count += 1

            click.echo(f"\n{'=' * 80}")
            click.echo(f"Auto-import complete: {success_count} bucket(s) registered, {error_count} failed.")
            click.echo("=" * 80)
            return

        # Interactive mode: prompt for bucket selection
        click.echo(f"\nFound {len(available_buckets)} available bucket(s) to register.")
        click.echo("Enter bucket numbers to register (comma-separated, e.g., 1,3,5):")
        click.echo("Or press Enter to skip registration.")

        try:
            selection_input = click.prompt("Selection", default="", type=str).strip()
        except click.Abort:
            click.echo("\nOperation cancelled.")
            return

        if not selection_input:
            click.echo("No buckets selected. Exiting.")
            return

        # Parse selection (numbers refer to the '#' column of the displayed table)
        try:
            selected_indices = _parse_selection(selection_input)
        except ValueError:
            click.echo("Error: Invalid selection format. Use comma-separated numbers (e.g., 1,3,5)", err=True)
            return

        # Validate indices
        valid_selections = _validate_selection(
            selected_indices, buckets, available_buckets, excluded_buckets
        )

        if not valid_selections:
            click.echo("No valid buckets selected.")
            return

        # Register selected buckets
        click.echo(f"\nRegistering {len(valid_selections)} bucket(s)...")
        success_count = sum(
            1 for bucket_info in valid_selections
            if _register_interactively(bucket_info, site_repo)
        )

        click.echo(f"\n{'=' * 80}")
        click.echo(f"Registration complete: {success_count}/{len(valid_selections)} bucket(s) registered.")
        click.echo("=" * 80)

    except KeyboardInterrupt:
        click.echo("\n\nOperation cancelled by user.")
        sys.exit(0)
    except Exception as e:
        click.echo(f"\nUnexpected error: {e}", err=True)
        logger.exception("Unexpected error in bucket discovery")
        sys.exit(1)
    finally:
        # Runs on every exit path, including the sys.exit() calls above.
        session.close()


if __name__ == "__main__":
    main()