# Big-Link-Man/src/generation/batch_processor.py
# 1225 lines, 54 KiB, Python

"""
Batch processor for content generation jobs
"""
from typing import Dict, Any, Optional, List
import click
import os
import time
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from src.generation.service import ContentGenerator
from src.generation.job_config import JobConfig, Job, TierConfig
from src.generation.deployment_assignment import validate_and_resolve_targets, assign_site_for_article
from src.database.repositories import GeneratedContentRepository, ProjectRepository, SiteDeploymentRepository, ArticleLinkRepository, SitePageRepository
from src.generation.url_generator import generate_urls_for_batch
from src.interlinking.tiered_links import find_tiered_links
from src.interlinking.content_injection import inject_interlinks
from src.generation.site_assignment import assign_sites_to_batch, assign_site_to_single_article
from src.deployment.deployment_service import DeploymentService
from src.deployment.url_logger import URLLogger
from src.generation.image_generator import ImageGenerator
from src.generation.image_injection import insert_hero_after_h1, insert_content_images_after_h2s, generate_alt_text
from src.generation.image_upload import upload_image_to_storage
from src.generation.image_generator import slugify
import random
class BatchProcessor:
"""Processes batch content generation jobs"""
def __init__(
    self,
    content_generator: ContentGenerator,
    content_repo: GeneratedContentRepository,
    project_repo: ProjectRepository,
    site_deployment_repo: Optional[SiteDeploymentRepository] = None,
    max_workers: int = 5
):
    """Set up the processor with its collaborators and zeroed run statistics.

    Args:
        content_generator: Service producing titles, outlines and article content.
        content_repo: Repository persisting generated articles.
        project_repo: Repository used to look up project configuration.
        site_deployment_repo: Optional repository for deployment-target lookups.
        max_workers: Thread-pool size for concurrent article generation.
    """
    self.generator = content_generator
    self.content_repo = content_repo
    self.project_repo = project_repo
    self.site_deployment_repo = site_deployment_repo
    self.max_workers = max_workers
    # Guards every mutation of self.stats made from worker threads.
    self.stats_lock = Lock()
    # Shared run counters; the *_time entries accumulate seconds per tier.
    self.stats = dict(
        total_jobs=0,
        processed_jobs=0,
        total_articles=0,
        generated_articles=0,
        augmented_articles=0,
        failed_articles=0,
        articles_with_error=0,
        tier1_time=0.0,
        tier2_time=0.0,
        tier3_time=0.0,
        total_time=0.0,
    )
def process_job(
    self,
    job_file_path: str,
    debug: bool = False,
    continue_on_error: bool = False,
    auto_deploy: bool = True
):
    """Run every job defined in a job file, then print a run summary.

    Args:
        job_file_path: Path to job JSON file
        debug: If True, save AI responses to debug_output/
        continue_on_error: If True, continue on article generation failure
        auto_deploy: If True, deploy to cloud storage after generation (default: True)
    """
    run_started = time.time()
    all_jobs = JobConfig(job_file_path).get_jobs()
    self.stats["total_jobs"] = len(all_jobs)
    for job_idx, job in enumerate(all_jobs, start=1):
        try:
            self._process_single_job(job, job_idx, debug, continue_on_error, auto_deploy)
        except Exception as e:
            click.echo(f"Error processing job {job_idx}: {e}")
            if not continue_on_error:
                raise
        else:
            # Only jobs that finished without raising count as processed.
            self.stats["processed_jobs"] += 1
    self.stats["total_time"] = time.time() - run_started
    self._print_summary()
def _generate_all_titles_for_tier(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    debug: bool,
    model: Optional[str] = None
) -> str:
    """Generate every title for a tier, persist them to a file, and echo the list.

    Args:
        project_id: Project ID
        tier_name: Name of tier (e.g., "tier1")
        tier_config: Tier configuration (supplies the title count)
        debug: Debug mode flag
        model: Optional model override for title generation

    Returns:
        Path to generated titles file
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    titles = self.generator.generate_titles_batch(
        project_id=project_id,
        count=tier_config.count,
        batch_size=25,
        debug=debug,
        model=model
    )
    # Titles are always written here (not only in debug mode) so the tier
    # processing step can re-read them from disk.
    out_dir = Path("debug_output")
    out_dir.mkdir(exist_ok=True)
    out_path = out_dir / f"project_{project_id}_tier_{tier_name}_titles_{stamp}.txt"
    with open(out_path, 'w', encoding='utf-8') as f:
        f.writelines(f"{t}\n" for t in titles)
    click.echo(f"\n[{tier_name}] Title List:")
    for idx, t in enumerate(titles, start=1):
        click.echo(f" {idx}. {t}")
    click.echo()
    return str(out_path)
def _process_single_job(
    self,
    job: Job,
    job_idx: int,
    debug: bool,
    continue_on_error: bool,
    auto_deploy: bool = True
):
    """Process a single job: validate its project and targets, run each tier, then deploy.

    Raises:
        ValueError: If the project does not exist, has no money_site_url, or
            deployment targets are specified without a SiteDeploymentRepository.
    """
    # Stash the job on the instance; not read within this method —
    # presumably consumed by collaborators. TODO confirm against callers.
    self.current_job = job
    project = self.project_repo.get_by_id(job.project_id)
    if not project:
        raise ValueError(f"Project {job.project_id} not found")
    # money_site_url is a hard requirement for tiered linking, so fail fast.
    if not project.money_site_url:
        raise ValueError(
            f"Cannot generate articles: money_site_url not set for project {job.project_id}. "
            f"Please set money_site_url in the project configuration. "
            f"The money site is required for the tiered linking strategy."
        )
    click.echo(f"\nProcessing Job {job_idx}/{self.stats['total_jobs']}: Project ID {job.project_id}")
    if job.models:
        click.echo(f" Using per-stage models:")
        click.echo(f" Title: {job.models.title}")
        click.echo(f" Outline: {job.models.outline}")
        click.echo(f" Content: {job.models.content}")
    # hostname -> site_deployment_id map; stays empty when the job declares
    # no explicit deployment targets.
    resolved_targets = {}
    if job.deployment_targets:
        if not self.site_deployment_repo:
            raise ValueError("deployment_targets specified but SiteDeploymentRepository not provided")
        click.echo(f" Validating deployment targets: {', '.join(job.deployment_targets)}")
        try:
            resolved_targets = validate_and_resolve_targets(
                job.deployment_targets,
                self.site_deployment_repo
            )
            click.echo(f" All deployment targets validated successfully")
        except ValueError as e:
            # Invalid targets abort the job regardless of continue_on_error.
            click.echo(f" Error: {e}", err=True)
            raise
    for tier_name, tier_config in job.tiers.items():
        tier_start_time = time.time()
        self._process_tier(
            job.project_id,
            tier_name,
            tier_config,
            resolved_targets,
            job,
            debug,
            continue_on_error
        )
        tier_elapsed = time.time() - tier_start_time
        # Accumulate per-tier wall-clock time under the stats lock; assumes
        # tier names match the pre-seeded "tierN_time" stats keys.
        with self.stats_lock:
            self.stats[f"{tier_name}_time"] += tier_elapsed
    if auto_deploy:
        try:
            self._deploy_job(job.project_id, continue_on_error)
        except Exception as e:
            # Deployment failure is non-fatal: generation already succeeded.
            click.echo(f" Warning: Auto-deployment failed: {e}")
            if debug:
                import traceback
                click.echo(f" Traceback: {traceback.format_exc()}")
def _process_tier(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    resolved_targets: Dict[str, int],
    job: Job,
    debug: bool,
    continue_on_error: bool
):
    """Generate all articles for one tier, then run tier post-processing.

    Titles are produced up front (and round-tripped through a file), one task
    dict is built per article, and the tasks are dispatched either to the
    thread pool or to the sequential fallback.
    """
    click.echo(f" {tier_name}: Generating {tier_config.count} articles (concurrency: {self.max_workers})")
    project = self.project_repo.get_by_id(project_id)
    keyword = project.main_keyword
    # Tier-level model overrides take precedence over job-level ones.
    models = tier_config.models or job.models or None
    click.echo(f"\n[{tier_name}] Generating {tier_config.count} titles in batches...")
    titles_file = self._generate_all_titles_for_tier(
        project_id,
        tier_name,
        tier_config,
        debug,
        model=models.title if models else None
    )
    with open(titles_file, 'r', encoding='utf-8') as handle:
        titles = [ln.strip() for ln in handle if ln.strip()]
    click.echo(f"[{tier_name}] Generated {len(titles)} titles")
    # Explicit deployment targets only ever apply to tier1 articles.
    targets_for_tier = resolved_targets if tier_name == "tier1" else {}
    article_tasks = []
    for article_index in range(tier_config.count):
        article_num = article_index + 1
        if article_index >= len(titles):
            click.echo(f" Warning: Not enough titles generated, skipping article {article_num}")
            continue
        article_tasks.append({
            'project_id': project_id,
            'tier_name': tier_name,
            'tier_config': tier_config,
            'article_num': article_num,
            'article_index': article_index,
            'title': titles[article_index],
            'keyword': keyword,
            'resolved_targets': targets_for_tier,
            'job': job,
            'project_keyword': keyword,
            'debug': debug,
            'models': models
        })
    dispatch = (
        self._process_articles_concurrent
        if self.max_workers > 1
        else self._process_articles_sequential
    )
    dispatch(article_tasks, continue_on_error)
    try:
        self._post_process_tier(project_id, tier_name, job, debug)
    except Exception as e:
        # Post-processing failure does not undo the generated articles.
        click.echo(f" Warning: Post-processing failed for {tier_name}: {e}")
        if debug:
            import traceback
            click.echo(f" Traceback: {traceback.format_exc()}")
def _generate_single_article(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    article_num: int,
    article_index: int,
    title: str,
    keyword: str,
    resolved_targets: Dict[str, int],
    job: Job,
    project_keyword: str,
    debug: bool,
    models=None
):
    """Generate a single article with pre-generated title.

    Pipeline: assign a deployment site (explicit targets first, pool
    fallback otherwise), generate outline and content, augment if under the
    word-count floor, persist the record, then generate/upload images and
    store their URLs on the record.

    Args:
        project_id: Project the article belongs to.
        tier_name: Tier name stored on the record (e.g. "tier1").
        tier_config: Heading/word-count limits and image configuration.
        article_num: 1-based article number (progress logging only).
        article_index: 0-based index used for explicit target assignment.
        title: Pre-generated article title.
        keyword: Project keyword stored on the record.
        resolved_targets: hostname -> site_deployment_id map of explicit targets.
        job: Job configuration (image theme, site-assignment rules).
        project_keyword: Keyword forwarded to pool-based site assignment.
        debug: Debug flag forwarded to generation calls.
        models: Optional per-stage model overrides (outline/content).
    """
    prefix = f" [{article_num}/{tier_config.count}]"
    site_deployment_id = assign_site_for_article(article_index, resolved_targets)
    if site_deployment_id:
        hostname = next((h for h, id in resolved_targets.items() if id == site_deployment_id), None)
        click.echo(f"{prefix} Assigned to site: {hostname} (ID: {site_deployment_id})")
    elif resolved_targets:
        click.echo(f"{prefix} No site assignment (index {article_index} >= {len(resolved_targets)} targets)")
    click.echo(f"{prefix} Using title: \"{title}\"")
    click.echo(f"{prefix} Generating outline...")
    outline = self.generator.generate_outline(
        project_id=project_id,
        title=title,
        min_h2=tier_config.min_h2_tags,
        max_h2=tier_config.max_h2_tags,
        min_h3=tier_config.min_h3_tags,
        max_h3=tier_config.max_h3_tags,
        debug=debug,
        model=models.outline if models else None
    )
    h2_count = len(outline["outline"])
    h3_count = sum(len(section.get("h3", [])) for section in outline["outline"])
    click.echo(f"{prefix} Generated outline: {h2_count} H2s, {h3_count} H3s")
    click.echo(f"{prefix} Generating content...")
    content, finish_reason = self.generator.generate_content(
        project_id=project_id,
        title=title,
        outline=outline,
        min_word_count=tier_config.min_word_count,
        max_word_count=tier_config.max_word_count,
        debug=debug,
        model=models.content if models else None
    )
    if finish_reason != "stop":
        # Fix: guard the shared counter with stats_lock, consistent with
        # every other stats mutation in this class.
        with self.stats_lock:
            self.stats["articles_with_error"] += 1
    word_count = self.generator.count_words(content)
    click.echo(f"{prefix} Generated content: {word_count:,} words")
    status = "generated"
    if word_count < tier_config.min_word_count:
        click.echo(f"{prefix} Below minimum ({tier_config.min_word_count:,}), augmenting...")
        content = self.generator.augment_content(
            content=content,
            target_word_count=tier_config.min_word_count,
            debug=debug,
            project_id=project_id,
            model=models.content if models else None
        )
        word_count = self.generator.count_words(content)
        click.echo(f"{prefix} Augmented content: {word_count:,} words")
        status = "augmented"
        # Fix: same lock-consistency correction as above.
        with self.stats_lock:
            self.stats["augmented_articles"] += 1
    # Create minimal article record first so we can assign a site
    saved_content = self.content_repo.create(
        project_id=project_id,
        tier=tier_name,
        keyword=keyword,
        title=title,
        outline=outline,
        content=content,
        word_count=word_count,
        status=status,
        site_deployment_id=site_deployment_id,
        hero_image_url=None,
        content_images=None
    )
    # Assign site if not explicitly assigned
    if not site_deployment_id and self.site_deployment_repo:
        assigned_site = assign_site_to_single_article(
            content=saved_content,
            job=job,
            site_repo=self.site_deployment_repo,
            content_repo=self.content_repo,
            project_keyword=project_keyword
        )
        if assigned_site:
            site_deployment_id = assigned_site.id
            # For S3 sites, prefer s3_custom_domain over pull_zone_bcdn_hostname
            if assigned_site.storage_provider in ('s3', 's3_compatible') and assigned_site.s3_custom_domain:
                hostname = assigned_site.s3_custom_domain
            elif assigned_site.storage_provider in ('s3', 's3_compatible') and assigned_site.s3_bucket_name and assigned_site.s3_bucket_region:
                # Use website endpoint format for standard AWS S3 (enables root URL access)
                if assigned_site.storage_provider == 's3_compatible' or getattr(assigned_site, 's3_endpoint_url', None):
                    hostname = f"{assigned_site.s3_bucket_name}.s3.{assigned_site.s3_bucket_region}.amazonaws.com"
                else:
                    hostname = f"{assigned_site.s3_bucket_name}.s3-website-{assigned_site.s3_bucket_region}.amazonaws.com"
            else:
                hostname = assigned_site.custom_hostname or assigned_site.pull_zone_bcdn_hostname
            click.echo(f"{prefix} Assigned to site: {hostname} (ID: {site_deployment_id})")
            # Update the article with the assigned site
            saved_content.site_deployment_id = site_deployment_id
            self.content_repo.session.add(saved_content)
            self.content_repo.session.commit()
    # Generate images (now with assigned site_deployment_id)
    hero_url, content_image_urls = self._generate_images_only(
        project_id=project_id,
        tier_name=tier_name,
        tier_config=tier_config,
        title=title,
        site_deployment_id=site_deployment_id,
        prefix=prefix,
        theme_override=job.image_theme_prompt
    )
    # Update article with image URLs
    saved_content.hero_image_url = hero_url
    saved_content.content_images = content_image_urls if content_image_urls else None
    self.content_repo.session.add(saved_content)
    self.content_repo.session.commit()
    click.echo(f"{prefix} Saved (ID: {saved_content.id}, Status: {status})")
def _generate_and_insert_images(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    title: str,
    content: str,
    site_deployment_id: Optional[int],
    prefix: str,
    theme_override: Optional[str] = None
) -> tuple[str, Optional[str], List[str]]:
    """
    Generate images, upload them to storage, and insert them into the HTML content.

    WARNING: This method inserts images before interlink injection, which may cause
    images to be lost during BeautifulSoup parsing. Consider using _generate_images_only()
    and _reinsert_images() instead.

    Args:
        project_id: Project whose keyword/entities drive prompts and file names.
        tier_name: Tier name (accepted for signature symmetry; not used in the body).
        tier_config: Tier configuration; its image_config gates hero/content images.
        title: Article title used for the hero-image prompt.
        content: Article HTML that images are inserted into.
        site_deployment_id: Upload target; uploads are skipped when None.
        prefix: Progress-log prefix for click.echo output.
        theme_override: Optional theme prompt forwarded to ImageGenerator.

    Returns:
        Tuple of (HTML with images inserted, hero image URL or None,
        list of uploaded content image URLs).

    Note: image_config is always created by job config parser (with defaults if not in JSON).
    Defaults: hero images for all tiers (1280x720), content images for T1 only (1-3 images).
    """
    click.echo(f"{prefix} WARNING: DO YOU REALLY WANT TO GEN AND INSERT THE IMAGE? This may cause images to be lost during interlink injection!")
    # No image config or unknown project: return the content untouched.
    if not tier_config.image_config:
        return content, None, []
    project = self.project_repo.get_by_id(project_id)
    if not project:
        return content, None, []
    # Initialize image generator
    image_generator = ImageGenerator(
        ai_client=self.generator.ai_client,
        prompt_manager=self.generator.prompt_manager,
        project_repo=self.project_repo,
        theme_override=theme_override
    )
    hero_url = None
    content_image_urls = []
    # Generate hero image (all tiers if enabled)
    if tier_config.image_config.hero:
        try:
            click.echo(f"{prefix} Generating hero image...")
            hero_image = image_generator.generate_hero_image(
                project_id=project_id,
                title=title,
                width=tier_config.image_config.hero.width,
                height=tier_config.image_config.hero.height
            )
            # Upload only when a target site exists; file name derives from
            # the project's main keyword.
            if hero_image and site_deployment_id:
                site = self.site_deployment_repo.get_by_id(site_deployment_id) if self.site_deployment_repo else None
                if site:
                    main_keyword_slug = slugify(project.main_keyword)
                    file_path = f"images/{main_keyword_slug}.jpg"
                    hero_url = upload_image_to_storage(site, hero_image, file_path)
                    if hero_url:
                        click.echo(f"{prefix} Hero image uploaded: {hero_url}")
                    else:
                        click.echo(f"{prefix} Hero image upload failed")
        except Exception as e:
            # Best-effort: image failures never abort article generation.
            click.echo(f"{prefix} Hero image generation failed: {e}")
    # Generate content images (T1 only, if enabled)
    if tier_config.image_config.content and tier_config.image_config.content.max_num_images > 0:
        try:
            # Random count within the configured [min, max] range.
            num_images = random.randint(
                tier_config.image_config.content.min_num_images,
                tier_config.image_config.content.max_num_images
            )
            if num_images > 0:
                click.echo(f"{prefix} Generating {num_images} content image(s)...")
                entities = project.entities or []
                related_searches = project.related_searches or []
                if not entities or not related_searches:
                    click.echo(f"{prefix} Skipping content images (no entities/related_searches)")
                else:
                    for i in range(num_images):
                        try:
                            # Each image pairs a random entity with a random
                            # related search to vary the prompt.
                            entity = random.choice(entities)
                            related_search = random.choice(related_searches)
                            content_image = image_generator.generate_content_image(
                                project_id=project_id,
                                entity=entity,
                                related_search=related_search,
                                width=tier_config.image_config.content.width,
                                height=tier_config.image_config.content.height
                            )
                            if content_image and site_deployment_id:
                                site = self.site_deployment_repo.get_by_id(site_deployment_id) if self.site_deployment_repo else None
                                if site:
                                    main_keyword_slug = slugify(project.main_keyword)
                                    entity_slug = slugify(entity)
                                    related_slug = slugify(related_search)
                                    file_path = f"images/{main_keyword_slug}-{entity_slug}-{related_slug}.jpg"
                                    img_url = upload_image_to_storage(site, content_image, file_path)
                                    if img_url:
                                        content_image_urls.append(img_url)
                                        click.echo(f"{prefix} Content image {i+1}/{num_images} uploaded: {img_url}")
                        except Exception as e:
                            # One failed image does not stop the remaining ones.
                            click.echo(f"{prefix} Content image {i+1} generation failed: {e}")
        except Exception as e:
            click.echo(f"{prefix} Content image generation failed: {e}")
    # Insert images into HTML
    if hero_url:
        alt_text = generate_alt_text(project)
        content = insert_hero_after_h1(content, hero_url, alt_text)
    if content_image_urls:
        alt_texts = [generate_alt_text(project) for _ in content_image_urls]
        content = insert_content_images_after_h2s(content, content_image_urls, alt_texts)
    return content, hero_url, content_image_urls
def _generate_images_only(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    title: str,
    site_deployment_id: Optional[int],
    prefix: str,
    theme_override: Optional[str] = None
) -> tuple[Optional[str], List[str]]:
    """
    Generate images and upload to storage, but don't insert into HTML.
    Returns (hero_url, content_image_urls) for later insertion
    (see _reinsert_images).

    Args:
        project_id: Project whose keyword/entities drive prompts and file names.
        tier_name: Tier name (accepted for signature symmetry; not used in the body).
        tier_config: Tier configuration; its image_config gates hero/content images.
        title: Article title used for the hero-image prompt.
        site_deployment_id: Upload target; uploads are skipped when None.
        prefix: Progress-log prefix for click.echo output.
        theme_override: Optional theme prompt forwarded to ImageGenerator.

    Note: image_config is always created by job config parser (with defaults if not in JSON).
    Defaults: hero images for all tiers (1280x720), content images for T1 only (1-3 images).
    """
    # No image config or unknown project: nothing to generate.
    if not tier_config.image_config:
        return None, []
    project = self.project_repo.get_by_id(project_id)
    if not project:
        return None, []
    # Initialize image generator
    image_generator = ImageGenerator(
        ai_client=self.generator.ai_client,
        prompt_manager=self.generator.prompt_manager,
        project_repo=self.project_repo,
        theme_override=theme_override
    )
    hero_url = None
    content_image_urls = []
    # Generate hero image (all tiers if enabled)
    if tier_config.image_config.hero:
        try:
            click.echo(f"{prefix} Generating hero image...")
            hero_image = image_generator.generate_hero_image(
                project_id=project_id,
                title=title,
                width=tier_config.image_config.hero.width,
                height=tier_config.image_config.hero.height
            )
            # Upload only when a target site exists; file name derives from
            # the project's main keyword.
            if hero_image and site_deployment_id:
                site = self.site_deployment_repo.get_by_id(site_deployment_id) if self.site_deployment_repo else None
                if site:
                    main_keyword_slug = slugify(project.main_keyword)
                    file_path = f"images/{main_keyword_slug}.jpg"
                    hero_url = upload_image_to_storage(site, hero_image, file_path)
                    if hero_url:
                        click.echo(f"{prefix} Hero image uploaded: {hero_url}")
                    else:
                        click.echo(f"{prefix} Hero image upload failed")
        except Exception as e:
            # Best-effort: image failures never abort article generation.
            click.echo(f"{prefix} Hero image generation failed: {e}")
    # Generate content images (T1 only, if enabled)
    if tier_config.image_config.content and tier_config.image_config.content.max_num_images > 0:
        try:
            # Random count within the configured [min, max] range.
            num_images = random.randint(
                tier_config.image_config.content.min_num_images,
                tier_config.image_config.content.max_num_images
            )
            if num_images > 0:
                click.echo(f"{prefix} Generating {num_images} content image(s)...")
                entities = project.entities or []
                related_searches = project.related_searches or []
                if not entities or not related_searches:
                    click.echo(f"{prefix} Skipping content images (no entities/related_searches)")
                else:
                    for i in range(num_images):
                        try:
                            # Each image pairs a random entity with a random
                            # related search to vary the prompt.
                            entity = random.choice(entities)
                            related_search = random.choice(related_searches)
                            content_image = image_generator.generate_content_image(
                                project_id=project_id,
                                entity=entity,
                                related_search=related_search,
                                width=tier_config.image_config.content.width,
                                height=tier_config.image_config.content.height
                            )
                            if content_image and site_deployment_id:
                                site = self.site_deployment_repo.get_by_id(site_deployment_id) if self.site_deployment_repo else None
                                if site:
                                    main_keyword_slug = slugify(project.main_keyword)
                                    entity_slug = slugify(entity)
                                    related_slug = slugify(related_search)
                                    file_path = f"images/{main_keyword_slug}-{entity_slug}-{related_slug}.jpg"
                                    img_url = upload_image_to_storage(site, content_image, file_path)
                                    if img_url:
                                        content_image_urls.append(img_url)
                                        click.echo(f"{prefix} Content image {i+1}/{num_images} uploaded: {img_url}")
                        except Exception as e:
                            # One failed image does not stop the remaining ones.
                            click.echo(f"{prefix} Content image {i+1} generation failed: {e}")
        except Exception as e:
            click.echo(f"{prefix} Content image generation failed: {e}")
    return hero_url, content_image_urls
def _reinsert_images(
    self,
    content_records: List,
    project
) -> None:
    """Strip and re-insert stored images after interlink injection.

    Interlink injection re-parses the article HTML, which can drop or
    duplicate previously inserted <img> tags; this removes any surviving
    tags and re-adds the hero/content images recorded on each article.
    """
    import re
    img_tag = re.compile(r'<img[^>]*>')
    for record in content_records:
        has_hero = bool(record.hero_image_url)
        has_content_imgs = bool(record.content_images)
        # Nothing recorded for this article: leave its HTML alone.
        if not has_hero and not has_content_imgs:
            continue
        # Drop every existing <img> so re-insertion cannot duplicate them.
        markup = img_tag.sub('', record.content)
        if has_hero:
            markup = insert_hero_after_h1(markup, record.hero_image_url, generate_alt_text(project))
        if has_content_imgs:
            alts = [generate_alt_text(project) for _ in record.content_images]
            markup = insert_content_images_after_h2s(markup, record.content_images, alts)
        record.content = markup
        self.content_repo.update(record)
def _process_articles_concurrent(
    self,
    article_tasks: List[Dict[str, Any]],
    continue_on_error: bool
):
    """
    Process articles concurrently using ThreadPoolExecutor

    Each task runs _generate_single_article_thread_safe in a worker thread;
    results are collected in completion order on this (calling) thread,
    which is also where stats updates and failure records happen.
    """
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_task = {
            executor.submit(self._generate_single_article_thread_safe, **task): task
            for task in article_tasks
        }
        for future in as_completed(future_to_task):
            task = future_to_task[future]
            article_num = task['article_num']
            tier_name = task['tier_name']
            tier_config = task['tier_config']
            try:
                # Re-raises any exception from the worker thread.
                future.result()
                with self.stats_lock:
                    self.stats["generated_articles"] += 1
            except Exception as e:
                with self.stats_lock:
                    self.stats["failed_articles"] += 1
                import traceback
                click.echo(f" [{article_num}/{tier_config.count}] FAILED: {e}")
                click.echo(f" Traceback: {traceback.format_exc()}")
                # Best-effort placeholder record so the failed article is
                # still visible in the database.
                try:
                    self.content_repo.create(
                        project_id=task['project_id'],
                        tier=tier_name,
                        keyword=task['keyword'],
                        title="Failed Generation",
                        outline={"error": str(e)},
                        content="",
                        word_count=0,
                        status="failed"
                    )
                except Exception as db_error:
                    click.echo(f" Failed to save error record: {db_error}")
                if not continue_on_error:
                    # cancel() only stops futures that have not started yet;
                    # already-running workers finish before the pool exits.
                    for f in future_to_task:
                        f.cancel()
                    raise
def _process_articles_sequential(
    self,
    article_tasks: List[Dict[str, Any]],
    continue_on_error: bool
):
    """
    Process articles sequentially (fallback for max_workers=1)

    Mirrors _process_articles_concurrent: on failure the error is echoed, a
    "failed" placeholder record is persisted (best-effort) so the article is
    still visible in the database, and the run aborts unless
    continue_on_error is set.
    """
    for task in article_tasks:
        with self.stats_lock:
            self.stats["total_articles"] += 1
        try:
            self._generate_single_article(**task)
            with self.stats_lock:
                self.stats["generated_articles"] += 1
        except Exception as e:
            with self.stats_lock:
                self.stats["failed_articles"] += 1
            import traceback
            click.echo(f" [{task['article_num']}/{task['tier_config'].count}] FAILED: {e}")
            click.echo(f" Traceback: {traceback.format_exc()}")
            # Fix: persist a placeholder record on failure, matching the
            # concurrent path (previously only the concurrent path did this).
            try:
                self.content_repo.create(
                    project_id=task['project_id'],
                    tier=task['tier_name'],
                    keyword=task['keyword'],
                    title="Failed Generation",
                    outline={"error": str(e)},
                    content="",
                    word_count=0,
                    status="failed"
                )
            except Exception as db_error:
                click.echo(f" Failed to save error record: {db_error}")
            if not continue_on_error:
                raise
def _generate_single_article_thread_safe(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    article_num: int,
    article_index: int,
    title: str,
    keyword: str,
    resolved_targets: Dict[str, int],
    job: Job,
    project_keyword: str,
    debug: bool,
    models = None
):
    """
    Thread-safe wrapper for article generation
    Creates a new database session for this thread

    All database access inside this method goes through repositories built
    on the per-thread session; the session is rolled back on any failure and
    always closed. Shared self.stats counters are mutated only under
    self.stats_lock.
    """
    with self.stats_lock:
        self.stats["total_articles"] += 1
    # Imported here (not at module level) so each worker builds its own
    # session-scoped collaborators.
    from src.database.session import db_manager
    from src.generation.service import ContentGenerator
    thread_session = db_manager.get_session()
    try:
        # Per-thread repositories and generator bound to this session; the
        # AI client / prompt manager are shared from the main generator.
        thread_content_repo = GeneratedContentRepository(thread_session)
        thread_project_repo = ProjectRepository(thread_session)
        thread_generator = ContentGenerator(
            ai_client=self.generator.ai_client,
            prompt_manager=self.generator.prompt_manager,
            project_repo=thread_project_repo,
            content_repo=thread_content_repo,
            template_service=self.generator.template_service,
            site_deployment_repo=self.generator.site_deployment_repo
        )
        prefix = f" [{article_num}/{tier_config.count}]"
        site_deployment_id = assign_site_for_article(article_index, resolved_targets)
        if site_deployment_id:
            hostname = next((h for h, id in resolved_targets.items() if id == site_deployment_id), None)
            click.echo(f"{prefix} Assigned to site: {hostname} (ID: {site_deployment_id})")
        click.echo(f"{prefix} Using title: \"{title}\"")
        click.echo(f"{prefix} Generating outline...")
        outline = thread_generator.generate_outline(
            project_id=project_id,
            title=title,
            min_h2=tier_config.min_h2_tags,
            max_h2=tier_config.max_h2_tags,
            min_h3=tier_config.min_h3_tags,
            max_h3=tier_config.max_h3_tags,
            debug=debug,
            model=models.outline if models else None
        )
        h2_count = len(outline["outline"])
        h3_count = sum(len(section.get("h3", [])) for section in outline["outline"])
        click.echo(f"{prefix} Generated outline: {h2_count} H2s, {h3_count} H3s")
        click.echo(f"{prefix} Generating content...")
        content, finish_reason = thread_generator.generate_content(
            project_id=project_id,
            title=title,
            outline=outline,
            min_word_count=tier_config.min_word_count,
            max_word_count=tier_config.max_word_count,
            debug=debug,
            model=models.content if models else None
        )
        # A non-"stop" finish reason marks a possibly truncated generation.
        if finish_reason != "stop":
            with self.stats_lock:
                self.stats["articles_with_error"] += 1
        word_count = thread_generator.count_words(content)
        click.echo(f"{prefix} Generated content: {word_count:,} words")
        status = "generated"
        # Below the tier's word floor: augment once, then re-count.
        if word_count < tier_config.min_word_count:
            click.echo(f"{prefix} Below minimum ({tier_config.min_word_count:,}), augmenting...")
            content = thread_generator.augment_content(
                content=content,
                target_word_count=tier_config.min_word_count,
                debug=debug,
                project_id=project_id,
                model=models.content if models else None
            )
            word_count = thread_generator.count_words(content)
            click.echo(f"{prefix} Augmented content: {word_count:,} words")
            status = "augmented"
            with self.stats_lock:
                self.stats["augmented_articles"] += 1
        # Create article first so we can assign a site
        saved_content = thread_content_repo.create(
            project_id=project_id,
            tier=tier_name,
            keyword=keyword,
            title=title,
            outline=outline,
            content=content,
            word_count=word_count,
            status=status,
            site_deployment_id=site_deployment_id,
            hero_image_url=None,
            content_images=None
        )
        # Assign site if not explicitly assigned
        if not site_deployment_id:
            from src.database.repositories import SiteDeploymentRepository
            thread_site_repo = SiteDeploymentRepository(thread_session)
            assigned_site = assign_site_to_single_article(
                content=saved_content,
                job=job,
                site_repo=thread_site_repo,
                content_repo=thread_content_repo,
                project_keyword=project_keyword
            )
            if assigned_site:
                site_deployment_id = assigned_site.id
                # For S3 sites, prefer s3_custom_domain over pull_zone_bcdn_hostname
                if assigned_site.storage_provider in ('s3', 's3_compatible') and assigned_site.s3_custom_domain:
                    hostname = assigned_site.s3_custom_domain
                elif assigned_site.storage_provider in ('s3', 's3_compatible') and assigned_site.s3_bucket_name and assigned_site.s3_bucket_region:
                    # Use website endpoint format for standard AWS S3 (enables root URL access)
                    if assigned_site.storage_provider == 's3_compatible' or getattr(assigned_site, 's3_endpoint_url', None):
                        hostname = f"{assigned_site.s3_bucket_name}.s3.{assigned_site.s3_bucket_region}.amazonaws.com"
                    else:
                        hostname = f"{assigned_site.s3_bucket_name}.s3-website-{assigned_site.s3_bucket_region}.amazonaws.com"
                else:
                    hostname = assigned_site.custom_hostname or assigned_site.pull_zone_bcdn_hostname
                click.echo(f"{prefix} Assigned to site: {hostname} (ID: {site_deployment_id})")
                # Update the article with the assigned site
                saved_content.site_deployment_id = site_deployment_id
                thread_session.add(saved_content)
                thread_session.commit()
        # Generate images (now with assigned site_deployment_id)
        from src.generation.image_generator import ImageGenerator
        from src.generation.image_upload import upload_image_to_storage
        thread_image_generator = ImageGenerator(
            ai_client=thread_generator.ai_client,
            prompt_manager=thread_generator.prompt_manager,
            project_repo=thread_project_repo,
            theme_override=job.image_theme_prompt
        )
        hero_url = None
        content_image_urls = []
        if tier_config.image_config:
            project = thread_project_repo.get_by_id(project_id)
            if project:
                from src.database.repositories import SiteDeploymentRepository
                thread_site_repo = SiteDeploymentRepository(thread_session)
                # Generate hero image
                if tier_config.image_config.hero:
                    try:
                        click.echo(f"{prefix} Generating hero image...")
                        hero_image = thread_image_generator.generate_hero_image(
                            project_id=project_id,
                            title=title,
                            width=tier_config.image_config.hero.width,
                            height=tier_config.image_config.hero.height
                        )
                        if hero_image and site_deployment_id:
                            site = thread_site_repo.get_by_id(site_deployment_id)
                            if site:
                                main_keyword_slug = slugify(project.main_keyword)
                                file_path = f"images/{main_keyword_slug}.jpg"
                                hero_url = upload_image_to_storage(site, hero_image, file_path)
                                if hero_url:
                                    click.echo(f"{prefix} Hero image uploaded: {hero_url}")
                    except Exception as e:
                        # Best-effort: image failures never abort the article.
                        click.echo(f"{prefix} Hero image generation failed: {e}")
                # Generate content images
                if tier_config.image_config.content and tier_config.image_config.content.max_num_images > 0:
                    try:
                        num_images = random.randint(
                            tier_config.image_config.content.min_num_images,
                            tier_config.image_config.content.max_num_images
                        )
                        if num_images > 0:
                            click.echo(f"{prefix} Generating {num_images} content image(s)...")
                            entities = project.entities or []
                            related_searches = project.related_searches or []
                            if entities and related_searches:
                                for i in range(num_images):
                                    try:
                                        entity = random.choice(entities)
                                        related_search = random.choice(related_searches)
                                        content_image = thread_image_generator.generate_content_image(
                                            project_id=project_id,
                                            entity=entity,
                                            related_search=related_search,
                                            width=tier_config.image_config.content.width,
                                            height=tier_config.image_config.content.height
                                        )
                                        if content_image and site_deployment_id:
                                            site = thread_site_repo.get_by_id(site_deployment_id)
                                            if site:
                                                main_keyword_slug = slugify(project.main_keyword)
                                                entity_slug = slugify(entity)
                                                related_slug = slugify(related_search)
                                                file_path = f"images/{main_keyword_slug}-{entity_slug}-{related_slug}.jpg"
                                                img_url = upload_image_to_storage(site, content_image, file_path)
                                                if img_url:
                                                    content_image_urls.append(img_url)
                                                    click.echo(f"{prefix} Content image {i+1}/{num_images} uploaded: {img_url}")
                                    except Exception as e:
                                        click.echo(f"{prefix} Content image {i+1} generation failed: {e}")
                    except Exception as e:
                        click.echo(f"{prefix} Content image generation failed: {e}")
        # Update article with image URLs
        saved_content.hero_image_url = hero_url
        saved_content.content_images = content_image_urls if content_image_urls else None
        thread_session.add(saved_content)
        thread_session.commit()
        click.echo(f"{prefix} Saved (ID: {saved_content.id}, Status: {status})")
    except Exception as e:
        # Undo any partial writes on this thread's session before re-raising
        # to the caller (which records the failure).
        thread_session.rollback()
        raise
    finally:
        thread_session.close()
def _post_process_tier(
self,
project_id: int,
tier_name: str,
job: Job,
debug: bool
):
"""
Post-process articles after generation: site assignment, URL generation, interlinking, templating
Args:
project_id: Project ID
tier_name: Tier name (tier1, tier2, tier3)
job: Job configuration
debug: Debug mode flag
"""
if not self.site_deployment_repo:
click.echo(f" {tier_name}: Skipping post-processing (no site deployment repo)")
return
project = self.project_repo.get_by_id(project_id)
# Step 0: Site assignment for articles without sites (Story 3.1)
# Get ALL articles for this tier (including those without sites)
all_articles = self.content_repo.get_by_project_and_tier(
project_id, tier_name, require_site=False
)
if not all_articles:
click.echo(f" {tier_name}: No articles to post-process")
return
# Find articles without site assignments
articles_without_sites = [a for a in all_articles if not a.site_deployment_id]
if articles_without_sites:
click.echo(f" {tier_name}: Assigning sites to {len(articles_without_sites)} articles...")
try:
# Note: Pass ALL articles so function knows which sites are already used
# The function will only assign sites to articles without site_deployment_id
# bunny_client=None means auto_create_sites won't work, but pool assignment works
assign_sites_to_batch(
content_records=all_articles, # Pass ALL, not just those without sites
job=job,
site_repo=self.site_deployment_repo,
bunny_client=None, # Not available in BatchProcessor
project_keyword=project.main_keyword
)
click.echo(f" Assigned {len(articles_without_sites)} articles to sites")
# Refresh article objects to get updated site_deployment_id
self.content_repo.session.expire_all()
all_articles = self.content_repo.get_by_project_and_tier(
project_id, tier_name, require_site=False
)
except ValueError as e:
click.echo(f" Warning: Site assignment failed: {e}")
if "auto_create_sites" in str(e):
click.echo(f" Tip: Set auto_create_sites in job config or ensure sufficient sites exist")
# Get articles that now have site assignments
content_records = [a for a in all_articles if a.site_deployment_id]
if not content_records:
click.echo(f" {tier_name}: No articles with site assignments to post-process")
return
# Skip articles already post-processed (idempotency check)
unprocessed = [a for a in content_records if not a.formatted_html]
if not unprocessed:
click.echo(f" {tier_name}: All {len(content_records)} articles already post-processed, skipping")
return
if len(unprocessed) < len(content_records):
click.echo(f" {tier_name}: Skipping {len(content_records) - len(unprocessed)} already processed articles")
content_records = unprocessed
click.echo(f" {tier_name}: Post-processing {len(content_records)} articles...")
# Step 1: Generate URLs (Story 3.1)
click.echo(f" Generating URLs...")
article_urls = generate_urls_for_batch(content_records, self.site_deployment_repo)
click.echo(f" Generated {len(article_urls)} URLs")
# Step 2: Find tiered links (Story 3.2)
click.echo(f" Finding tiered links...")
tiered_links = find_tiered_links(
content_records,
job,
self.project_repo,
self.content_repo,
self.site_deployment_repo
)
click.echo(f" Found tiered links for tier {tiered_links.get('tier', 'N/A')}")
# Step 3: Inject interlinks (Story 3.3)
click.echo(f" Injecting interlinks...")
link_repo = ArticleLinkRepository(self.content_repo.session)
inject_interlinks(
content_records,
article_urls,
tiered_links,
project,
job,
self.content_repo,
link_repo
)
click.echo(f" Interlinks injected successfully")
# Step 3.5: Re-insert images after interlink injection
click.echo(f" Re-inserting images...")
self._reinsert_images(content_records, project)
click.echo(f" Images re-inserted successfully")
# Refresh content records to ensure we have latest content with images
self.content_repo.session.expire_all()
for content in content_records:
self.content_repo.session.refresh(content)
# Step 4: Apply templates
click.echo(f" Applying templates...")
url_map = {url_info["content_id"]: url_info["url"] for url_info in article_urls}
template_count = 0
template_failures = []
for content in content_records:
try:
canonical_url = url_map.get(content.id)
if self.generator.apply_template(content.id, canonical_url=canonical_url):
template_count += 1
else:
template_failures.append({
'id': content.id,
'title': content.title,
'error': 'Template application returned False'
})
except Exception as e:
template_failures.append({
'id': content.id,
'title': content.title,
'error': str(e)
})
click.echo(f" Warning: Failed to apply template to content {content.id}: {e}")
import traceback
click.echo(f" Traceback: {traceback.format_exc()}")
click.echo(f" Applied templates to {template_count}/{len(content_records)} articles")
if template_failures:
click.echo(f" Template failures: {len(template_failures)} articles")
for failure in template_failures[:5]: # Show first 5
click.echo(f" - Article {failure['id']} ('{failure['title']}'): {failure['error']}")
if len(template_failures) > 5:
click.echo(f" ... and {len(template_failures) - 5} more")
click.echo(f" Note: Articles without formatted_html will fail during deployment")
click.echo(f" {tier_name}: Post-processing complete")
def _deploy_job(self, project_id: int, continue_on_error: bool):
"""
Deploy all content for a project to cloud storage
Args:
project_id: Project ID to deploy
continue_on_error: If True, continue on individual file failures
Note:
Uses per-zone storage_zone_password from database for authentication.
No API key from .env is required for uploads.
"""
click.echo(f"\n Deployment: Starting automatic deployment for project {project_id}...")
url_logger = URLLogger()
page_repo = SitePageRepository(self.content_repo.session)
deployment_service = DeploymentService(
content_repo=self.content_repo,
site_repo=self.site_deployment_repo,
page_repo=page_repo,
url_logger=url_logger
)
results = deployment_service.deploy_batch(
project_id=project_id,
continue_on_error=continue_on_error
)
click.echo(f" Deployment: {results['articles_deployed']} articles, {results['pages_deployed']} pages deployed")
if results['articles_failed'] > 0 or results['pages_failed'] > 0:
click.echo(f" Deployment: {results['articles_failed']} article failures, {results['pages_failed']} page failures")
click.echo(f" Deployment: Complete in {results['total_time']:.1f}s")
def _print_summary(self):
"""Print job processing summary"""
click.echo("\n" + "="*60)
click.echo("SUMMARY")
click.echo("="*60)
click.echo(f"Jobs processed: {self.stats['processed_jobs']}/{self.stats['total_jobs']}")
click.echo(f"Articles generated: {self.stats['generated_articles']}/{self.stats['total_articles']}")
click.echo(f"Augmented: {self.stats['augmented_articles']}")
click.echo(f"Failed: {self.stats['failed_articles']}")
click.echo(f"Articles With Error From OpenRouter: {self.stats['articles_with_error']}")
click.echo("")
click.echo("TIMING")
click.echo("-" * 60)
if self.stats['tier1_time'] > 0:
click.echo(f"Tier 1 Time: {self.stats['tier1_time']:.1f}s ({self.stats['tier1_time']/60:.1f}m)")
if self.stats['tier2_time'] > 0:
click.echo(f"Tier 2 Time: {self.stats['tier2_time']:.1f}s ({self.stats['tier2_time']/60:.1f}m)")
if self.stats['tier3_time'] > 0:
click.echo(f"Tier 3 Time: {self.stats['tier3_time']:.1f}s ({self.stats['tier3_time']/60:.1f}m)")
click.echo(f"Total Time: {self.stats['total_time']:.1f}s ({self.stats['total_time']/60:.1f}m)")
click.echo("="*60)