Big-Link-Man/src/deployment/s3_storage.py

"""
AWS S3 Storage API client for uploading files to S3 buckets
Story 6.2: AWS S3 Client Implementation
"""
import os
import time
import logging
import json
from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass
import boto3
from botocore.exceptions import ClientError, BotoCoreError
from botocore.config import Config
if TYPE_CHECKING:
    from src.database.models import SiteDeployment
# UploadResult instances are created at runtime in upload_file(), so this import
# must not live under TYPE_CHECKING
from src.deployment.bunny_storage import UploadResult
logger = logging.getLogger(__name__)
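
# Required environment variables (read in _get_s3_client): AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY; AWS_REGION is optional and defaults to 'us-east-1'.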


class S3StorageError(Exception):
    """Base exception for S3 Storage API errors"""
    pass


class S3StorageAuthError(S3StorageError):
    """Authentication error with S3 Storage API"""
    pass


class S3StorageClient:
    """Client for uploading files to AWS S3 buckets"""

    def __init__(self, max_retries: int = 3):
        """
        Initialize S3 Storage client

        Args:
            max_retries: Maximum number of retry attempts for failed uploads
        """
        self.max_retries = max_retries
        self._client_cache = {}

    def _get_s3_client(self, region: Optional[str] = None, endpoint_url: Optional[str] = None):
        """
        Get or create boto3 S3 client with credentials from environment

        Args:
            region: AWS region (optional, uses AWS_REGION env var or default)
            endpoint_url: Custom endpoint URL for S3-compatible services

        Returns:
            boto3 S3 client instance

        Raises:
            S3StorageAuthError: If AWS credentials are missing
        """
        # Create new client if endpoint_url changes (for s3_compatible)
        cache_key = f"{region or 'default'}:{endpoint_url or 'default'}"
        if not hasattr(self, '_client_cache'):
            self._client_cache = {}
        if cache_key not in self._client_cache:
            access_key = os.getenv('AWS_ACCESS_KEY_ID')
            secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
            default_region = os.getenv('AWS_REGION', 'us-east-1')
            if not access_key or not secret_key:
                raise S3StorageAuthError(
                    "AWS credentials not found. Set AWS_ACCESS_KEY_ID and "
                    "AWS_SECRET_ACCESS_KEY environment variables."
                )
            region_to_use = region or default_region
            config = Config(
                retries={'max_attempts': self.max_retries, 'mode': 'adaptive'},
                connect_timeout=60,
                read_timeout=60
            )
            client_kwargs = {
                'aws_access_key_id': access_key,
                'aws_secret_access_key': secret_key,
                'region_name': region_to_use,
                'config': config
            }
            if endpoint_url:
                client_kwargs['endpoint_url'] = endpoint_url
            client = boto3.client('s3', **client_kwargs)
            resource = boto3.resource('s3', **client_kwargs)
            self._client_cache[cache_key] = {'client': client, 'resource': resource}
        return self._client_cache[cache_key]['client']

    def _get_s3_resource(self, region: Optional[str] = None, endpoint_url: Optional[str] = None):
        """Get or create boto3 S3 resource"""
        cache_key = f"{region or 'default'}:{endpoint_url or 'default'}"
        if not hasattr(self, '_client_cache'):
            self._client_cache = {}
        if cache_key not in self._client_cache:
            self._get_s3_client(region, endpoint_url)
        return self._client_cache[cache_key]['resource']

    def _get_bucket_name(self, site: "SiteDeployment") -> str:
        """
        Extract bucket name from SiteDeployment

        Args:
            site: SiteDeployment object

        Returns:
            S3 bucket name

        Raises:
            ValueError: If bucket name is not configured
        """
        bucket_name = getattr(site, 's3_bucket_name', None)
        if not bucket_name:
            raise ValueError(
                "s3_bucket_name not configured for site. "
                "Set s3_bucket_name in SiteDeployment."
            )
        return bucket_name

    def _get_bucket_region(self, site: "SiteDeployment") -> str:
        """
        Extract bucket region from SiteDeployment or use default

        Args:
            site: SiteDeployment object

        Returns:
            AWS region string
        """
        region = getattr(site, 's3_bucket_region', None)
        if region:
            return region
        return os.getenv('AWS_REGION', 'us-east-1')

    def _get_endpoint_url(self, site: "SiteDeployment") -> Optional[str]:
        """
        Extract custom endpoint URL for S3-compatible services

        Args:
            site: SiteDeployment object

        Returns:
            Endpoint URL string or None for standard AWS S3
        """
        return getattr(site, 's3_endpoint_url', None)

    def _get_content_type(self, file_path: str) -> str:
        """
        Determine content type based on file extension

        Args:
            file_path: File path

        Returns:
            MIME type string
        """
        file_path_lower = file_path.lower()
        if file_path_lower.endswith('.html') or file_path_lower.endswith('.htm'):
            return 'text/html'
        elif file_path_lower.endswith('.css'):
            return 'text/css'
        elif file_path_lower.endswith('.js'):
            return 'application/javascript'
        elif file_path_lower.endswith('.json'):
            return 'application/json'
        elif file_path_lower.endswith('.xml'):
            return 'application/xml'
        elif file_path_lower.endswith('.png'):
            return 'image/png'
        elif file_path_lower.endswith('.jpg') or file_path_lower.endswith('.jpeg'):
            return 'image/jpeg'
        elif file_path_lower.endswith('.gif'):
            return 'image/gif'
        elif file_path_lower.endswith('.svg'):
            return 'image/svg+xml'
        else:
            return 'application/octet-stream'
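
    # Example (sketch of the mapping above): _get_content_type('posts/Index.HTML')
    # returns 'text/html' because matching is case-insensitive; an unrecognized
    # extension such as '.bin' falls back to 'application/octet-stream'.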

    def _configure_bucket_public_read(
        self,
        bucket_name: str,
        region: str,
        endpoint_url: Optional[str] = None
    ):
        """
        Configure S3 bucket for public read access only

        This method:
        1. Disables "Block Public Access" settings for read access
        2. Applies bucket policy for public read access
        3. Validates configuration

        Args:
            bucket_name: S3 bucket name
            region: AWS region
            endpoint_url: Custom endpoint URL for S3-compatible services

        Raises:
            S3StorageError: If configuration fails
        """
        try:
            s3_client = self._get_s3_client(region, endpoint_url)
            # Disable Block Public Access settings (required for public read)
            try:
                s3_client.put_public_access_block(
                    Bucket=bucket_name,
                    PublicAccessBlockConfiguration={
                        'BlockPublicAcls': False,
                        'IgnorePublicAcls': False,
                        'BlockPublicPolicy': False,
                        'RestrictPublicBuckets': False
                    }
                )
                logger.info(f"Disabled Block Public Access settings for bucket {bucket_name}")
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code', '')
                if error_code != 'NoSuchBucket':
                    logger.warning(f"Could not update Block Public Access settings: {e}")
            # Apply bucket policy for public read access only
            bucket_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": "PublicReadGetObject",
                        "Effect": "Allow",
                        "Principal": "*",
                        "Action": "s3:GetObject",
                        "Resource": f"arn:aws:s3:::{bucket_name}/*"
                    }
                ]
            }
            try:
                s3_client.put_bucket_policy(
                    Bucket=bucket_name,
                    Policy=json.dumps(bucket_policy)
                )
                logger.info(f"Applied public read bucket policy to {bucket_name}")
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code', '')
                if error_code == 'NoSuchBucket':
                    raise S3StorageError(f"Bucket {bucket_name} does not exist")
                logger.warning(f"Could not apply bucket policy: {e}")
            # Validate bucket exists
            try:
                s3_client.head_bucket(Bucket=bucket_name)
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code', '')
                if error_code == '404':
                    raise S3StorageError(f"Bucket {bucket_name} does not exist")
                elif error_code == '403':
                    raise S3StorageAuthError(
                        f"Access denied to bucket {bucket_name}. "
                        f"Check AWS credentials and bucket permissions."
                    )
                raise S3StorageError(f"Failed to validate bucket: {e}")
        except BotoCoreError as e:
            raise S3StorageError(f"Failed to configure bucket: {str(e)}")

    def _generate_public_url(
        self,
        bucket_name: str,
        file_path: str,
        region: str,
        custom_domain: Optional[str] = None
    ) -> str:
        """
        Generate public URL for uploaded file

        Args:
            bucket_name: S3 bucket name
            file_path: File path within bucket
            region: AWS region
            custom_domain: Optional custom domain (manual setup required)

        Returns:
            Public URL string
        """
        if custom_domain:
            return f"https://{custom_domain.rstrip('/')}/{file_path}"
        # Virtual-hosted style URL (default for AWS S3)
        return f"https://{bucket_name}.s3.{region}.amazonaws.com/{file_path}"

    def upload_file(
        self,
        site: "SiteDeployment",
        file_path: str,
        content: str
    ) -> UploadResult:
        """
        Upload a file to S3 bucket

        Args:
            site: SiteDeployment object with S3 configuration
            file_path: Path within bucket (e.g., 'my-article.html')
            content: File content to upload

        Returns:
            UploadResult with success status and message

        Raises:
            S3StorageAuthError: If authentication fails
            S3StorageError: For other S3 errors
            ValueError: If required configuration is missing
        """
        bucket_name = self._get_bucket_name(site)
        region = self._get_bucket_region(site)
        endpoint_url = self._get_endpoint_url(site)
        custom_domain = getattr(site, 's3_custom_domain', None)
        content_type = self._get_content_type(file_path)
        # Configure bucket for public read access on first upload attempt
        # This is idempotent and safe to call multiple times
        try:
            self._configure_bucket_public_read(bucket_name, region, endpoint_url)
        except S3StorageError as e:
            logger.warning(f"Bucket configuration warning: {e}")
        s3_client = self._get_s3_client(region, endpoint_url)
        for attempt in range(self.max_retries):
            try:
                # Upload file with public-read ACL
                s3_client.put_object(
                    Bucket=bucket_name,
                    Key=file_path,
                    Body=content.encode('utf-8'),
                    ContentType=content_type,
                    ACL='public-read'
                )
                public_url = self._generate_public_url(
                    bucket_name, file_path, region, custom_domain
                )
                logger.info(f"Uploaded {file_path} to s3://{bucket_name}/{file_path}")
                return UploadResult(
                    success=True,
                    file_path=file_path,
                    message=f"Upload successful. Public URL: {public_url}"
                )
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code', '')
                error_message = e.response.get('Error', {}).get('Message', str(e))
                # Handle specific error codes
                if error_code == 'NoSuchBucket':
                    raise S3StorageError(
                        f"Bucket {bucket_name} does not exist. "
                        f"Create the bucket first or check bucket name."
                    )
                if error_code == '403' or error_code == 'AccessDenied':
                    raise S3StorageAuthError(
                        f"Access denied to bucket {bucket_name}. "
                        f"Check AWS credentials and bucket permissions. "
                        f"Error: {error_message}"
                    )
                if error_code == '404':
                    raise S3StorageError(
                        f"Bucket {bucket_name} not found in region {region}"
                    )
                # Retry on transient errors
                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.warning(
                        f"S3 upload failed (attempt {attempt + 1}/{self.max_retries}): "
                        f"{error_code} - {error_message}. Retrying in {wait_time}s"
                    )
                    time.sleep(wait_time)
                    continue
                raise S3StorageError(
                    f"S3 upload failed after {self.max_retries} attempts: "
                    f"{error_code} - {error_message}"
                )
            except BotoCoreError as e:
                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.warning(
                        f"S3 upload error (attempt {attempt + 1}/{self.max_retries}): "
                        f"{str(e)}. Retrying in {wait_time}s"
                    )
                    time.sleep(wait_time)
                    continue
                raise S3StorageError(
                    f"S3 upload failed after {self.max_retries} attempts: {str(e)}"
                )
        raise S3StorageError(f"Upload failed after {self.max_retries} attempts")


def map_aws_region_to_short_code(aws_region: str) -> str:
    """
    Map AWS region code (e.g., 'us-east-1') to short region code used by the system

    Args:
        aws_region: AWS region code (e.g., 'us-east-1', 'eu-west-1')

    Returns:
        Short region code (e.g., 'US', 'EU')

    Note:
        Returns 'US' as default for unknown regions
    """
    region_mapping = {
        # US regions
        'us-east-1': 'US',
        'us-east-2': 'US',
        'us-west-1': 'US',
        'us-west-2': 'US',
        # EU regions
        'eu-west-1': 'EU',
        'eu-west-2': 'EU',
        'eu-west-3': 'EU',
        'eu-central-1': 'EU',
        'eu-north-1': 'EU',
        'eu-south-1': 'EU',
        # Asia Pacific
        'ap-southeast-1': 'SG',
        'ap-southeast-2': 'SYD',
        'ap-northeast-1': 'JP',
        'ap-northeast-2': 'KR',
        'ap-south-1': 'IN',
        # Other
        'ca-central-1': 'CA',
        'sa-east-1': 'SA',
        'af-south-1': 'AF',
        'me-south-1': 'ME',
    }
    return region_mapping.get(aws_region.lower(), 'US')
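

# Example (values follow the mapping above):
#   map_aws_region_to_short_code('eu-central-1') -> 'EU'
#   map_aws_region_to_short_code('AP-SOUTH-1') -> 'IN'   (lookup is case-insensitive)
#   map_aws_region_to_short_code('cn-north-1') -> 'US'   (unknown regions fall back to 'US')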