
Story 2.7: Concurrent Article Generation & Word Count Optimization

Overview

Implement concurrent API calls for article generation using ThreadPoolExecutor to dramatically reduce batch processing time. Additionally, optimize word count targeting to reduce augmentation calls by compensating for the AI's consistent 150-200 word undershoot.

Status

PLANNED

Story Details

As a User, I want articles to be generated concurrently rather than sequentially, so that large batches complete in a fraction of the time without sacrificing quality or reliability.

Business Value

Current Performance (Sequential):

  • 100 articles × 3 API calls each = 300 sequential calls
  • ~4-6 seconds per call average
  • Total time: ~20-30 minutes for 100 articles
  • 100% augmentation rate due to word count undershoot

Expected Performance (Concurrent + Optimized):

  • 100 articles with 5 concurrent workers ≈ 20 sequential "waves" of 5 in-flight articles
  • Same 4-6 seconds per call, but 5 calls in parallel
  • Total time: ~4-6 minutes for 100 articles (5x faster)
  • ~20% augmentation rate (80% reduction in augmentation API calls)
  • Combined speedup: ~6-7x for typical batches (concurrency plus fewer augmentation calls)

Acceptance Criteria

1. Concurrent Article Generation

Status: PENDING

  • Article generation loop uses concurrent.futures.ThreadPoolExecutor
  • Default max_workers = 5 (configurable)
  • Each article (outline + content + optional augmentation) is a single unit of work
  • Main thread collects results as they complete
  • Progress reporting works correctly with concurrent execution
  • Post-processing (URL generation, interlinking, templating) remains sequential after all articles complete

2. Thread-Safe Database Operations

Status: PENDING

  • Each worker thread gets its own database session
  • No session sharing between threads
  • Stats updates use thread-safe locking mechanism
  • Content records committed properly from worker threads
  • No database connection pool exhaustion
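
The session-per-thread pattern above assumes the project exposes a `SessionLocal` factory (the implementation below imports it from `src.database.session`). A minimal sketch of what that module might look like — the URL and pool numbers are placeholder assumptions, not part of this story:

```python
# Hypothetical sketch of src/database/session.py; the real engine URL and
# pool settings come from project config, not this story.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    "sqlite:///app.db",   # placeholder URL
    pool_size=5,          # keep >= max_workers to avoid pool exhaustion
    max_overflow=2,       # small headroom for bursts
    pool_pre_ping=True,   # discard dead connections before handing them out
)

# Each worker thread calls SessionLocal() to get its own independent session;
# sessions are never shared across threads.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
```

The key point is that `SessionLocal()` is cheap to call per article, while the underlying connection pool is shared and sized to match the worker count.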

3. Error Handling & Resilience

Status: PENDING

  • Individual article failures don't crash other workers
  • Failed articles tracked and reported in stats
  • continue_on_error flag respected across concurrent workers
  • Rate limit errors handled gracefully with existing retry logic
  • All exceptions from futures caught and logged

4. Configuration & Defaults

Status: PENDING

  • max_workers configurable via environment variable: CONCURRENT_WORKERS
  • Default value: 5 workers
  • Job config can override via optional max_workers field
  • Value of 1 means sequential processing (for debugging)
  • Console output shows concurrency setting at job start
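
The precedence described above (job override, then environment variable, then default of 5) can be sketched as a small helper — the function name and the clamp to a minimum of 1 are illustrative assumptions, not existing code:

```python
import os

def resolve_max_workers(job_max_workers=None) -> int:
    """Job-level override wins; else CONCURRENT_WORKERS env var; else 5.

    A value of 1 means sequential processing (useful for debugging).
    """
    if job_max_workers is not None:
        return max(1, int(job_max_workers))
    return max(1, int(os.getenv("CONCURRENT_WORKERS", "5")))

print(resolve_max_workers())   # 5 when CONCURRENT_WORKERS is unset
print(resolve_max_workers(3))  # 3: job config overrides the environment
```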

5. Word Count Optimization

Status: PENDING

  • Content generation prompts compensate by adding 200 words to target
  • If min_word_count = 1000, the prompt receives min_word_count = 1200
  • If max_word_count = 1500, the prompt receives max_word_count = 1700
  • Augmentation threshold remains based on original min_word_count
  • No changes to validation or word counting logic

6. Progress Reporting

Status: PENDING

  • Console output remains clear and readable with concurrent execution
  • Article completion messages appear as workers finish (may be out of order)
  • Stats summary at end shows correct totals
  • Failed articles logged with full traceback
  • Execution time includes concurrent processing

Implementation Details

Architecture Changes

1. BatchProcessor Concurrency Integration

File: src/generation/batch_processor.py

New Import:

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

Modified Constructor:

class BatchProcessor:
    def __init__(
        self,
        content_generator: ContentGenerator,
        content_repo: GeneratedContentRepository,
        project_repo: ProjectRepository,
        site_deployment_repo: Optional[SiteDeploymentRepository] = None,
        max_workers: int = 5  # NEW PARAMETER
    ):
        self.generator = content_generator
        self.content_repo = content_repo
        self.project_repo = project_repo
        self.site_deployment_repo = site_deployment_repo
        self.max_workers = max_workers
        self.stats_lock = Lock()  # Thread-safe stats updates
        self.stats = {
            "total_jobs": 0,
            "processed_jobs": 0,
            "total_articles": 0,
            "generated_articles": 0,
            "augmented_articles": 0,
            "failed_articles": 0
        }

Modified Method: _process_tier()

def _process_tier(
    self, 
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    resolved_targets: Dict[str, int],
    job: Job,
    debug: bool,
    continue_on_error: bool
):
    """Process all articles for a tier with concurrent generation"""
    click.echo(f"  {tier_name}: Generating {tier_config.count} articles (concurrency: {self.max_workers})")
    
    project = self.project_repo.get_by_id(project_id)
    keyword = project.main_keyword
    models = job.models if job.models else None
    self.current_job = job  # Expose job to worker threads for per-step model overrides
    
    # Generate all titles first (existing logic)
    click.echo(f"\n[{tier_name}] Generating {tier_config.count} titles in batches...")
    titles_file = self._generate_all_titles_for_tier(
        project_id, tier_name, tier_config, debug,
        model=models.title if models else None
    )
    
    with open(titles_file, 'r', encoding='utf-8') as f:
        titles = [line.strip() for line in f if line.strip()]
    
    click.echo(f"[{tier_name}] Generated {len(titles)} titles")
    
    targets_for_tier = resolved_targets if tier_name == "tier1" else {}
    
    # NEW: Prepare article generation tasks
    article_tasks = []
    for article_num in range(1, tier_config.count + 1):
        article_index = article_num - 1
        
        if article_index >= len(titles):
            click.echo(f"    Warning: Not enough titles generated, skipping article {article_num}")
            continue
        
        article_tasks.append({
            'project_id': project_id,
            'tier_name': tier_name,
            'tier_config': tier_config,
            'article_num': article_num,
            'article_index': article_index,
            'title': titles[article_index],
            'keyword': keyword,
            'resolved_targets': targets_for_tier,
            'debug': debug
        })
    
    # NEW: Execute article generation concurrently
    if self.max_workers > 1:
        self._process_articles_concurrent(article_tasks, continue_on_error)
    else:
        # Sequential fallback for debugging (max_workers=1)
        self._process_articles_sequential(article_tasks, continue_on_error)
    
    # Existing post-processing (sequential after all articles complete)
    try:
        self._post_process_tier(project_id, tier_name, job, debug)
    except Exception as e:
        click.echo(f"  Warning: Post-processing failed for {tier_name}: {e}")

New Method: _process_articles_concurrent()

def _process_articles_concurrent(
    self,
    article_tasks: List[Dict[str, Any]],
    continue_on_error: bool
):
    """
    Process articles concurrently using ThreadPoolExecutor
    
    Args:
        article_tasks: List of article task parameters
        continue_on_error: If True, continue on individual failures
    """
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        # Submit all tasks
        future_to_task = {
            executor.submit(self._generate_single_article_thread_safe, **task): task
            for task in article_tasks
        }
        
        # Process results as they complete
        for future in as_completed(future_to_task):
            task = future_to_task[future]
            article_num = task['article_num']
            tier_name = task['tier_name']
            
            try:
                future.result()  # Raises exception if article generation failed
                
                # Thread-safe stats update
                with self.stats_lock:
                    self.stats["generated_articles"] += 1
                    
            except Exception as e:
                with self.stats_lock:
                    self.stats["failed_articles"] += 1
                
                import traceback
                tier_config = task['tier_config']
                click.echo(f"    [{article_num}/{tier_config.count}] FAILED: {e}")
                click.echo(f"    Traceback: {traceback.format_exc()}")
                
                # Save error record to database
                try:
                    self.content_repo.create(
                        project_id=task['project_id'],
                        tier=tier_name,
                        keyword=task['keyword'],
                        title="Failed Generation",
                        outline={"error": str(e)},
                        content="",
                        word_count=0,
                        status="failed"
                    )
                except Exception as db_error:
                    click.echo(f"    Failed to save error record: {db_error}")
                
                if not continue_on_error:
                    # Cancel futures that have not started yet; cancel() cannot
                    # interrupt already-running workers, which finish their article.
                    for f in future_to_task:
                        f.cancel()
                    raise

New Method: _process_articles_sequential()

def _process_articles_sequential(
    self,
    article_tasks: List[Dict[str, Any]],
    continue_on_error: bool
):
    """
    Process articles sequentially (fallback for max_workers=1)
    
    Args:
        article_tasks: List of article task parameters
        continue_on_error: If True, continue on individual failures
    """
    for task in article_tasks:
        with self.stats_lock:
            self.stats["total_articles"] += 1
        
        try:
            self._generate_single_article(**task)
            with self.stats_lock:
                self.stats["generated_articles"] += 1
                
        except Exception as e:
            with self.stats_lock:
                self.stats["failed_articles"] += 1
            
            import traceback
            click.echo(f"    [{task['article_num']}/{task['tier_config'].count}] FAILED: {e}")
            click.echo(f"    Traceback: {traceback.format_exc()}")
            
            if not continue_on_error:
                raise

New Method: _generate_single_article_thread_safe()

def _generate_single_article_thread_safe(
    self,
    project_id: int,
    tier_name: str,
    tier_config: TierConfig,
    article_num: int,
    article_index: int,
    title: str,
    keyword: str,
    resolved_targets: Dict[str, int],
    debug: bool
):
    """
    Thread-safe wrapper for _generate_single_article
    Creates a new database session for this thread
    
    All parameters same as _generate_single_article
    """
    # Increment total articles counter (thread-safe)
    with self.stats_lock:
        self.stats["total_articles"] += 1
    
    # Create thread-local database session
    from src.database.session import SessionLocal
    thread_session = SessionLocal()
    
    try:
        # Create thread-local repositories with new session
        thread_content_repo = GeneratedContentRepository(thread_session)
        thread_project_repo = ProjectRepository(thread_session)
        
        # Call existing generation logic with thread-local repos
        prefix = f"    [{article_num}/{tier_config.count}]"
        
        current_job = getattr(self, "current_job", None)
        models = current_job.models if current_job and current_job.models else None
        
        site_deployment_id = assign_site_for_article(article_index, resolved_targets)
        
        if site_deployment_id:
            hostname = next((h for h, sid in resolved_targets.items() if sid == site_deployment_id), None)
            click.echo(f"{prefix} Assigned to site: {hostname} (ID: {site_deployment_id})")
        
        click.echo(f"{prefix} Using title: \"{title}\"")
        
        click.echo(f"{prefix} Generating outline...")
        outline = self.generator.generate_outline(
            project_id=project_id,
            title=title,
            min_h2=tier_config.min_h2_tags,
            max_h2=tier_config.max_h2_tags,
            min_h3=tier_config.min_h3_tags,
            max_h3=tier_config.max_h3_tags,
            debug=debug,
            model=models.outline if models else None
        )
        
        h2_count = len(outline["outline"])
        h3_count = sum(len(section.get("h3", [])) for section in outline["outline"])
        click.echo(f"{prefix} Generated outline: {h2_count} H2s, {h3_count} H3s")
        
        click.echo(f"{prefix} Generating content...")
        content = self.generator.generate_content(
            project_id=project_id,
            title=title,
            outline=outline,
            min_word_count=tier_config.min_word_count,
            max_word_count=tier_config.max_word_count,
            debug=debug,
            model=models.content if models else None
        )
        
        word_count = self.generator.count_words(content)
        click.echo(f"{prefix} Generated content: {word_count:,} words")
        
        status = "generated"
        
        # Check if augmentation needed (based on ORIGINAL min_word_count)
        if word_count < tier_config.min_word_count:
            click.echo(f"{prefix} Below minimum ({tier_config.min_word_count:,}), augmenting...")
            content = self.generator.augment_content(
                content=content,
                target_word_count=tier_config.min_word_count,
                debug=debug,
                project_id=project_id,
                model=models.content if models else None
            )
            word_count = self.generator.count_words(content)
            click.echo(f"{prefix} Augmented content: {word_count:,} words")
            status = "augmented"
            
            # Thread-safe stats update
            with self.stats_lock:
                self.stats["augmented_articles"] += 1
        
        # Save to database using thread-local session
        saved_content = thread_content_repo.create(
            project_id=project_id,
            tier=tier_name,
            keyword=keyword,
            title=title,
            outline=outline,
            content=content,
            word_count=word_count,
            status=status,
            site_deployment_id=site_deployment_id
        )
        
        thread_session.commit()
        click.echo(f"{prefix} Saved (ID: {saved_content.id}, Status: {status})")
        
    except Exception as e:
        thread_session.rollback()
        raise
        
    finally:
        thread_session.close()

2. Word Count Compensation in ContentGenerator

File: src/generation/service.py

Modified Method: generate_content()

def generate_content(
    self, 
    project_id: int, 
    title: str, 
    outline: dict,
    min_word_count: int,
    max_word_count: int,
    debug: bool = False,
    model: Optional[str] = None
) -> str:
    """
    Generate full article HTML fragment
    
    Compensates for AI undershoot by adding 200 words to targets.
    """
    project = self.project_repo.get_by_id(project_id)
    if not project:
        raise ValueError(f"Project {project_id} not found")
    
    entities_str = ", ".join(project.entities or [])
    related_str = ", ".join(project.related_searches or [])
    outline_str = json.dumps(outline, indent=2)
    
    # NEW: Add 200 word compensation for AI undershoot
    compensated_min = min_word_count + 200
    compensated_max = max_word_count + 200
    
    system_msg, user_prompt = self.prompt_manager.format_prompt(
        "content_generation",
        title=title,
        outline=outline_str,
        keyword=project.main_keyword,
        entities=entities_str,
        related_searches=related_str,
        min_word_count=compensated_min,  # MODIFIED
        max_word_count=compensated_max   # MODIFIED
    )
    
    content = self.ai_client.generate_completion(
        prompt=user_prompt,
        system_message=system_msg,
        max_tokens=8000,
        temperature=0.7,
        override_model=model
    )
    
    content = content.strip()
    content = self._clean_markdown_fences(content)
    
    if debug:
        self._save_debug_output(
            project_id, "content", content, "html"
        )
    
    return content

3. Configuration Management

File: src/core/config.py

New Configuration:

# Add to config loading
CONCURRENT_WORKERS = int(os.getenv("CONCURRENT_WORKERS", "5"))

File: env.example

# Concurrent processing
CONCURRENT_WORKERS=5  # Number of concurrent article generation workers (default: 5)

File: src/generation/job_config.py

Modified Job Schema:

class Job:
    def __init__(self, job_data: dict):
        # ... existing fields ...
        self.max_workers = job_data.get("max_workers", None)  # NEW: Optional override

4. CLI Integration

File: main.py or equivalent CLI entry point

Modified Initialization:

from src.core.config import CONCURRENT_WORKERS

# When creating BatchProcessor
batch_processor = BatchProcessor(
    content_generator=generator,
    content_repo=content_repo,
    project_repo=project_repo,
    site_deployment_repo=site_repo,
    max_workers=job.max_workers if job.max_workers is not None else CONCURRENT_WORKERS  # Job override, else env default
)

Console Output Example

Before (Sequential):

[tier2] Generating 10 articles...
    [1/10] Using title: "Guide to CNC Machining"
    [1/10] Generating outline...
    [1/10] Generated outline: 4 H2s, 8 H3s
    [1/10] Generating content...
    [1/10] Generated content: 850 words
    [1/10] Below minimum (1000), augmenting...
    [1/10] Augmented content: 1,050 words
    [1/10] Saved (ID: 45, Status: augmented)
    [2/10] Using title: "Advanced Machining Techniques"
    ...
Time: 8 minutes

After (Concurrent + Optimized):

[tier2] Generating 10 articles (concurrency: 5)...
    [1/10] Using title: "Guide to CNC Machining"
    [3/10] Using title: "Machining Best Practices"
    [2/10] Using title: "Advanced Machining Techniques"
    [1/10] Generating outline...
    [3/10] Generating outline...
    [2/10] Generating outline...
    [1/10] Generated outline: 4 H2s, 8 H3s
    [1/10] Generating content...
    [2/10] Generated outline: 5 H2s, 10 H3s
    [3/10] Generated outline: 4 H2s, 9 H3s
    [2/10] Generating content...
    [3/10] Generating content...
    [1/10] Generated content: 1,020 words  (NO AUGMENTATION!)
    [1/10] Saved (ID: 45, Status: generated)
    [4/10] Using title: "Precision Machining Guide"
    [2/10] Generated content: 1,150 words
    [2/10] Saved (ID: 46, Status: generated)
    ...
Time: 2 minutes

Error Scenarios

Scenario 1: Rate Limit Hit

[3/10] API Error: Rate limit exceeded, retrying in 2s...
[3/10] Retry successful
[3/10] Generated content: 1,020 words
  • Handled by existing retry logic in AIClient
  • No special concurrent handling needed

Scenario 2: Worker Thread Exception

[5/10] FAILED: Connection timeout
    Traceback: requests.exceptions.Timeout: ...
    Saved error record to database
[6/10] Generating outline...  (continues if continue_on_error=True)
  • Exception caught by future.result()
  • Logged with full traceback
  • Error record saved to database
  • Other workers continue if continue_on_error=True

Scenario 3: Database Connection Pool Exhaustion

  • Each thread creates its own session from SessionLocal()
  • Sessions closed in finally block
  • SQLAlchemy connection pool handles concurrency automatically
  • Default pool size (5) matches default max_workers (5)

Scenario 4: Concurrent Stats Corruption

  • All stats updates wrapped in with self.stats_lock:
  • Thread-safe increments guaranteed
  • Final summary reflects accurate counts
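
The lock requirement can be demonstrated in isolation: a dict increment is a read-modify-write, not an atomic operation, so concurrent workers must serialize it. A minimal standalone sketch (counter name mirrors the stats dict above):

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

stats = {"generated_articles": 0}
stats_lock = Lock()

def record_success():
    # Read-modify-write on a dict value is not atomic; the lock makes it safe.
    with stats_lock:
        stats["generated_articles"] += 1

# 1000 increments across 5 workers; the with-block waits for all to finish.
with ThreadPoolExecutor(max_workers=5) as executor:
    for _ in range(1000):
        executor.submit(record_success)

print(stats["generated_articles"])  # 1000 every run, regardless of scheduling
```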

Testing Strategy

Unit Tests

File: tests/unit/test_concurrent_generation.py

New tests:

  • test_stats_lock_thread_safety() - Verify stats updates thread-safe
  • test_thread_safe_article_generation() - Mock concurrent execution
  • test_sequential_fallback_max_workers_1() - Verify sequential mode works
  • test_word_count_compensation() - Verify 200 word addition to prompts
  • test_future_exception_handling() - Exception from future caught correctly
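
A sketch of what test_word_count_compensation() might look like — the helper and constant below are hypothetical extractions of the +200 rule from generate_content(), pulled out so the rule is testable without mocking the AI client:

```python
COMPENSATION_WORDS = 200  # assumed constant; not yet named in the codebase

def compensate_word_counts(min_word_count: int, max_word_count: int) -> tuple[int, int]:
    """Return the (min, max) targets actually sent to the content prompt."""
    return min_word_count + COMPENSATION_WORDS, max_word_count + COMPENSATION_WORDS

def test_word_count_compensation():
    # Acceptance criterion 5: 1000 -> 1200, 1500 -> 1700
    assert compensate_word_counts(1000, 1500) == (1200, 1700)
    # The augmentation threshold still compares against the ORIGINAL minimum,
    # so compensation must never leak into validation or word counting.

test_word_count_compensation()
```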

Integration Tests

File: tests/integration/test_concurrent_batch_processing.py

New tests:

  • test_concurrent_tier_processing_10_articles() - Full tier with concurrency=5
  • test_concurrent_with_continue_on_error() - Partial failures don't stop batch
  • test_database_sessions_isolated() - No session conflicts between threads
  • test_stats_accuracy_concurrent() - Stats match actual records created
  • test_word_count_augmentation_reduction() - Verify augmentation rate drops

Performance Tests

File: tests/performance/test_batch_timing.py

New tests:

  • test_concurrent_speedup() - Measure 5x speedup for 25 articles
  • test_augmentation_rate_improvement() - Verify <20% augmentation rate
  • test_max_workers_scaling() - Test 1, 3, 5, 10 workers

Manual Testing

# Test default concurrency (5 workers)
CONCURRENT_WORKERS=5 python main.py generate-batch -j jobs/test_tier2_20articles.json -u admin -p password

# Test sequential mode
CONCURRENT_WORKERS=1 python main.py generate-batch -j jobs/test_tier2_20articles.json -u admin -p password

# Test high concurrency (stress test)
CONCURRENT_WORKERS=10 python main.py generate-batch -j jobs/test_tier3_100articles.json -u admin -p password

# Test with job-level override
# (Add "max_workers": 3 to job file)
python main.py generate-batch -j jobs/test_custom_concurrency.json -u admin -p password

Validation Checklist:

  • Articles generate in parallel (console output interleaved)
  • No database errors or session conflicts
  • Stats summary shows correct totals
  • Failed articles don't crash entire batch
  • Augmentation rate <30% (down from 100%)
  • Execution time ~5x faster for 25+ article batches
  • Post-processing completes after all articles finish

Design Decisions

Why ThreadPoolExecutor Instead of ProcessPoolExecutor?

  • API calls are I/O-bound (waiting on network), not CPU-bound
  • Threads more efficient for I/O operations
  • Easier session management (no pickling required)
  • Lower memory overhead
  • OpenAI SDK already thread-safe
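
The I/O-bound argument is easy to verify: threads blocked on network (simulated here with sleep, which also releases the GIL) overlap almost perfectly. A quick illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_api_call(i: int) -> int:
    time.sleep(0.2)  # stands in for network latency; the GIL is released here
    return i

start = time.monotonic()
with ThreadPoolExecutor(max_workers=5) as executor:
    # executor.map preserves input order even though calls run concurrently
    results = list(executor.map(fake_api_call, range(5)))
elapsed = time.monotonic() - start

print(results)            # [0, 1, 2, 3, 4]
print(elapsed < 1.0)      # True: ~0.2s total instead of ~1.0s sequential
```

The same shape holds for real API calls: with 5 workers, wall-clock time approaches the duration of the slowest call in each wave rather than the sum of all calls.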

Why Default max_workers=5?

  • Balances speed vs API rate limits
  • Matches typical SQLAlchemy connection pool size
  • Safe for most OpenRouter rate limits
  • ~5x speedup is significant without being aggressive
  • Users can increase if their rate limits allow

Why Not Parallelize Post-Processing?

  • Post-processing requires all articles to exist first
  • Interlinking depends on article relationships (order matters)
  • URL generation uses sequential logic
  • Complexity not worth marginal gains (~1% of total time)

Why Add 200 Words Instead of Percentage?

  • AI undershoot is consistent (~150-200 words) regardless of target
  • Simpler implementation (no floating point math)
  • Transparent to user (still validates against original min)
  • Can be tuned later based on production data

Why Thread-Local Sessions Instead of Shared Session?

  • SQLAlchemy sessions are NOT thread-safe
  • Session-per-thread is standard concurrency pattern
  • Avoids complex locking around database operations
  • Natural transaction boundaries (commit per article)

Known Limitations

  1. Console Output Ordering: Messages may appear out of order (article 3 before article 2)
  2. Rate Limiting: High concurrency may trigger rate limits on some plans
  3. Memory Usage: More workers = more simultaneous API calls = higher memory
  4. Connection Pool: Default pool size may need tuning for >5 workers
  5. Progress Bar: Linear progress bar would be misleading (not implemented)

Migration Notes

Breaking Changes:

  • None - existing code continues to work

Configuration Changes:

  • New environment variable: CONCURRENT_WORKERS
  • New optional job field: max_workers

Behavior Changes:

  • Console output may appear out of order
  • Execution time dramatically faster
  • Fewer augmentation API calls

Rollback Plan:

  • Set CONCURRENT_WORKERS=1 to revert to sequential processing
  • No code changes required

Files Created/Modified

New Files:

  • tests/unit/test_concurrent_generation.py - Unit tests
  • tests/integration/test_concurrent_batch_processing.py - Integration tests
  • tests/performance/test_batch_timing.py - Performance benchmarks
  • docs/stories/story-2.7-concurrent-generation-optimization.md - This document

Modified Files:

  • src/generation/batch_processor.py - Add concurrency logic
  • src/generation/service.py - Add word count compensation
  • src/core/config.py - Add CONCURRENT_WORKERS setting
  • src/generation/job_config.py - Add max_workers field
  • env.example - Document CONCURRENT_WORKERS variable
  • main.py - Pass max_workers to BatchProcessor

Performance Impact

Baseline (100 articles, sequential, current word count):

  • Title generation: ~30 seconds (batch mode, already optimized)
  • Article generation: ~20-30 minutes (sequential, 300 API calls)
  • Post-processing: ~15 seconds
  • Total: ~21-31 minutes

After Story 2.7 (100 articles, concurrent, optimized word count):

  • Title generation: ~30 seconds (unchanged)
  • Article generation: ~4-6 minutes (5x faster, 80% fewer augmentation calls)
  • Post-processing: ~15 seconds (unchanged)
  • Total: ~5-7 minutes

Improvement:

  • ~75-80% reduction in total batch time
  • ~80% reduction in augmentation API calls
  • ~25-30% reduction in total API calls (augmentation savings: ~300 → ~220 calls per 100 articles)

Dependencies

Required Stories:

  • Story 2.6: Batch title generation must be complete (titles generated upfront)
  • Story 2.3: Content generation service must exist
  • Story 1.6: Database models and repositories

Optional Dependencies:

  • None - works with any existing job configuration

Completion Checklist

  • Add ThreadPoolExecutor to BatchProcessor
  • Implement _process_articles_concurrent()
  • Implement _generate_single_article_thread_safe()
  • Add threading.Lock for stats updates
  • Implement sequential fallback (_process_articles_sequential)
  • Add word count compensation (+200) to generate_content()
  • Add CONCURRENT_WORKERS to config.py
  • Add max_workers to Job schema
  • Update env.example with documentation
  • Modify CLI to pass max_workers to BatchProcessor
  • Write unit tests (thread safety, word count compensation)
  • Write integration tests (concurrent execution, database isolation)
  • Write performance tests (timing, speedup measurement)
  • Manual testing with 1, 5, 10 workers
  • Verify augmentation rate drops to <30%
  • Verify no database session conflicts
  • Document configuration in README
  • Code review

Success Metrics

Primary:

  • Batch processing time reduced by >70% for 50+ article batches
  • No database errors or session conflicts in concurrent execution
  • Stats summary accurate (matches actual database records)

Secondary:

  • Augmentation rate <30% (down from ~100%)
  • Rate limit errors <5% of runs (existing retry logic handles them)
  • Console output remains readable and useful

Quality:

  • No regression in article quality
  • Word count targets met more consistently
  • Error handling maintains robustness

Notes

  • This is the single largest performance improvement in the project
  • Combines infrastructure improvement (concurrency) with algorithmic improvement (word count)
  • No quality trade-offs - articles remain identical quality
  • Opens door for future optimizations (outline + content parallelization)
  • Consider exposing concurrency setting in web UI (Story 1.4) in future