Add S3 bucket discovery with auto-import and exclusion list support

- Add auto-import-all flag to discover_s3_buckets.py for bulk import
- Add bucket exclusion list (s3_bucket_exclusions.txt) to prevent re-importing manually added FQDN sites
- Add helper scripts for S3 site management (list, check, delete)
- Update README.md with comprehensive S3 bucket management documentation
- Add colinkri_processor.py for batch processing
- Various deployment and storage improvements
main
PeninsulaInd 2025-12-30 16:57:51 -06:00
parent 3cd62e4135
commit 0748b29f7c
26 changed files with 2452 additions and 76 deletions

README.md

@@ -159,6 +159,107 @@ uv run python main.py remove-site \
--admin-password adminpass
```
## S3 Bucket Management
The platform supports AWS S3 buckets as storage providers alongside bunny.net. S3 buckets can be discovered, registered, and managed through the system.
### Prerequisites
Set AWS credentials in `.env`:
```bash
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_REGION=us-east-1 # Optional, defaults to us-east-1
```
### Discover and Register S3 Buckets
**Interactive Mode** (select buckets manually):
```bash
uv run python main.py discover-s3-buckets
```
Or run the script directly:
```bash
uv run python scripts/discover_s3_buckets.py
```
**Auto-Import Mode** (import all unregistered buckets automatically):
```bash
uv run python scripts/discover_s3_buckets.py --auto-import-all
```
Auto-import mode will:
- Discover all S3 buckets in your AWS account
- Skip buckets already registered in the database
- Skip buckets in the exclusion list
- Register remaining buckets as bucket-only sites (no custom domain)
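The steps above amount to a set-difference over bucket names; a minimal sketch (the `registered` and `excluded` arguments stand in for the database check and the exclusion file):

```python
def select_buckets_for_import(discovered, registered, excluded):
    """Return bucket names eligible for auto-import.

    discovered: all bucket names found in the AWS account
    registered: names already present in the database
    excluded:   names listed in s3_bucket_exclusions.txt
    """
    return [name for name in discovered
            if name not in registered and name not in excluded]

buckets = select_buckets_for_import(
    ["site-a", "site-b", "theteacher.best"],
    registered={"site-a"},
    excluded={"theteacher.best"},
)
# buckets == ["site-b"]
```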
### Bucket Exclusion List
To prevent certain buckets from being auto-imported (e.g., buckets manually added with FQDNs), add them to `s3_bucket_exclusions.txt`:
```
# S3 Bucket Exclusion List
# One bucket name per line (comments start with #)
learningeducationtech.com
theteacher.best
airconditionerfixer.com
```
The discovery script automatically loads and respects this exclusion list. Excluded buckets are marked as `[EXCLUDED]` in the display and are skipped during both interactive and auto-import operations.
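Parsing the exclusion file is a matter of trimming whitespace and skipping blank lines and `#` comments; a minimal sketch of how such a loader might look:

```python
def load_exclusions(path="s3_bucket_exclusions.txt"):
    """Return the set of bucket names listed in the exclusion file."""
    excluded = set()
    try:
        with open(path, encoding="utf-8") as f:
            for line in f:
                name = line.strip()
                if name and not name.startswith("#"):
                    excluded.add(name)
    except FileNotFoundError:
        pass  # no exclusion file means nothing is excluded
    return excluded
```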
### List S3 Sites with FQDNs
To see which S3 buckets have custom domains (and should be excluded):
```bash
uv run python scripts/list_s3_fqdn_sites.py
```
This script lists all S3 sites with `s3_custom_domain` set and outputs bucket names that should be added to the exclusion list.
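Conceptually this is a single filtered query; a plain-SQL sketch (the table and column names here mirror the project's schema, but are assumptions):

```python
import sqlite3

def list_fqdn_buckets(conn):
    """Return bucket names of S3 sites that have s3_custom_domain set."""
    cur = conn.execute(
        "SELECT s3_bucket_name FROM site_deployments "
        "WHERE s3_custom_domain IS NOT NULL"
    )
    return [row[0] for row in cur.fetchall()]

# In-memory stand-in for the real database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE site_deployments "
             "(s3_bucket_name TEXT, s3_custom_domain TEXT)")
conn.executemany("INSERT INTO site_deployments VALUES (?, ?)",
                 [("theteacher.best", "www.theteacher.best"),
                  ("plain-bucket", None)])
print(list_fqdn_buckets(conn))  # bucket names to add to the exclusion list
```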
### S3 Site Types
S3 sites can be registered in two ways:
1. **Bucket-only sites**: No custom domain, accessed via S3 website endpoint
- Created via auto-import or interactive discovery
- Uses bucket name as site identifier
- URL format: `http://bucket-name.s3-website-region.amazonaws.com/` (website endpoints are HTTP-only; the HTTPS REST endpoint `https://bucket-name.s3.region.amazonaws.com/` also serves objects)
2. **FQDN sites**: Manually added with custom domains
- Created manually with `s3_custom_domain` set
- Should be added to exclusion list to prevent re-import
- URL format: `https://custom-domain.com/`
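The two URL forms can be derived from a site record with a small helper (a sketch; `site_url` and its parameters are illustrative, not part of the codebase):

```python
def site_url(bucket_name, region, custom_domain=None):
    """Build the public URL for an S3-backed site."""
    if custom_domain:  # FQDN site: served from the custom domain
        return f"https://{custom_domain}/"
    # Bucket-only site: served from the bucket's endpoint
    return f"https://{bucket_name}.s3.{region}.amazonaws.com/"

assert site_url("my-bucket", "us-east-1") == \
    "https://my-bucket.s3.us-east-1.amazonaws.com/"
assert site_url("x", "us-east-1", "custom-domain.com") == \
    "https://custom-domain.com/"
```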
### S3 Storage Features
- **Multi-region support**: Automatically detects bucket region
- **Public read access**: Buckets configured for public read-only access
- **Bucket policy**: Applied automatically for public read access
- **Region mapping**: AWS regions mapped to short codes (US, EU, SG, etc.)
- **Duplicate prevention**: Checks existing registrations before importing
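Region mapping is a small lookup table with a fallback; one way it might look (only the US, EU, and SG codes above come from the docs — the concrete region assignments and the US default are assumptions):

```python
# Hypothetical mapping; only US, EU, and SG appear in the feature list above.
REGION_SHORT_CODES = {
    "us-east-1": "US",
    "us-west-2": "US",
    "eu-west-1": "EU",
    "eu-central-1": "EU",
    "ap-southeast-1": "SG",
}

def map_region_to_short_code(region):
    """Map an AWS region to a short code, defaulting to US (assumed default)."""
    return REGION_SHORT_CODES.get(region, "US")

assert map_region_to_short_code("eu-central-1") == "EU"
assert map_region_to_short_code("ap-south-1") == "US"  # unmapped -> default
```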
### Helper Scripts
**List S3 FQDN sites**:
```bash
uv run python scripts/list_s3_fqdn_sites.py
```
**Delete sites by ID**:
```bash
# Edit scripts/delete_sites.py to set site_ids, then:
uv run python scripts/delete_sites.py
```
**Check sites around specific IDs**:
```bash
# Edit scripts/list_sites_by_id.py to set target_ids, then:
uv run python scripts/list_sites_by_id.py
```
## Project Management
### Ingest CORA Report
@@ -279,6 +380,32 @@ uv run python main.py get-links \
## Utility Scripts
### Add robots.txt to All Buckets
Add a standardized robots.txt file to all storage buckets (both S3 and Bunny) that blocks SEO tools and bad bots while allowing legitimate search engines and AI crawlers:
```bash
# Preview what would be done (recommended first)
uv run python scripts/add_robots_txt_to_buckets.py --dry-run
# Upload to all buckets
uv run python scripts/add_robots_txt_to_buckets.py
# Only process S3 buckets
uv run python scripts/add_robots_txt_to_buckets.py --provider s3
# Only process Bunny storage zones
uv run python scripts/add_robots_txt_to_buckets.py --provider bunny
```
**robots.txt behavior:**
- Allows: Google, Bing, Yahoo, DuckDuckGo, Baidu, Yandex
- Allows: GPTBot, Claude, Common Crawl, Perplexity, ByteDance AI
- Blocks: Ahrefs, Semrush, Moz, and other SEO tools
- Blocks: HTTrack, Wget, and other scrapers/bad bots
The script is idempotent (safe to run multiple times) and will overwrite existing robots.txt files. It continues processing remaining buckets if one fails and reports all failures at the end.
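The shipped script embeds robots.txt as a literal string; the same allow/block structure could equivalently be generated from two lists, as this sketch (with abridged bot lists) shows:

```python
ALLOWED = ["Googlebot", "Bingbot", "GPTBot", "CCBot"]     # abridged
BLOCKED = ["AhrefsBot", "SemrushBot", "HTTrack", "Wget"]  # abridged

def build_robots_txt(allowed, blocked):
    """Render allow/block rules plus a permissive default rule."""
    lines = []
    for agent in allowed:
        lines += [f"User-agent: {agent}", "Allow: /", ""]
    for agent in blocked:
        lines += [f"User-agent: {agent}", "Disallow: /", ""]
    lines += ["User-agent: *", "Allow: /"]
    return "\n".join(lines) + "\n"

txt = build_robots_txt(ALLOWED, BLOCKED)
```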
### Check Last Generated Content
```bash
uv run python check_last_gen.py
```

run_colinkri.bat 100644

@@ -0,0 +1,22 @@
@echo off
cd /d "E:\dev\Big-Link-Man"
if not exist logs mkdir logs
set LOG_FILE=logs\colinkri_log_%date:~-4,4%%date:~-10,2%%date:~-7,2%_%time:~0,2%%time:~3,2%%time:~6,2%.txt
set LOG_FILE=%LOG_FILE: =0%
echo ======================================== >> "%LOG_FILE%"
echo Colinkri Processor Run - %date% %time% >> "%LOG_FILE%"
echo ======================================== >> "%LOG_FILE%"
echo. >> "%LOG_FILE%"
uv run python src/generation/colinkri_processor.py >> "%LOG_FILE%" 2>&1
echo. >> "%LOG_FILE%"
echo ======================================== >> "%LOG_FILE%"
echo Completed at %date% %time% >> "%LOG_FILE%"
echo ======================================== >> "%LOG_FILE%"
echo. >> "%LOG_FILE%"
echo.
echo Output logged to: %LOG_FILE%
echo Window will close in 10 seconds...
timeout /t 10 /nobreak >nul


@@ -0,0 +1,17 @@
# S3 Bucket Exclusion List
# Buckets listed here will be skipped during auto-import
# One bucket name per line (comments start with #)
#
# These buckets have been manually added with FQDNs and should not be re-imported
learningeducationtech.com
theteacher.best
schooleducation.pro
school4education.com
localtris.com
airconditionerfixer.com
# Sites 567, 568, 569 (www.fractuslearning.com, www.rocktumbler.net, www.theteacher.best)
www.fractuslearning.com
www.rocktumbler.net
www.theteacher.best


@@ -0,0 +1,393 @@
"""
Script to add robots.txt files to all storage buckets (both S3 and Bunny)
This script:
1. Queries the database for all site deployments
2. Generates a standard robots.txt that blocks SEO tools/bad bots while allowing search engines and AI
3. Uploads robots.txt to each bucket using the appropriate storage client
4. Handles both S3 and Bunny storage providers
5. Overwrites existing robots.txt files (idempotent - safe to run multiple times)
6. Continues processing on errors and reports failures at the end
Usage:
python scripts/add_robots_txt_to_buckets.py # Actually upload
python scripts/add_robots_txt_to_buckets.py --dry-run # Preview only
python scripts/add_robots_txt_to_buckets.py --provider s3 # Only S3 buckets
python scripts/add_robots_txt_to_buckets.py --provider bunny # Only Bunny buckets
"""
import sys
import os
import argparse
import logging
from pathlib import Path
from typing import List, Tuple
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from dotenv import load_dotenv
from src.database.models import SiteDeployment
from src.deployment.storage_factory import create_storage_client
from src.deployment.bunny_storage import UploadResult
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def get_robots_txt_content() -> str:
"""
Generate the robots.txt content
This configuration:
- Allows legitimate search engines (Google, Bing, Yahoo, DuckDuckGo, Baidu, Yandex)
- Allows AI crawlers (GPT, Claude, Common Crawl, Perplexity, ByteDance)
- Blocks SEO tools (Ahrefs, Semrush, Moz, etc.)
- Blocks bad bots and site scrapers
- Allows everything else by default
Returns:
String content of robots.txt file
"""
return """# Allow legitimate search engines
User-agent: Googlebot
Allow: /
User-agent: Bingbot
Allow: /
User-agent: Slurp
Allow: /
User-agent: DuckDuckBot
Allow: /
User-agent: Baiduspider
Allow: /
User-agent: YandexBot
Allow: /
# Allow AI crawlers
User-agent: GPTBot
Allow: /
User-agent: ChatGPT-User
Allow: /
User-agent: CCBot
Allow: /
User-agent: Claude-Web
Allow: /
User-agent: anthropic-ai
Allow: /
User-agent: PerplexityBot
Allow: /
User-agent: Bytespider
Allow: /
# Block SEO tools
User-agent: AhrefsBot
Disallow: /
User-agent: SemrushBot
Disallow: /
User-agent: DotBot
Disallow: /
User-agent: MJ12bot
Disallow: /
User-agent: BLEXBot
Disallow: /
User-agent: DataForSeoBot
Disallow: /
User-agent: PetalBot
Disallow: /
User-agent: SeznamBot
Disallow: /
# Block common bad bots
User-agent: MauiBot
Disallow: /
User-agent: AlphaBot
Disallow: /
User-agent: SiteSnagger
Disallow: /
User-agent: WebStripper
Disallow: /
User-agent: WebCopier
Disallow: /
User-agent: WebZIP
Disallow: /
User-agent: Teleport
Disallow: /
User-agent: TeleportPro
Disallow: /
User-agent: Wget
Disallow: /
User-agent: HTTrack
Disallow: /
User-agent: Microsoft.URL
Disallow: /
User-agent: Xenu
Disallow: /
User-agent: larbin
Disallow: /
User-agent: libwww
Disallow: /
User-agent: ZyBORG
Disallow: /
User-agent: Download
Disallow: /
# Default - allow everyone else (mostly for legitimate indexing)
User-agent: *
Allow: /
"""
def get_database_session() -> Session:
"""
Create and return a database session
Reads DATABASE_URL from environment variables (.env file)
Returns:
SQLAlchemy Session object
Raises:
ValueError: If DATABASE_URL is not set
"""
load_dotenv()
database_url = os.getenv("DATABASE_URL")
if not database_url:
raise ValueError("DATABASE_URL environment variable is required")
engine = create_engine(database_url)
return Session(engine)
def get_all_site_deployments(session: Session, provider_filter: str = None) -> List[SiteDeployment]:
"""
Query database for all site deployments
Args:
session: Database session
provider_filter: Optional filter - 's3', 'bunny', or None for all
Returns:
List of SiteDeployment objects
"""
query = session.query(SiteDeployment)
# Apply provider filter if specified
if provider_filter == 's3':
# Include both 's3' and 's3_compatible'
query = query.filter(SiteDeployment.storage_provider.in_(['s3', 's3_compatible']))
elif provider_filter == 'bunny':
query = query.filter(SiteDeployment.storage_provider == 'bunny')
return query.all()
def upload_robots_txt(
site: SiteDeployment,
robots_content: str,
dry_run: bool = False
) -> Tuple[bool, str]:
"""
Upload robots.txt to a single site's storage bucket
Args:
site: SiteDeployment object with storage configuration
robots_content: Content of robots.txt file
dry_run: If True, only log what would be done without uploading
Returns:
Tuple of (success: bool, message: str)
"""
try:
# Determine hostname for logging
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
# For S3, also show bucket name
if site.storage_provider in ['s3', 's3_compatible']:
bucket_info = f" (bucket: {site.s3_bucket_name})" if site.s3_bucket_name else ""
else:
bucket_info = f" (zone: {site.storage_zone_name})"
if dry_run:
logger.info(
f"[DRY RUN] Would upload robots.txt to {site.storage_provider} - "
f"{hostname}{bucket_info}"
)
return True, f"Dry run - would upload to {hostname}"
# Create appropriate storage client based on provider
storage_client = create_storage_client(site)
# Upload robots.txt file
# Note: upload_file handles both str and bytes content
result: UploadResult = storage_client.upload_file(
site=site,
file_path='robots.txt', # Root level file
content=robots_content
)
if result.success:
logger.info(
f"✓ Successfully uploaded robots.txt to {site.storage_provider} - "
f"{hostname}{bucket_info}"
)
return True, result.message
else:
logger.error(
f"✗ Failed to upload robots.txt to {site.storage_provider} - "
f"{hostname}{bucket_info}: {result.message}"
)
return False, result.message
except Exception as e:
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
error_msg = f"Exception during upload: {str(e)}"
logger.error(f"✗ Error uploading to {hostname}: {error_msg}")
return False, error_msg
def main():
"""
Main script execution
Process:
1. Parse command line arguments
2. Load robots.txt content
3. Connect to database and fetch all site deployments
4. Iterate through each site and upload robots.txt
5. Track successes and failures
6. Report summary at the end
"""
# Parse command line arguments
parser = argparse.ArgumentParser(
description='Add robots.txt files to all storage buckets'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Preview what would be done without actually uploading'
)
parser.add_argument(
'--provider',
choices=['s3', 'bunny'],
help='Only process specific provider (s3 or bunny). Default: process all'
)
args = parser.parse_args()
# Load robots.txt content
robots_content = get_robots_txt_content()
logger.info("=" * 80)
logger.info("Starting robots.txt upload to all storage buckets")
if args.dry_run:
logger.info("DRY RUN MODE - No actual uploads will be performed")
if args.provider:
logger.info(f"Provider filter: {args.provider}")
logger.info("=" * 80)
try:
# Connect to database
session = get_database_session()
# Get all site deployments (optionally filtered by provider)
sites = get_all_site_deployments(session, args.provider)
if not sites:
logger.warning("No site deployments found in database")
return
logger.info(f"Found {len(sites)} site deployment(s) to process")
logger.info("")
# Track results
successes = []
failures = []
# Process each site
for idx, site in enumerate(sites, 1):
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
logger.info(f"[{idx}/{len(sites)}] Processing {hostname}...")
success, message = upload_robots_txt(site, robots_content, args.dry_run)
if success:
successes.append((site, message))
else:
failures.append((site, message))
logger.info("") # Blank line for readability
# Print summary
logger.info("=" * 80)
logger.info("SUMMARY")
logger.info("=" * 80)
logger.info(f"Total processed: {len(sites)}")
logger.info(f"Successful: {len(successes)}")
logger.info(f"Failed: {len(failures)}")
# Print failures if any
if failures:
logger.info("")
logger.info("FAILURES:")
for site, error_msg in failures:
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
logger.error(f"{hostname}: {error_msg}")
logger.info("=" * 80)
# Exit with error code if there were failures
if failures and not args.dry_run:
sys.exit(1)
except Exception as e:
logger.error(f"Fatal error: {str(e)}", exc_info=True)
sys.exit(1)
finally:
# Close database session if it exists
if 'session' in locals():
session.close()
if __name__ == "__main__":
main()


@@ -0,0 +1,308 @@
"""
Check a list of domains to see if they're AWS-related (S3 buckets, CloudFront, etc.)
Takes a list of domains and checks:
1. If domain name matches an S3 bucket name
2. DNS records pointing to CloudFront or S3
3. ACM validation records (indicates AWS usage)
"""
import os
import sys
import socket
import re
from typing import List, Dict, Optional, Set
from collections import defaultdict
import boto3
import click
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv
# Load .env file
load_dotenv()
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.deployment.s3_storage import map_aws_region_to_short_code
try:
import dns.resolver
DNS_AVAILABLE = True
except ImportError:
DNS_AVAILABLE = False
click.echo("Warning: dnspython not available. Install with: pip install dnspython", err=True)
click.echo("Will use basic socket resolution only.", err=True)
def get_s3_buckets() -> List[Dict[str, str]]:
"""Get all S3 buckets with their regions"""
try:
s3_client = boto3.client('s3')
response = s3_client.list_buckets()
buckets = []
for bucket in response.get('Buckets', []):
bucket_name = bucket['Name']
# Get bucket region
try:
region_response = s3_client.get_bucket_location(Bucket=bucket_name)
region = region_response.get('LocationConstraint', 'us-east-1')
if region is None or region == '':
region = 'us-east-1'
except ClientError:
region = 'us-east-1'
buckets.append({
'name': bucket_name,
'region': region
})
return buckets
except NoCredentialsError:
click.echo("Error: AWS credentials not found.", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Error listing buckets: {e}", err=True)
sys.exit(1)
def resolve_dns(domain: str) -> Dict[str, List[str]]:
"""
Resolve DNS records for a domain
Returns:
Dictionary with 'A', 'CNAME', 'TXT' record lists
"""
records = {'A': [], 'CNAME': [], 'TXT': []}
if DNS_AVAILABLE:
try:
# Try to get CNAME records
try:
answers = dns.resolver.resolve(domain, 'CNAME')
records['CNAME'] = [str(r.target) for r in answers]
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
pass
# Try to get A records
try:
answers = dns.resolver.resolve(domain, 'A')
records['A'] = [str(r) for r in answers]
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
pass
# Try to get TXT records (for ACM validation)
try:
answers = dns.resolver.resolve(domain, 'TXT')
records['TXT'] = [str(r) for r in answers]
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
pass
except Exception as e:
click.echo(f" Warning: DNS lookup failed for {domain}: {e}", err=True)
else:
# Fallback to basic socket resolution
try:
ip = socket.gethostbyname(domain)
records['A'] = [ip]
except socket.gaierror:
pass
return records
def check_aws_indicators(domain: str, dns_records: Dict[str, List[str]], buckets: List[Dict[str, str]]) -> Dict:
"""
Check if domain shows AWS-related indicators
Returns:
Dictionary with match information
"""
bucket_names = {b['name'] for b in buckets}
domain_lower = domain.lower().removeprefix('www.')  # strip a leading www. only
result = {
'domain': domain,
'bucket_match': None,
'bucket_region': None,
'dns_cloudfront': False,
'dns_s3': False,
'acm_validation': False,
'confidence': 'none',
'indicators': []
}
# Check if domain name matches a bucket name
if domain_lower in bucket_names:
result['bucket_match'] = domain_lower
result['confidence'] = 'high'
result['indicators'].append('Bucket name match')
# Find bucket region
for bucket in buckets:
if bucket['name'] == domain_lower:
result['bucket_region'] = bucket['region']
break
# Check DNS records for AWS indicators
all_targets = []
for record_type, values in dns_records.items():
all_targets.extend(values)
for target in all_targets:
target_lower = target.lower()
# Check for CloudFront
if 'cloudfront.net' in target_lower:
result['dns_cloudfront'] = True
result['indicators'].append(f'CloudFront: {target}')
if result['confidence'] == 'none':
result['confidence'] = 'high'
# Check for S3 website endpoints
if 's3-website' in target_lower or '.s3.' in target_lower:
result['dns_s3'] = True
result['indicators'].append(f'S3 endpoint: {target}')
if result['confidence'] == 'none':
result['confidence'] = 'high'
# Try to extract bucket name
match = re.search(r'([^/\.]+)\.s3-website-', target_lower)
if not match:
match = re.search(r'([^/\.]+)\.s3\.', target_lower)
if match:
extracted_bucket = match.group(1)
if extracted_bucket in bucket_names:
result['bucket_match'] = extracted_bucket
result['confidence'] = 'high'
# Check for ACM validation
if 'acm-validations.aws' in target_lower:
result['acm_validation'] = True
result['indicators'].append('ACM validation record')
if result['confidence'] == 'none':
result['confidence'] = 'medium'
return result
@click.command()
@click.argument('domains_file', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for results')
@click.option('--skip-dns', is_flag=True, help='Skip DNS lookups (faster, name matching only)')
def main(domains_file: str, output: Optional[str], skip_dns: bool):
"""Check domains from a file to see if they're AWS-related"""
click.echo("Checking domains for AWS indicators...")
click.echo("=" * 80)
# Read domain list
click.echo(f"\n1. Reading domains from {domains_file}...")
domains = []
with open(domains_file, 'r', encoding='utf-8') as f:
for line in f:
domain = line.strip()
if domain and not domain.startswith('#'):
# Remove a leading www. prefix if present
domain = domain.removeprefix('www.').strip()
if domain:
domains.append(domain)
click.echo(f" Found {len(domains)} domains")
# Get S3 buckets
click.echo("\n2. Fetching S3 buckets...")
buckets = get_s3_buckets()
click.echo(f" Found {len(buckets)} S3 buckets")
# Check each domain
click.echo(f"\n3. Checking domains{' (DNS lookups enabled)' if not skip_dns else ' (name matching only)'}...")
results = []
for idx, domain in enumerate(domains, 1):
click.echo(f" [{idx}/{len(domains)}] Checking {domain}...", nl=False)
dns_records = {}
if not skip_dns:
dns_records = resolve_dns(domain)
# Also check www subdomain
www_domain = f"www.{domain}"
www_records = resolve_dns(www_domain)
# Merge www records
for record_type in ['A', 'CNAME', 'TXT']:
dns_records[record_type] = dns_records.get(record_type, []) + www_records.get(record_type, [])
result = check_aws_indicators(domain, dns_records, buckets)
results.append(result)
if result['confidence'] != 'none':
click.echo(f" [MATCH - {result['confidence']}]")
else:
click.echo(" [No match]")
# Display results
click.echo("\n" + "=" * 80)
click.echo("RESULTS")
click.echo("=" * 80)
# Group by confidence
high_confidence = [r for r in results if r['confidence'] == 'high']
medium_confidence = [r for r in results if r['confidence'] == 'medium']
no_match = [r for r in results if r['confidence'] == 'none']
if high_confidence:
click.echo(f"\nHIGH CONFIDENCE ({len(high_confidence)} domains):")
click.echo("-" * 80)
for result in sorted(high_confidence, key=lambda x: x['domain']):
bucket_info = f" -> {result['bucket_match']}" if result['bucket_match'] else ""
region_info = f" ({result['bucket_region']})" if result['bucket_region'] else ""
click.echo(f" [OK] {result['domain']:<40}{bucket_info}{region_info}")
if result['indicators']:
for indicator in result['indicators']:
click.echo(f" - {indicator}")
if medium_confidence:
click.echo(f"\nMEDIUM CONFIDENCE ({len(medium_confidence)} domains):")
click.echo("-" * 80)
for result in sorted(medium_confidence, key=lambda x: x['domain']):
click.echo(f" [?] {result['domain']:<40}")
if result['indicators']:
for indicator in result['indicators']:
click.echo(f" - {indicator}")
if no_match:
click.echo(f"\nNO MATCH ({len(no_match)} domains):")
click.echo("-" * 80)
for result in sorted(no_match, key=lambda x: x['domain']):
click.echo(f" [ ] {result['domain']}")
# Save to CSV if requested
if output:
click.echo(f"\n4. Saving results to {output}...")
import csv
with open(output, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'domain', 'bucket_match', 'bucket_region', 'confidence',
'dns_cloudfront', 'dns_s3', 'acm_validation', 'indicators'
])
writer.writeheader()
for result in results:
row = result.copy()
row['indicators'] = '; '.join(row['indicators'])
writer.writerow(row)
click.echo(f" Saved {len(results)} results to {output}")
click.echo("\n" + "=" * 80)
click.echo("Summary:")
click.echo(f" Total domains checked: {len(domains)}")
click.echo(f" High confidence matches: {len(high_confidence)}")
click.echo(f" Medium confidence matches: {len(medium_confidence)}")
click.echo(f" No matches: {len(no_match)}")
click.echo("=" * 80)
if __name__ == "__main__":
main()


@@ -0,0 +1,46 @@
"""
Check for articles assigned to deleted test sites
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import GeneratedContentRepository, SiteDeploymentRepository
def check_orphaned_articles():
"""Check for articles assigned to deleted test sites"""
db_manager.initialize()
session = db_manager.get_session()
content_repo = GeneratedContentRepository(session)
site_repo = SiteDeploymentRepository(session)
deleted_site_ids = [398, 399, 400, 401]
try:
# Query articles assigned to deleted sites
from src.database.models import GeneratedContent
orphaned = session.query(GeneratedContent).filter(
GeneratedContent.site_deployment_id.in_(deleted_site_ids)
).all()
if orphaned:
print(f"Found {len(orphaned)} articles assigned to deleted test sites:")
for article in orphaned:
print(f" Article {article.id}: '{article.title}' -> site_deployment_id={article.site_deployment_id}")
print(f"\nThese articles should be reassigned or deleted.")
else:
print("No orphaned articles found. All good!")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
check_orphaned_articles()


@@ -0,0 +1,51 @@
"""
Check specific sites from database by ID
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
def check_sites(site_ids):
"""Check sites by their IDs"""
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
try:
for site_id in site_ids:
site = site_repo.get_by_id(site_id)
if site:
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
print(f"Site {site_id}: {site.site_name} ({hostname})")
print(f" Storage: {site.storage_provider}")
print(f" Bucket: {site.s3_bucket_name}")
print(f" Custom domain: {site.s3_custom_domain}")
else:
print(f"Site {site_id}: NOT FOUND")
# Also check by querying directly
print("\nDirect query check:")
from src.database.models import SiteDeployment
for site_id in site_ids:
site = session.query(SiteDeployment).filter(SiteDeployment.id == site_id).first()
if site:
print(f"Site {site_id}: Found via direct query - {site.site_name}")
else:
print(f"Site {site_id}: NOT FOUND via direct query")
except Exception as e:
print(f"Error checking sites: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
site_ids = [568, 569, 570]
check_sites(site_ids)


@@ -0,0 +1,62 @@
"""
Check specific sites by their FQDNs to get bucket names
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
def check_sites_by_domains():
"""Check sites by their custom domains"""
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
domains = [
"www.fractuslearning.com",
"www.rocktumbler.net",
"www.theteacher.best"
]
try:
print("\nChecking sites by custom domain:\n")
for domain in domains:
site = site_repo.get_by_hostname(domain)
if site:
print(f"Domain: {domain}")
print(f" Site ID: {site.id}")
print(f" Site Name: {site.site_name}")
print(f" Bucket Name: {site.s3_bucket_name}")
print(f" Custom Domain: {site.s3_custom_domain}")
print()
else:
print(f"Domain: {domain} - NOT FOUND\n")
# Also check by site IDs mentioned
print("\nChecking by site IDs 567, 568, 569:\n")
for site_id in [567, 568, 569]:
site = site_repo.get_by_id(site_id)
if site:
print(f"Site ID: {site_id}")
print(f" Site Name: {site.site_name}")
print(f" Custom Hostname: {site.custom_hostname}")
print(f" Bucket Name: {site.s3_bucket_name}")
print(f" Custom Domain: {site.s3_custom_domain}")
print()
else:
print(f"Site ID: {site_id} - NOT FOUND\n")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
check_sites_by_domains()


@@ -0,0 +1,50 @@
"""
Delete specific sites from database by ID
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
def delete_sites(site_ids):
"""Delete sites by their IDs"""
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
try:
deleted_count = 0
for site_id in site_ids:
# Get site info first to display what we're deleting
site = site_repo.get_by_id(site_id)
if site:
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
print(f"Deleting site {site_id}: {site.site_name} ({hostname})")
else:
print(f"Site {site_id}: Not found in database")
# Delete directly - the delete method handles not-found cases
if site_repo.delete(site_id):
print(f" [OK] Deleted site {site_id}")
deleted_count += 1
else:
print(f" [SKIP] Site {site_id} was not found or already deleted")
print(f"\nDeleted {deleted_count}/{len(site_ids)} site(s)")
except Exception as e:
print(f"Error deleting sites: {e}")
import traceback
traceback.print_exc()
session.rollback()
finally:
session.close()
if __name__ == "__main__":
site_ids = [568, 569, 570]
delete_sites(site_ids)


@@ -0,0 +1,46 @@
"""
Delete test sites from database
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
def delete_test_sites():
"""Delete test sites with IDs 398, 399, 400, 401"""
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
test_site_ids = [398, 399, 400, 401]
try:
    deleted_count = 0
    for site_id in test_site_ids:
        site = site_repo.get_by_id(site_id)
        if site:
            hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
            print(f"Deleting site {site_id}: {site.site_name} ({hostname})")
            if site_repo.delete(site_id):
                print(f"  Deleted site {site_id}")
                deleted_count += 1
            else:
                print(f"  Failed to delete site {site_id}")
        else:
            print(f"  Site {site_id} not found, skipping")
    # Report actual deletions, not the number of IDs attempted
    print(f"\nDeleted {deleted_count}/{len(test_site_ids)} test site(s)")
except Exception as e:
print(f"Error deleting sites: {e}")
import traceback
traceback.print_exc()
session.rollback()
finally:
session.close()
if __name__ == "__main__":
delete_test_sites()


@@ -15,6 +15,10 @@ from datetime import datetime
import boto3
import click
from botocore.exceptions import ClientError, BotoCoreError, NoCredentialsError
from dotenv import load_dotenv
# Load .env file
load_dotenv()
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -126,6 +130,43 @@ def list_all_buckets(s3_client) -> List[BucketInfo]:
sys.exit(1)
def load_excluded_buckets(exclusion_file: str = "s3_bucket_exclusions.txt") -> set:
"""
Load excluded bucket names from a text file
Args:
exclusion_file: Path to exclusion file (relative to project root)
Returns:
Set of bucket names to exclude
"""
excluded = set()
exclusion_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
exclusion_file
)
if not os.path.exists(exclusion_path):
logger.debug(f"Exclusion file not found: {exclusion_path}, skipping exclusions")
return excluded
try:
with open(exclusion_path, 'r', encoding='utf-8') as f:
for line in f:
# Strip whitespace and skip comments/empty lines
line = line.strip()
if line and not line.startswith('#'):
excluded.add(line)
if excluded:
logger.info(f"Loaded {len(excluded)} excluded bucket(s) from {exclusion_file}")
except Exception as e:
logger.warning(f"Error loading exclusion file {exclusion_path}: {e}")
return excluded
def check_existing_deployments(site_repo: SiteDeploymentRepository, bucket_names: List[str]) -> Dict[str, bool]:
"""
Check which buckets are already registered in the database
@@ -258,14 +299,18 @@ def register_bucket(
return False
def display_buckets(buckets: List[BucketInfo], existing_map: Dict[str, bool]):
def display_buckets(buckets: List[BucketInfo], existing_map: Dict[str, bool], excluded_buckets: set = None):
"""
Display buckets in a formatted table
Args:
buckets: List of BucketInfo objects
existing_map: Dictionary mapping bucket names to registration status
excluded_buckets: Set of excluded bucket names (optional)
"""
if excluded_buckets is None:
excluded_buckets = set()
click.echo("\n" + "=" * 80)
click.echo("Available S3 Buckets")
click.echo("=" * 80)
@ -274,13 +319,21 @@ def display_buckets(buckets: List[BucketInfo], existing_map: Dict[str, bool]):
for idx, bucket in enumerate(buckets, 1):
bucket.is_registered = existing_map.get(bucket.name, False)
if bucket.name in excluded_buckets:
status = "[EXCLUDED]"
elif bucket.is_registered:
status = "[REGISTERED]"
else:
status = "[AVAILABLE]"
click.echo(f"{idx:<4} {bucket.name:<40} {bucket.region:<15} {status:<15}")
click.echo("=" * 80)
@click.command()
@click.option('--auto-import-all', is_flag=True, default=False,
help='Automatically import all unregistered buckets as bucket-only sites (no custom domain)')
def main(auto_import_all: bool):
"""Main entry point for the discovery script"""
click.echo("S3 Bucket Discovery and Registration")
click.echo("=" * 80)
@ -308,26 +361,64 @@ def main():
click.echo("No S3 buckets found in your AWS account.")
return
# Load excluded buckets
excluded_buckets = load_excluded_buckets()
# Check which buckets are already registered
bucket_names = [b.name for b in buckets]
existing_map = check_existing_deployments(site_repo, bucket_names)
# Mark excluded buckets in existing_map
for bucket_name in excluded_buckets:
if bucket_name in existing_map:
existing_map[bucket_name] = True # Treat excluded as "registered" to skip
# Display buckets
display_buckets(buckets, existing_map, excluded_buckets)
# Filter out already registered buckets and excluded buckets
available_buckets = [
b for b in buckets
if not existing_map.get(b.name, False) and b.name not in excluded_buckets
]
if excluded_buckets:
excluded_count = sum(1 for b in buckets if b.name in excluded_buckets)
if excluded_count > 0:
click.echo(f"\nNote: {excluded_count} bucket(s) excluded by exclusion list")
if not available_buckets:
click.echo("\nAll buckets are already registered.")
return
# Auto-import mode: register all available buckets as bucket-only sites
if auto_import_all:
click.echo(f"\nAuto-import mode: Registering {len(available_buckets)} unregistered bucket(s) as bucket-only sites...")
success_count = 0
error_count = 0
for bucket_info in available_buckets:
# Register as bucket-only (no custom domain, site_name = bucket_name)
if register_bucket(bucket_info, site_repo, site_name=None, custom_domain=None):
success_count += 1
else:
error_count += 1
click.echo(f"\n{'=' * 80}")
click.echo(f"Auto-import complete: {success_count} bucket(s) registered, {error_count} failed.")
click.echo("=" * 80)
return
# Interactive mode: prompt for bucket selection
click.echo(f"\nFound {len(available_buckets)} available bucket(s) to register.")
click.echo("Enter bucket numbers to register (comma-separated, e.g., 1,3,5):")
click.echo("Or press Enter to skip registration.")
try:
selection_input = click.prompt("Selection", default="", type=str).strip()
except click.Abort:
click.echo("\nOperation cancelled.")
return
if not selection_input:
click.echo("No buckets selected. Exiting.")
@ -364,24 +455,35 @@ def main():
# Prompt for site name
default_site_name = bucket_info.name
try:
site_name = click.prompt("Site name", default=default_site_name, type=str).strip()
except click.Abort:
click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
continue
if not site_name:
site_name = default_site_name
# Prompt for custom domain (optional)
try:
custom_domain = click.prompt(
"Custom domain (optional, press Enter to skip)",
default="",
type=str
).strip()
except click.Abort:
click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
continue
if not custom_domain:
custom_domain = None
# Confirm registration
try:
if click.confirm(f"Register '{bucket_info.name}' as '{site_name}'?"):
if register_bucket(bucket_info, site_repo, site_name, custom_domain):
success_count += 1
else:
click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
except click.Abort:
click.echo(f" [SKIP] Registration cancelled for '{bucket_info.name}'")
click.echo(f"\n{'=' * 80}")

View File

@ -0,0 +1,48 @@
"""
Clear site_deployment_id for articles assigned to deleted test sites
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.models import GeneratedContent
def fix_orphaned_articles():
"""Clear site_deployment_id for articles assigned to deleted test sites"""
db_manager.initialize()
session = db_manager.get_session()
deleted_site_ids = [398, 399, 400, 401]
try:
orphaned = session.query(GeneratedContent).filter(
GeneratedContent.site_deployment_id.in_(deleted_site_ids)
).all()
if orphaned:
print(f"Found {len(orphaned)} articles assigned to deleted test sites:")
for article in orphaned:
print(f" Article {article.id}: '{article.title}' -> site_deployment_id={article.site_deployment_id}")
article.site_deployment_id = None
session.add(article)
session.commit()
print(f"\nCleared site_deployment_id for {len(orphaned)} articles")
print("These articles can now be reassigned during post-processing or deployment")
else:
print("No orphaned articles found.")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
session.rollback()
finally:
session.close()
if __name__ == "__main__":
fix_orphaned_articles()

View File

@ -0,0 +1,83 @@
"""
Migration: Fix custom_hostname for existing S3 sites
Updates existing S3 site deployments that have s3_custom_domain set
but custom_hostname=None. This fixes sites registered before the bug fix
in discover_s3_buckets.py.
"""
import sys
import os
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
def migrate():
"""Update custom_hostname from s3_custom_domain for S3 sites"""
print("Fixing custom_hostname for existing S3 sites...")
try:
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
# Find all S3 sites with s3_custom_domain but no custom_hostname
all_sites = site_repo.get_all()
s3_sites_to_fix = [
site for site in all_sites
if site.storage_provider in ('s3', 's3_compatible')
and site.s3_custom_domain
and not site.custom_hostname
]
if not s3_sites_to_fix:
print("[OK] No S3 sites need fixing (all have custom_hostname set or no s3_custom_domain)")
return
print(f"\nFound {len(s3_sites_to_fix)} S3 site(s) to fix:")
for site in s3_sites_to_fix:
print(f" Site {site.id}: {site.site_name}")
print(f" s3_custom_domain: {site.s3_custom_domain}")
print(f" custom_hostname: {site.custom_hostname} (will be set to {site.s3_custom_domain})")
# Update each site
updated_count = 0
for site in s3_sites_to_fix:
try:
# Check if custom_hostname already exists (unique constraint)
existing = site_repo.get_by_hostname(site.s3_custom_domain)
if existing and existing.id != site.id:
print(f"\n[WARNING] Site {site.id} ({site.site_name})")
print(f" Cannot set custom_hostname='{site.s3_custom_domain}' - already used by site {existing.id}")
continue
# Update custom_hostname
site.custom_hostname = site.s3_custom_domain
session.add(site)
session.commit()
updated_count += 1
print(f"\n[OK] Updated site {site.id}: custom_hostname = '{site.s3_custom_domain}'")
except Exception as e:
session.rollback()
print(f"\n[ERROR] Failed to update site {site.id}: {e}")
raise
print(f"\n{'=' * 60}")
print(f"Migration complete: {updated_count}/{len(s3_sites_to_fix)} site(s) updated")
print("=" * 60)
except Exception as e:
print(f"\n[ERROR] Migration failed: {e}")
raise
finally:
if 'session' in locals():
session.close()
if __name__ == "__main__":
migrate()

View File

@ -0,0 +1,48 @@
"""
List all S3 sites that have custom domains (FQDNs) - these should be excluded from auto-import
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.models import SiteDeployment
def list_fqdn_s3_sites():
"""List S3 sites with custom domains"""
db_manager.initialize()
session = db_manager.get_session()
try:
sites = session.query(SiteDeployment).filter(
SiteDeployment.storage_provider.in_(['s3', 's3_compatible']),
SiteDeployment.s3_custom_domain.isnot(None)
).order_by(SiteDeployment.id).all()
print(f"\nFound {len(sites)} S3 sites with custom domains (FQDNs):\n")
print(f"{'ID':<5} {'Bucket Name':<40} {'Custom Domain':<40}")
print("-" * 90)
for site in sites:
bucket = site.s3_bucket_name or 'N/A'
domain = site.s3_custom_domain or 'N/A'
print(f"{site.id:<5} {bucket:<40} {domain:<40}")
print(f"\nThese buckets should be added to s3_bucket_exclusions.txt to prevent re-import")
print("\nBucket names to exclude:")
for site in sites:
if site.s3_bucket_name:
print(site.s3_bucket_name)
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
list_fqdn_s3_sites()

View File

@ -0,0 +1,57 @@
"""
List sites around specific IDs to see what exists
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.models import SiteDeployment
def list_sites_around_ids(target_ids):
"""List sites around the target IDs"""
db_manager.initialize()
session = db_manager.get_session()
try:
# Get the min and max IDs to check
min_id = min(target_ids) - 5
max_id = max(target_ids) + 5
sites = session.query(SiteDeployment).filter(
SiteDeployment.id >= min_id,
SiteDeployment.id <= max_id
).order_by(SiteDeployment.id).all()
print(f"\nSites with IDs between {min_id} and {max_id}:")
print(f"{'ID':<5} {'Site Name':<35} {'Hostname':<40} {'Storage':<10}")
print("-" * 100)
for site in sites:
hostname = site.custom_hostname or site.pull_zone_bcdn_hostname
print(f"{site.id:<5} {site.site_name:<35} {hostname:<40} {site.storage_provider:<10}")
print(f"\nTotal: {len(sites)} sites")
# Check specifically for the target IDs
print(f"\nChecking target IDs: {target_ids}")
for site_id in target_ids:
site = session.query(SiteDeployment).filter(SiteDeployment.id == site_id).first()
if site:
print(f" Site {site_id}: EXISTS - {site.site_name}")
else:
print(f" Site {site_id}: NOT FOUND")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
target_ids = [568, 569, 570]
list_sites_around_ids(target_ids)

View File

@ -0,0 +1,344 @@
"""
Match domains from registrar DNS exports to S3 buckets
Analyzes DNS records to find domains pointing to:
- Direct S3 website endpoints (*.s3-website-*.amazonaws.com)
- CloudFront distributions (which may front S3 buckets)
- S3 bucket names that match domain names
"""
import csv
import re
import sys
import os
from typing import Dict, List, Set, Optional, Tuple
from collections import defaultdict
import boto3
import click
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv
# Load .env file
load_dotenv()
def get_s3_buckets() -> List[Dict[str, str]]:
"""Get all S3 buckets with their regions"""
try:
s3_client = boto3.client('s3')
response = s3_client.list_buckets()
buckets = []
for bucket in response.get('Buckets', []):
bucket_name = bucket['Name']
# Get bucket region
try:
region_response = s3_client.get_bucket_location(Bucket=bucket_name)
region = region_response.get('LocationConstraint', 'us-east-1')
if region is None or region == '':
region = 'us-east-1'
except ClientError:
region = 'us-east-1'
buckets.append({
'name': bucket_name,
'region': region
})
return buckets
except NoCredentialsError:
click.echo("Warning: AWS credentials not found. Cannot list S3 buckets.", err=True)
click.echo("Will only match based on DNS records and name matching.", err=True)
return []
except Exception as e:
click.echo(f"Warning: Error listing buckets: {e}", err=True)
click.echo("Will only match based on DNS records and name matching.", err=True)
return []
def get_cloudfront_distributions() -> Dict[str, Dict]:
"""Get all CloudFront distributions and their origins"""
try:
cf_client = boto3.client('cloudfront')
distributions = {}
paginator = cf_client.get_paginator('list_distributions')
for page in paginator.paginate():
for dist in page.get('DistributionList', {}).get('Items', []):
dist_id = dist['Id']
domain_name = dist['DomainName']
# Get origin (S3 bucket if applicable)
origins = dist.get('Origins', {}).get('Items', [])
s3_origins = []
for origin in origins:
domain = origin.get('DomainName', '')
if '.s3.' in domain or 's3-website' in domain:
# Extract bucket name from S3 origin
bucket_match = re.search(r'([^/]+)\.s3[\.-]', domain)
if bucket_match:
s3_origins.append(bucket_match.group(1))
distributions[domain_name] = {
'id': dist_id,
's3_buckets': s3_origins,
'aliases': dist.get('Aliases', {}).get('Items', [])
}
return distributions
except Exception as e:
click.echo(f"Warning: Could not list CloudFront distributions: {e}", err=True)
return {}
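The origin-parsing step above relies on a single regex to pull a bucket name out of a CloudFront origin domain. A hypothetical standalone helper, `bucket_from_origin`, shows the same pattern in isolation (assumption: origins are virtual-hosted-style or website-endpoint S3 domains, as in the code above):

```python
import re

def bucket_from_origin(origin_domain):
    """Extract an S3 bucket name from a CloudFront origin domain,
    e.g. 'mybucket.s3.us-east-1.amazonaws.com' -> 'mybucket'."""
    match = re.search(r'([^/]+)\.s3[\.-]', origin_domain)
    return match.group(1) if match else None
```

Non-S3 origins simply return `None`, which matches how the loop above skips them.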
def parse_dns_csv(csv_path: str) -> Dict[str, List[Dict]]:
"""
Parse DNS records CSV and extract domains with S3/CloudFront records
Returns:
Dictionary mapping domain -> list of relevant DNS records
"""
domains = defaultdict(list)
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
domain = row.get('Domain', '').strip()
record_type = row.get('Record Type', '').strip()
details = row.get('Details', '').strip()
record = row.get('Record', '').strip()
if not domain:
continue
# Check for S3 website endpoints
if 's3-website' in details.lower() or 's3.amazonaws.com' in details.lower():
domains[domain].append({
'type': 'S3_DIRECT',
'record_type': record_type,
'record': record,
'target': details,
'domain': domain
})
# Check for CloudFront
if 'cloudfront.net' in details.lower():
domains[domain].append({
'type': 'CLOUDFRONT',
'record_type': record_type,
'record': record,
'target': details,
'domain': domain
})
# Check for ACM validation (indicates AWS usage)
if 'acm-validations.aws' in details.lower():
domains[domain].append({
'type': 'ACM_VALIDATION',
'record_type': record_type,
'record': record,
'target': details,
'domain': domain
})
return dict(domains)
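The classification logic in `parse_dns_csv` can be sketched without touching the filesystem by feeding `csv.DictReader` an in-memory string. `classify_rows` below is a simplified, hypothetical version (it picks one category per row, whereas the function above appends a record per matching substring):

```python
import csv
import io

def classify_rows(csv_text):
    """Minimal sketch of parse_dns_csv's row classification,
    assuming the same column layout: Domain, Record Type, Details, Record."""
    kinds = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        details = row["Details"].lower()
        if "s3-website" in details or "s3.amazonaws.com" in details:
            kinds.append((row["Domain"], "S3_DIRECT"))
        elif "cloudfront.net" in details:
            kinds.append((row["Domain"], "CLOUDFRONT"))
    return kinds
```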
def extract_s3_bucket_from_target(target: str) -> Optional[str]:
"""Extract S3 bucket name from DNS target"""
# Pattern: www.bucket-name.s3-website-region.amazonaws.com
# Checked first: the generic pattern below captures greedily and would
# otherwise swallow the "www." prefix into the bucket name, leaving this
# branch unreachable
match = re.search(r'^www\.([^/]+)\.s3-website-([^\.]+)\.amazonaws\.com', target)
if match:
return match.group(1)
# Pattern: bucket-name.s3-website-region.amazonaws.com
# Bucket name can contain dots, so capture everything up to .s3-website-
match = re.search(r'^([^/]+)\.s3-website-([^\.]+)\.amazonaws\.com', target)
if match:
return match.group(1)
# Pattern: bucket-name.s3.region.amazonaws.com
match = re.search(r'^([^/]+)\.s3\.([^\.]+)\.amazonaws\.com', target)
if match:
return match.group(1)
return None
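The website-endpoint pattern used above also captures the region as a second group. A hypothetical helper, `bucket_and_region`, demonstrates that with a dotted bucket name (which the pattern is explicitly written to allow):

```python
import re

def bucket_and_region(target):
    """Sketch of the website-endpoint pattern above; returns
    (bucket, region) or None. Bucket names may contain dots."""
    m = re.search(r'^([^/]+)\.s3-website-([^\.]+)\.amazonaws\.com', target)
    return (m.group(1), m.group(2)) if m else None
```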
def match_domains_to_buckets(
domains: Dict[str, List[Dict]],
buckets: List[Dict[str, str]],
cloudfront: Dict[str, Dict]
) -> List[Dict]:
"""
Match domains to S3 buckets
Returns:
List of matches with domain, bucket, match_type, etc.
"""
matches = []
bucket_names = {b['name'] for b in buckets}
# Create CloudFront domain -> bucket mapping
cf_domain_to_bucket = {}
for cf_domain, cf_info in cloudfront.items():
for alias in cf_info.get('aliases', []):
cf_domain_to_bucket[alias.lower()] = cf_info['s3_buckets']
cf_domain_to_bucket[cf_domain.lower()] = cf_info['s3_buckets']
for domain, records in domains.items():
domain_lower = domain.lower()
domain_base = domain_lower.replace('www.', '').replace('@', '')
# Check each DNS record
for record in records:
match_info = {
'domain': domain,
'dns_record': record['record'],
'dns_target': record['target'],
'match_type': None,
'bucket_name': None,
'bucket_region': None,
'confidence': 'low'
}
if record['type'] == 'S3_DIRECT':
# Extract bucket from S3 endpoint
bucket_name = extract_s3_bucket_from_target(record['target'])
if bucket_name and bucket_name in bucket_names:
match_info['bucket_name'] = bucket_name
match_info['match_type'] = 'S3_DIRECT_DNS'
match_info['confidence'] = 'high'
# Find bucket region
for bucket in buckets:
if bucket['name'] == bucket_name:
match_info['bucket_region'] = bucket['region']
break
matches.append(match_info)
elif record['type'] == 'CLOUDFRONT':
# Check if CloudFront points to S3
cf_target = record['target'].lower()
if cf_target in cf_domain_to_bucket:
s3_buckets = cf_domain_to_bucket[cf_target]
if s3_buckets:
match_info['bucket_name'] = s3_buckets[0]
match_info['match_type'] = 'CLOUDFRONT_TO_S3'
match_info['confidence'] = 'high'
# Find bucket region
for bucket in buckets:
if bucket['name'] == match_info['bucket_name']:
match_info['bucket_region'] = bucket['region']
break
matches.append(match_info)
else:
# CloudFront but no S3 origin found
match_info['match_type'] = 'CLOUDFRONT_UNKNOWN_ORIGIN'
match_info['confidence'] = 'medium'
matches.append(match_info)
# Also check if domain name matches bucket name (name-based match)
if domain_base in bucket_names:
# Check if we already have a match for this domain
existing = [m for m in matches if m['domain'] == domain and m['bucket_name'] == domain_base]
if not existing:
match_info = {
'domain': domain,
'dns_record': 'N/A',
'dns_target': 'Name match',
'match_type': 'NAME_MATCH',
'bucket_name': domain_base,
'bucket_region': None,
'confidence': 'medium'
}
# Find bucket region
for bucket in buckets:
if bucket['name'] == domain_base:
match_info['bucket_region'] = bucket['region']
break
matches.append(match_info)
return matches
@click.command()
@click.argument('dns_csv', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output CSV file for matches')
def main(dns_csv: str, output: Optional[str]):
"""Match domains from DNS CSV to S3 buckets"""
click.echo("Matching domains to S3 buckets...")
click.echo("=" * 80)
# Get S3 buckets
click.echo("\n1. Fetching S3 buckets...")
buckets = get_s3_buckets()
click.echo(f" Found {len(buckets)} S3 buckets")
# Get CloudFront distributions
click.echo("\n2. Fetching CloudFront distributions...")
cloudfront = get_cloudfront_distributions()
click.echo(f" Found {len(cloudfront)} CloudFront distributions")
# Parse DNS CSV
click.echo(f"\n3. Parsing DNS records from {dns_csv}...")
domains = parse_dns_csv(dns_csv)
click.echo(f" Found {len(domains)} domains with AWS-related DNS records")
# Match domains to buckets
click.echo("\n4. Matching domains to buckets...")
matches = match_domains_to_buckets(domains, buckets, cloudfront)
click.echo(f" Found {len(matches)} matches")
# Display results
click.echo("\n" + "=" * 80)
click.echo("MATCH RESULTS")
click.echo("=" * 80)
if not matches:
click.echo("No matches found.")
return
# Group by match type
by_type = defaultdict(list)
for match in matches:
by_type[match['match_type']].append(match)
for match_type, type_matches in by_type.items():
click.echo(f"\n{match_type} ({len(type_matches)} matches):")
click.echo("-" * 80)
for match in sorted(type_matches, key=lambda x: x['domain']):
confidence_icon = "[OK]" if match['confidence'] == 'high' else "[?]"
click.echo(f" {confidence_icon} {match['domain']:<40} -> {match['bucket_name'] or 'N/A':<40} "
f"({match['bucket_region'] or 'N/A'})")
# Save to CSV if requested
if output:
click.echo(f"\n5. Saving results to {output}...")
with open(output, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'domain', 'bucket_name', 'bucket_region', 'match_type',
'confidence', 'dns_record', 'dns_target'
])
writer.writeheader()
for match in matches:
writer.writerow(match)
click.echo(f" Saved {len(matches)} matches to {output}")
click.echo("\n" + "=" * 80)
click.echo("Summary:")
click.echo(f" Total domains analyzed: {len(domains)}")
click.echo(f" Total matches found: {len(matches)}")
click.echo(f" High confidence: {len([m for m in matches if m['confidence'] == 'high'])}")
click.echo(f" Medium confidence: {len([m for m in matches if m['confidence'] == 'medium'])}")
click.echo("=" * 80)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,66 @@
"""
Search for domains in various formats
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.models import SiteDeployment
def search_domains():
"""Search for domains"""
db_manager.initialize()
session = db_manager.get_session()
search_terms = [
"fractuslearning",
"rocktumbler",
"theteacher"
]
try:
print("\nSearching for sites containing these terms:\n")
for term in search_terms:
sites = session.query(SiteDeployment).filter(
(SiteDeployment.custom_hostname.contains(term)) |
(SiteDeployment.s3_custom_domain.contains(term)) |
(SiteDeployment.site_name.contains(term)) |
(SiteDeployment.s3_bucket_name.contains(term))
).all()
if sites:
print(f"Found {len(sites)} site(s) containing '{term}':")
for site in sites:
print(f" Site ID: {site.id}")
print(f" Site Name: {site.site_name}")
print(f" Custom Hostname: {site.custom_hostname}")
print(f" S3 Bucket Name: {site.s3_bucket_name}")
print(f" S3 Custom Domain: {site.s3_custom_domain}")
print()
else:
print(f"No sites found containing '{term}'\n")
# Also check all S3 sites with custom domains
print("\nAll S3 sites with custom domains:")
all_s3_fqdn = session.query(SiteDeployment).filter(
SiteDeployment.storage_provider.in_(['s3', 's3_compatible']),
SiteDeployment.s3_custom_domain.isnot(None)
).all()
for site in all_s3_fqdn:
print(f" ID {site.id}: {site.s3_bucket_name} -> {site.s3_custom_domain}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
search_domains()

View File

@ -0,0 +1,82 @@
"""
Test script to upload a dummy file to S3 bucket
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
from src.deployment.storage_factory import create_storage_client
def test_upload():
"""Upload a test file to airconditionerfixer.com"""
print("Testing S3 upload to airconditionerfixer.com...")
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
try:
# Get the site
site = site_repo.get_by_hostname("airconditionerfixer.com")
if not site:
print("ERROR: Site 'airconditionerfixer.com' not found")
return
print(f"\nSite found:")
print(f" ID: {site.id}")
print(f" Name: {site.site_name}")
print(f" Storage Provider: {site.storage_provider}")
print(f" S3 Bucket: {site.s3_bucket_name}")
print(f" S3 Region: {site.s3_bucket_region}")
print(f" Custom Domain: {site.s3_custom_domain}")
if site.storage_provider != 's3':
print(f"\nERROR: Site is not an S3 site (storage_provider={site.storage_provider})")
return
# Create storage client
client = create_storage_client(site)
print(f"\nStorage client created: {type(client).__name__}")
# Test content
test_content = "This is a test file uploaded to verify S3 bucket configuration and permissions."
test_file_path = "test-upload.txt"
print(f"\nUploading test file: {test_file_path}")
print(f"Content: {test_content[:50]}...")
# Upload
result = client.upload_file(
site=site,
file_path=test_file_path,
content=test_content
)
print(f"\nUpload Result:")
print(f" Success: {result.success}")
print(f" File Path: {result.file_path}")
print(f" Message: {result.message}")
if result.success:
# Generate URL
from src.generation.url_generator import generate_public_url
url = generate_public_url(site, test_file_path)
print(f"\nPublic URL: {url}")
print(f"\nTest file should be accessible at: {url}")
else:
print(f"\nERROR: Upload failed - {result.message}")
except Exception as e:
print(f"\nERROR: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
test_upload()

View File

@ -0,0 +1,163 @@
"""
Test script to upload article and image to localtris.com S3 bucket
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database.session import db_manager
from src.database.repositories import SiteDeploymentRepository
from src.deployment.storage_factory import create_storage_client
from src.generation.image_upload import upload_image_to_storage
from src.generation.url_generator import generate_public_url
def test_article_upload(site):
"""Test uploading an article (HTML content)"""
print("\n" + "="*60)
print("TEST 1: Article Upload (HTML)")
print("="*60)
client = create_storage_client(site)
print(f"Storage client: {type(client).__name__}")
# Test HTML content
test_html = """<!DOCTYPE html>
<html>
<head>
<title>Test Article - S3 Upload</title>
<meta charset="UTF-8">
</head>
<body>
<h1>Test Article Upload</h1>
<p>This is a test article uploaded to verify S3 article upload functionality.</p>
<p>If you can see this, the article upload is working correctly!</p>
</body>
</html>"""
test_file_path = "test-article-s3-upload.html"
print(f"\nUploading article: {test_file_path}")
print(f"Content length: {len(test_html)} characters")
result = client.upload_file(
site=site,
file_path=test_file_path,
content=test_html
)
print(f"\nUpload Result:")
print(f" Success: {result.success}")
print(f" File Path: {result.file_path}")
print(f" Message: {result.message}")
if result.success:
url = generate_public_url(site, test_file_path)
print(f"\nPublic URL: {url}")
print(f"\nArticle should be accessible at: {url}")
return True
else:
print(f"\nERROR: Upload failed - {result.message}")
return False
def test_image_upload(site):
"""Test uploading an image (binary data)"""
print("\n" + "="*60)
print("TEST 2: Image Upload (Binary)")
print("="*60)
# Create a simple test image (1x1 red PNG)
# PNG signature + minimal valid PNG structure
test_image_bytes = bytes([
0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, # PNG signature
0x00, 0x00, 0x00, 0x0D, # IHDR chunk length
0x49, 0x48, 0x44, 0x52, # IHDR
0x00, 0x00, 0x00, 0x01, # width = 1
0x00, 0x00, 0x00, 0x01, # height = 1
0x08, 0x02, 0x00, 0x00, 0x00, # bit depth, color type, etc.
0x90, 0x77, 0x53, 0xDE, # CRC
0x00, 0x00, 0x00, 0x0C, # IDAT chunk length
0x49, 0x44, 0x41, 0x54, # IDAT
0x78, 0x9C, 0x63, 0x00, 0x01, 0x00, 0x00, 0x05, 0x00, 0x01, # compressed data
0x0D, 0x0A, 0x2D, 0xB4, # CRC
0x00, 0x00, 0x00, 0x00, # IEND chunk length
0x49, 0x45, 0x4E, 0x44, # IEND
0xAE, 0x42, 0x60, 0x82 # CRC
])
test_file_path = "images/test-s3-upload.png"
print(f"\nUploading image: {test_file_path}")
print(f"Image size: {len(test_image_bytes)} bytes")
image_url = upload_image_to_storage(
site=site,
image_bytes=test_image_bytes,
file_path=test_file_path
)
if image_url:
print(f"\nImage Upload Result:")
print(f" Success: True")
print(f" Public URL: {image_url}")
print(f"\nImage should be accessible at: {image_url}")
return True
else:
print(f"\nERROR: Image upload failed")
return False
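The hand-assembled byte literal above hard-codes chunk lengths and CRCs, which is easy to get wrong. A sketch that builds an equivalent 1x1 PNG programmatically, so lengths and CRCs are always consistent (`make_1x1_png` and `png_chunk` are hypothetical names, not part of the codebase):

```python
import struct
import zlib

def png_chunk(ctype, data):
    """Frame a PNG chunk: 4-byte length, type, data, CRC over type+data."""
    return (struct.pack(">I", len(data)) + ctype + data
            + struct.pack(">I", zlib.crc32(ctype + data) & 0xFFFFFFFF))

def make_1x1_png(rgb=(255, 0, 0)):
    """Build a minimal valid 1x1 8-bit RGB PNG."""
    ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 2, 0, 0, 0)  # 1x1, RGB, no interlace
    raw = b"\x00" + bytes(rgb)  # filter byte 0 + one pixel
    return (b"\x89PNG\r\n\x1a\n"
            + png_chunk(b"IHDR", ihdr)
            + png_chunk(b"IDAT", zlib.compress(raw))
            + png_chunk(b"IEND", b""))
```

`make_1x1_png()` could replace the byte literal in `test_image_upload` if the hand-coded bytes ever fail validation.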
def main():
"""Run all tests for localtris.com"""
print("Testing S3 uploads to localtris.com...")
db_manager.initialize()
session = db_manager.get_session()
site_repo = SiteDeploymentRepository(session)
try:
# Get the site
site = site_repo.get_by_hostname("localtris.com")
if not site:
print("ERROR: Site 'localtris.com' not found")
return
print(f"\nSite found:")
print(f" ID: {site.id}")
print(f" Name: {site.site_name}")
print(f" Storage Provider: {site.storage_provider}")
print(f" S3 Bucket: {site.s3_bucket_name}")
print(f" S3 Region: {site.s3_bucket_region}")
print(f" Custom Domain: {site.s3_custom_domain}")
if site.storage_provider != 's3':
print(f"\nERROR: Site is not an S3 site (storage_provider={site.storage_provider})")
return
# Run tests
article_success = test_article_upload(site)
image_success = test_image_upload(site)
# Summary
print("\n" + "="*60)
print("TEST SUMMARY")
print("="*60)
print(f"Article Upload: {'PASS' if article_success else 'FAIL'}")
print(f"Image Upload: {'PASS' if image_success else 'FAIL'}")
print("="*60)
if article_success and image_success:
print("\nAll tests passed!")
else:
print("\nSome tests failed. Check errors above.")
except Exception as e:
print(f"\nERROR: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == "__main__":
main()

View File

@ -5,7 +5,7 @@ Bunny.net Storage API client for uploading files to storage zones
import requests
import time
import logging
from typing import List, Optional, TYPE_CHECKING, Union
from dataclasses import dataclass
if TYPE_CHECKING:
@ -70,7 +70,7 @@ class BunnyStorageClient:
self,
site: "SiteDeployment",
file_path: str,
content: Union[str, bytes],
content_type: str = 'application/octet-stream'
) -> UploadResult:
"""
@ -79,7 +79,7 @@ class BunnyStorageClient:
Args:
site: SiteDeployment object with storage zone configuration
file_path: Path within storage zone (e.g., 'my-article.html')
content: File content to upload (str for text, bytes for binary like images)
content_type: MIME type (default: application/octet-stream per Bunny.net docs)
Returns:
@ -105,11 +105,17 @@ class BunnyStorageClient:
"accept": "application/json"
}
# Handle both string and bytes content
if isinstance(content, str):
body = content.encode('utf-8')
else:
body = content
for attempt in range(self.max_retries):
try:
response = self.session.put(
url,
data=body,
headers=headers,
timeout=60
)
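The str/bytes normalization added in this hunk (and mirrored in the S3 client below) can be factored into one small helper. `to_bytes` is a hypothetical sketch of the same rule: text is UTF-8 encoded, binary payloads such as images pass through untouched:

```python
from typing import Union

def to_bytes(content: Union[str, bytes]) -> bytes:
    """Normalize an upload payload to bytes: encode text as UTF-8,
    pass binary content through unchanged."""
    return content.encode("utf-8") if isinstance(content, str) else content
```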

View File

@ -85,10 +85,38 @@ class DeploymentService:
articles = self.content_repo.get_by_project_id(project_id)
logger.info(f"Found {len(articles)} articles to deploy for project {project_id}")
# Pre-deployment check: warn about articles missing formatted_html
articles_without_html = [a for a in articles if a.site_deployment_id and not a.formatted_html]
if articles_without_html:
logger.warning(
f"Found {len(articles_without_html)} articles without formatted_html. "
f"These likely failed template application and will be skipped: "
f"{[a.id for a in articles_without_html[:5]]}"
f"{'...' if len(articles_without_html) > 5 else ''}"
)
for article in articles:
if not article.site_deployment_id:
logger.warning(f"Article {article.id} has no site assigned, skipping")
continue
# Skip already-deployed articles
if article.deployed_at:
logger.debug(f"Article {article.id} already deployed at {article.deployed_at}, skipping")
continue
# Skip articles without formatted_html (likely template application failure)
if not article.formatted_html:
logger.warning(
f"Article {article.id} ('{article.title}') has no formatted_html, skipping deployment. "
f"This usually indicates template application failed during post-processing."
)
results['articles_failed'] += 1
results['errors'].append({
'type': 'article',
'id': article.id,
'title': article.title,
'error': 'Missing formatted_html (likely template application failure)'
})
continue
try:
site = self.site_repo.get_by_id(article.site_deployment_id)
@ -176,7 +204,11 @@ class DeploymentService:
BunnyStorageError: If upload fails
"""
if not article.formatted_html:
raise ValueError(
f"Article {article.id} ('{article.title}') has no formatted_html to deploy. "
f"This usually indicates template application failed during post-processing. "
f"Check template application logs for article {article.id}."
)
file_path = generate_file_path(article)
url = generate_public_url(site, file_path)
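
The skip logic above (no site assigned, already deployed, missing formatted_html) can be sketched as a standalone pre-filter. `Article` below is a hypothetical dataclass standing in for the real ORM model; only the field names used by the diff are assumed:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Article:
    # Hypothetical stand-in for the ORM model; field names mirror the diff.
    id: int
    title: str
    site_deployment_id: Optional[int] = None
    formatted_html: Optional[str] = None
    deployed_at: Optional[str] = None

def partition_deployable(articles):
    """Split articles into (deployable, skipped) using the same guards as the deploy loop."""
    deployable, skipped = [], []
    for a in articles:
        # Skip: unassigned site, already deployed, or missing formatted_html
        if not a.site_deployment_id or a.deployed_at or not a.formatted_html:
            skipped.append(a)
        else:
            deployable.append(a)
    return deployable, skipped

articles = [
    Article(1, "ok", site_deployment_id=10, formatted_html="<html/>"),
    Article(2, "no html", site_deployment_id=10),
    Article(3, "already", site_deployment_id=10, formatted_html="<html/>", deployed_at="2025-01-01"),
]
deployable, skipped = partition_deployable(articles)
```

This keeps the warning/skip decision in one place, so the pre-deployment summary and the per-article loop cannot drift apart.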

View File

@ -7,7 +7,7 @@ import os
import time
import logging
import json
- from typing import Optional, TYPE_CHECKING
+ from typing import Optional, TYPE_CHECKING, Union
from dataclasses import dataclass
import boto3
@ -303,7 +303,7 @@ class S3StorageClient:
self,
site: "SiteDeployment",
file_path: str,
- content: str
+ content: Union[str, bytes]
) -> UploadResult:
"""
Upload a file to S3 bucket
@ -311,7 +311,7 @@ class S3StorageClient:
Args:
site: SiteDeployment object with S3 configuration
file_path: Path within bucket (e.g., 'my-article.html')
- content: File content to upload
+ content: File content to upload (str or bytes for binary files like images)
Returns:
UploadResult with success status and message
@ -336,16 +336,45 @@ class S3StorageClient:
s3_client = self._get_s3_client(region, endpoint_url)
# Handle both string and bytes content
if isinstance(content, str):
body = content.encode('utf-8')
else:
body = content
# Track which buckets don't support ACLs to avoid retrying
if not hasattr(self, '_buckets_no_acl'):
self._buckets_no_acl = set()
for attempt in range(self.max_retries):
try:
- # Upload file with public-read ACL
- s3_client.put_object(
-     Bucket=bucket_name,
-     Key=file_path,
-     Body=content.encode('utf-8'),
-     ContentType=content_type,
-     ACL='public-read'
- )
# Prepare upload parameters
upload_kwargs = {
'Bucket': bucket_name,
'Key': file_path,
'Body': body,
'ContentType': content_type
}
# Only add ACL if bucket supports it
if bucket_name not in self._buckets_no_acl:
try:
upload_kwargs['ACL'] = 'public-read'
s3_client.put_object(**upload_kwargs)
except ClientError as acl_error:
acl_error_code = acl_error.response.get('Error', {}).get('Code', '')
if acl_error_code == 'AccessControlListNotSupported':
# Bucket doesn't support ACLs, retry without ACL
# Bucket policy should handle public access
self._buckets_no_acl.add(bucket_name)
logger.info(f"Bucket {bucket_name} does not support ACLs, using bucket policy for public access")
upload_kwargs.pop('ACL', None)
s3_client.put_object(**upload_kwargs)
else:
raise
else:
# Bucket known to not support ACLs, upload without ACL
s3_client.put_object(**upload_kwargs)
public_url = self._generate_public_url(
bucket_name, file_path, region, custom_domain
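
The ACL-fallback pattern above can be exercised without AWS by injecting the client. `FakeS3` and the local `ClientError` below are stand-ins for illustration only; botocore's real `ClientError` exposes the same `response['Error']['Code']` structure, and a real bucket with `ObjectOwnership=BucketOwnerEnforced` rejects ACLs the same way:

```python
class ClientError(Exception):
    # Minimal stand-in for botocore.exceptions.ClientError
    def __init__(self, code):
        self.response = {'Error': {'Code': code}}

def put_with_acl_fallback(s3_client, bucket, key, body, content_type, no_acl_buckets):
    """Try a public-read ACL first; on AccessControlListNotSupported, cache the
    bucket name and retry without the ACL (a bucket policy must then grant public access)."""
    kwargs = {'Bucket': bucket, 'Key': key, 'Body': body, 'ContentType': content_type}
    if bucket in no_acl_buckets:
        s3_client.put_object(**kwargs)  # known to reject ACLs: skip straight to plain upload
        return
    try:
        s3_client.put_object(ACL='public-read', **kwargs)
    except ClientError as e:
        if e.response.get('Error', {}).get('Code') == 'AccessControlListNotSupported':
            no_acl_buckets.add(bucket)          # remember so later uploads don't retry
            s3_client.put_object(**kwargs)      # retry without ACL
        else:
            raise

class FakeS3:
    """Behaves like a bucket with ObjectOwnership=BucketOwnerEnforced: rejects ACLs."""
    def __init__(self):
        self.calls = []
    def put_object(self, **kwargs):
        self.calls.append(kwargs)
        if 'ACL' in kwargs:
            raise ClientError('AccessControlListNotSupported')

client = FakeS3()
cache = set()
put_with_acl_fallback(client, 'my-bucket', 'a.html', b'<html/>', 'text/html', cache)
put_with_acl_fallback(client, 'my-bucket', 'b.html', b'<html/>', 'text/html', cache)
```

The cache means each bucket pays the failed-ACL round trip at most once per process, which matters when deploying hundreds of articles to the same bucket.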

View File

@ -3,7 +3,7 @@ Storage client factory for multi-cloud storage support
Story 6.1: Storage Provider Abstraction Layer
"""
- from typing import Protocol, TYPE_CHECKING
+ from typing import Protocol, TYPE_CHECKING, Union
from src.deployment.bunny_storage import BunnyStorageClient, UploadResult
from src.deployment.s3_storage import S3StorageClient
@ -18,7 +18,7 @@ class StorageClient(Protocol):
self,
site: "SiteDeployment",
file_path: str,
- content: str
+ content: Union[str, bytes]
) -> UploadResult:
"""
Upload a file to storage
@ -26,7 +26,7 @@ class StorageClient(Protocol):
Args:
site: SiteDeployment object with storage configuration
file_path: Path within storage (e.g., 'my-article.html')
- content: File content to upload
+ content: File content to upload (str for text, bytes for binary like images)
Returns:
UploadResult with success status and message
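
The `Protocol` here gives structural typing: any class with a matching `upload_file` method satisfies `StorageClient` without inheriting from it. A minimal sketch of the idea, with a simplified `UploadResult` stand-in (the real one lives in `bunny_storage`):

```python
from typing import Protocol, Union, runtime_checkable

class UploadResult:
    # Simplified stand-in for src.deployment.bunny_storage.UploadResult
    def __init__(self, success: bool, message: str = ""):
        self.success = success
        self.message = message

@runtime_checkable
class StorageClient(Protocol):
    def upload_file(self, site, file_path: str, content: Union[str, bytes]) -> UploadResult: ...

class InMemoryStorage:
    """Test double that satisfies StorageClient purely by shape."""
    def __init__(self):
        self.files = {}
    def upload_file(self, site, file_path, content):
        # Normalize str to bytes, as the real clients do
        self.files[file_path] = content.encode() if isinstance(content, str) else content
        return UploadResult(True, f"stored {file_path}")

client = InMemoryStorage()
result = client.upload_file(None, "a.html", "<html/>")
```

Note that `runtime_checkable` lets `isinstance(client, StorageClient)` work, but it only checks that the method names exist, not their signatures; static type checkers enforce the full signature.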

View File

@ -449,7 +449,6 @@ class BatchProcessor:
theme_override=theme_override
)
- storage_client = BunnyStorageClient()
hero_url = None
content_image_urls = []
@ -469,7 +468,7 @@ class BatchProcessor:
if site:
main_keyword_slug = slugify(project.main_keyword)
file_path = f"images/{main_keyword_slug}.jpg"
- hero_url = upload_image_to_storage(storage_client, site, hero_image, file_path)
+ hero_url = upload_image_to_storage(site, hero_image, file_path)
if hero_url:
click.echo(f"{prefix} Hero image uploaded: {hero_url}")
else:
@ -514,7 +513,7 @@ class BatchProcessor:
entity_slug = slugify(entity)
related_slug = slugify(related_search)
file_path = f"images/{main_keyword_slug}-{entity_slug}-{related_slug}.jpg"
- img_url = upload_image_to_storage(storage_client, site, content_image, file_path)
+ img_url = upload_image_to_storage(site, content_image, file_path)
if img_url:
content_image_urls.append(img_url)
click.echo(f"{prefix} Content image {i+1}/{num_images} uploaded: {img_url}")
@ -566,7 +565,6 @@ class BatchProcessor:
theme_override=theme_override
)
- storage_client = BunnyStorageClient()
hero_url = None
content_image_urls = []
@ -586,7 +584,7 @@ class BatchProcessor:
if site:
main_keyword_slug = slugify(project.main_keyword)
file_path = f"images/{main_keyword_slug}.jpg"
- hero_url = upload_image_to_storage(storage_client, site, hero_image, file_path)
+ hero_url = upload_image_to_storage(site, hero_image, file_path)
if hero_url:
click.echo(f"{prefix} Hero image uploaded: {hero_url}")
else:
@ -631,7 +629,7 @@ class BatchProcessor:
entity_slug = slugify(entity)
related_slug = slugify(related_search)
file_path = f"images/{main_keyword_slug}-{entity_slug}-{related_slug}.jpg"
- img_url = upload_image_to_storage(storage_client, site, content_image, file_path)
+ img_url = upload_image_to_storage(site, content_image, file_path)
if img_url:
content_image_urls.append(img_url)
click.echo(f"{prefix} Content image {i+1}/{num_images} uploaded: {img_url}")
@ -895,7 +893,6 @@ class BatchProcessor:
# Generate images (now with assigned site_deployment_id)
from src.generation.image_generator import ImageGenerator
from src.generation.image_upload import upload_image_to_storage
- from src.deployment.bunny_storage import BunnyStorageClient
thread_image_generator = ImageGenerator(
ai_client=thread_generator.ai_client,
@ -910,7 +907,6 @@ class BatchProcessor:
if tier_config.image_config:
project = thread_project_repo.get_by_id(project_id)
if project:
- storage_client = BunnyStorageClient()
from src.database.repositories import SiteDeploymentRepository
thread_site_repo = SiteDeploymentRepository(thread_session)
@ -930,7 +926,7 @@ class BatchProcessor:
if site:
main_keyword_slug = slugify(project.main_keyword)
file_path = f"images/{main_keyword_slug}.jpg"
- hero_url = upload_image_to_storage(storage_client, site, hero_image, file_path)
+ hero_url = upload_image_to_storage(site, hero_image, file_path)
if hero_url:
click.echo(f"{prefix} Hero image uploaded: {hero_url}")
except Exception as e:
@ -971,7 +967,7 @@ class BatchProcessor:
entity_slug = slugify(entity)
related_slug = slugify(related_search)
file_path = f"images/{main_keyword_slug}-{entity_slug}-{related_slug}.jpg"
- img_url = upload_image_to_storage(storage_client, site, content_image, file_path)
+ img_url = upload_image_to_storage(site, content_image, file_path)
if img_url:
content_image_urls.append(img_url)
click.echo(f"{prefix} Content image {i+1}/{num_images} uploaded: {img_url}")
@ -1118,17 +1114,36 @@ class BatchProcessor:
click.echo(f" Applying templates...")
url_map = {url_info["content_id"]: url_info["url"] for url_info in article_urls}
template_count = 0
template_failures = []
for content in content_records:
try:
canonical_url = url_map.get(content.id)
if self.generator.apply_template(content.id, canonical_url=canonical_url):
template_count += 1
else:
template_failures.append({
'id': content.id,
'title': content.title,
'error': 'Template application returned False'
})
except Exception as e:
template_failures.append({
'id': content.id,
'title': content.title,
'error': str(e)
})
click.echo(f" Warning: Failed to apply template to content {content.id}: {e}")
import traceback
click.echo(f" Traceback: {traceback.format_exc()}")
click.echo(f" Applied templates to {template_count}/{len(content_records)} articles")
if template_failures:
click.echo(f" Template failures: {len(template_failures)} articles")
for failure in template_failures[:5]: # Show first 5
click.echo(f" - Article {failure['id']} ('{failure['title']}'): {failure['error']}")
if len(template_failures) > 5:
click.echo(f" ... and {len(template_failures) - 5} more")
click.echo(f" Note: Articles without formatted_html will fail during deployment")
click.echo(f" {tier_name}: Post-processing complete")
def _deploy_job(self, project_id: int, continue_on_error: bool):
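
The failure-collection pattern above (keep going on error, record why each item failed, cap the summary at five entries) can be isolated into a small helper; `apply_fn` below is a hypothetical stand-in for `self.generator.apply_template`:

```python
def apply_all(records, apply_fn, max_shown=5):
    """Apply apply_fn to each (id, title) record, collecting failures instead of aborting."""
    applied, failures = 0, []
    for rec_id, title in records:
        try:
            if apply_fn(rec_id):
                applied += 1
            else:
                failures.append({'id': rec_id, 'title': title,
                                 'error': 'Template application returned False'})
        except Exception as e:
            failures.append({'id': rec_id, 'title': title, 'error': str(e)})
    # Build a capped summary, mirroring the click.echo output above
    lines = [f"Applied templates to {applied}/{len(records)} articles"]
    for f in failures[:max_shown]:
        lines.append(f"  - Article {f['id']} ('{f['title']}'): {f['error']}")
    if len(failures) > max_shown:
        lines.append(f"  ... and {len(failures) - max_shown} more")
    return applied, failures, lines

records = [(1, 'a'), (2, 'b'), (3, 'c')]
applied, failures, lines = apply_all(records, lambda i: i != 2)
```

Returning the failure list (rather than only printing it) is what lets the later deployment step know in advance which articles will be skipped for missing `formatted_html`.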

View File

@ -0,0 +1,169 @@
import os
import re
import random
import requests
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def process_colinkri_urls(dripfeed=7):
"""
Process URL files and send them to Colinkri API.
Args:
dripfeed (int): Number of days for drip feed. Default is 7.
Returns:
dict: Summary of processed, successful, and failed files
"""
api_key = os.getenv('COLINKRI_API_KEY')
if not api_key:
raise ValueError("COLINKRI_API_KEY not found in environment variables")
# Setup directories
base_dir = Path('deployment_logs')
done_dir = base_dir / 'Done'
failed_dir = base_dir / 'Failed'
# Create directories if they don't exist
done_dir.mkdir(parents=True, exist_ok=True)
failed_dir.mkdir(parents=True, exist_ok=True)
# Pattern to match files: YYYY-MM-DD_other_tiers_urls.txt
pattern = re.compile(r'^\d{4}-\d{2}-\d{2}_other_tiers_urls\.txt$')
# Get matching files
matching_files = [f for f in base_dir.iterdir()
if f.is_file() and pattern.match(f.name)]
if not matching_files:
print("No matching files found.")
return {'processed': 0, 'successful': 0, 'failed': 0}
results = {'processed': 0, 'successful': 0, 'failed': 0}
for file_path in matching_files:
results['processed'] += 1
campaign_name = file_path.stem # Filename without .txt
print(f"\nProcessing: {file_path.name}")
try:
# Read URLs from file
with open(file_path, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f if line.strip()]
if not urls:
print(f" ⚠️ No URLs found in {file_path.name}")
# Handle potential duplicate filenames in Failed folder
destination = failed_dir / file_path.name
counter = 1
while destination.exists():
new_name = f"{file_path.stem}_{counter}{file_path.suffix}"
destination = failed_dir / new_name
counter += 1
file_path.rename(destination)
results['failed'] += 1
continue
# Randomize URL order
random.shuffle(urls)
# Join URLs with pipe separator
urls_param = '|'.join(urls)
# Prepare API request
api_url = 'https://www.colinkri.com/amember/crawler/api'
# Build form parameters (requests form-encodes them automatically)
data = {
'apikey': api_key,
'campaignname': campaign_name,
'dripfeed': str(dripfeed),
'urls': urls_param
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
# Send request
print(f" 📤 Sending {len(urls)} URLs to Colinkri API...")
response = requests.post(api_url, data=data, headers=headers, timeout=30)
# Check response
if response.status_code == 200:
print(f" ✅ Success! Campaign: {campaign_name}")
# Handle potential duplicate filenames in Done folder
destination = done_dir / file_path.name
counter = 1
while destination.exists():
# Add counter to filename if it already exists
new_name = f"{file_path.stem}_{counter}{file_path.suffix}"
destination = done_dir / new_name
counter += 1
file_path.rename(destination)
results['successful'] += 1
else:
error_msg = f"API returned status code {response.status_code}: {response.text}"
print(f" ❌ Failed: {error_msg}")
# Handle potential duplicate filenames in Failed folder
destination = failed_dir / file_path.name
counter = 1
while destination.exists():
new_name = f"{file_path.stem}_{counter}{file_path.suffix}"
destination = failed_dir / new_name
counter += 1
# Log error to file
error_log = failed_dir / f"{destination.stem}_error.log"
with open(error_log, 'w', encoding='utf-8') as f:
f.write(f"Error processing {file_path.name}\n")
f.write(f"Status Code: {response.status_code}\n")
f.write(f"Response: {response.text}\n")
file_path.rename(destination)
results['failed'] += 1
except Exception as e:
print(f" ❌ Error: {str(e)}")
# Handle potential duplicate filenames in Failed folder
destination = failed_dir / file_path.name
counter = 1
while destination.exists():
new_name = f"{file_path.stem}_{counter}{file_path.suffix}"
destination = failed_dir / new_name
counter += 1
# Log error to file
error_log = failed_dir / f"{destination.stem}_error.log"
with open(error_log, 'w', encoding='utf-8') as f:
f.write(f"Error processing {file_path.name}\n")
f.write(f"Exception: {str(e)}\n")
file_path.rename(destination)
results['failed'] += 1
# Print summary
print("\n" + "="*50)
print("SUMMARY")
print("="*50)
print(f"Files processed: {results['processed']}")
print(f"Successful: {results['successful']}")
print(f"Failed: {results['failed']}")
print("="*50)
return results
if __name__ == '__main__':
# Example usage
process_colinkri_urls(dripfeed=7)
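
The "append `_1`, `_2`, ... until the name is free" logic is repeated in three branches of the script above; a small helper (not in the original script, shown here as a possible refactor) captures it once:

```python
import tempfile
from pathlib import Path

def unique_destination(directory: Path, filename: str) -> Path:
    """Return a path in directory that does not collide with an existing file."""
    dest = directory / filename
    stem, suffix = dest.stem, dest.suffix
    counter = 1
    while dest.exists():
        dest = directory / f"{stem}_{counter}{suffix}"
        counter += 1
    return dest

with tempfile.TemporaryDirectory() as tmp:
    done = Path(tmp)
    first = unique_destination(done, "2025-01-01_other_tiers_urls.txt")
    first.touch()  # simulate a file already moved to Done/
    second = unique_destination(done, "2025-01-01_other_tiers_urls.txt")
```

Each call site would then collapse to `file_path.rename(unique_destination(done_dir, file_path.name))`, keeping the Done and Failed branches in sync.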

View File

@ -3,9 +3,8 @@ Image upload utilities for storage zones
"""
import logging
import requests
from typing import Optional
- from src.deployment.bunny_storage import BunnyStorageClient
+ from src.deployment.storage_factory import create_storage_client
from src.database.models import SiteDeployment
from src.generation.url_generator import generate_public_url
@ -13,50 +12,61 @@ logger = logging.getLogger(__name__)
def upload_image_to_storage(
- storage_client: BunnyStorageClient,
site: SiteDeployment,
image_bytes: bytes,
file_path: str
) -> Optional[str]:
"""
- Upload image to storage zone and return public URL
+ Upload image to storage and return public URL
+
+ Supports both Bunny.net and S3 storage providers.
Args:
- storage_client: BunnyStorageClient instance
- site: SiteDeployment with zone info
+ site: SiteDeployment with storage configuration
image_bytes: Image file bytes
- file_path: Path within storage zone (e.g., 'images/hero.jpg')
+ file_path: Path within storage (e.g., 'images/hero.jpg')
Returns:
Public URL if successful, None if failed
"""
try:
- # Check if file exists first
- base_url = storage_client._get_storage_url(site.storage_zone_region)
- check_url = f"{base_url}/{site.storage_zone_name}/{file_path}"
- headers = {"AccessKey": site.storage_zone_password}
- check_response = requests.head(check_url, headers=headers, timeout=10)
- if check_response.status_code == 200:
-     # File exists, return existing URL
-     logger.info(f"Image {file_path} already exists, using existing")
-     return generate_public_url(site, file_path)
+ # Get appropriate storage client based on site provider
+ storage_client = create_storage_client(site)
# Determine content type from file extension
content_type = 'image/jpeg' # Default
if file_path.lower().endswith('.png'):
content_type = 'image/png'
elif file_path.lower().endswith('.gif'):
content_type = 'image/gif'
elif file_path.lower().endswith('.svg'):
content_type = 'image/svg+xml'
elif file_path.lower().endswith('.webp'):
content_type = 'image/webp'
- # Upload image (binary data)
- url = f"{base_url}/{site.storage_zone_name}/{file_path}"
- headers = {
-     "AccessKey": site.storage_zone_password,
-     "Content-Type": "image/jpeg",
-     "accept": "application/json"
- }
# Upload using storage client
# BunnyStorageClient accepts content_type parameter, S3StorageClient determines it from file_path
from src.deployment.bunny_storage import BunnyStorageClient
if isinstance(storage_client, BunnyStorageClient):
result = storage_client.upload_file(
site=site,
file_path=file_path,
content=image_bytes,
content_type=content_type
)
else:
# S3StorageClient determines content_type from file_path automatically
result = storage_client.upload_file(
site=site,
file_path=file_path,
content=image_bytes
)
- response = requests.put(url, data=image_bytes, headers=headers, timeout=60)
- if response.status_code in [200, 201]:
-     logger.info(f"Uploaded image {file_path} to {site.storage_zone_name}")
+ if result.success:
+     logger.info(f"Uploaded image {file_path} to {site.storage_zone_name or site.s3_bucket_name}")
return generate_public_url(site, file_path)
else:
- logger.error(f"Failed to upload image {file_path}: {response.status_code} - {response.text}")
+ logger.error(f"Failed to upload image {file_path}: {result.message}")
return None
except Exception as e: