Initial commit: link building workflow extracted from CheddahBot
Standalone package wrapping Big-Link-Man (BLM) for Paperclip. Extracted from cheddahbot/tools/linkbuilding.py and related modules, with task-system coupling, folder watching, and AutoCora queue logic stripped out. Public API: - Deps, BLMConfig, LLMCheck (injection types) - normalize_for_match, fuzzy_keyword_match, filename_stem_to_keyword - list_inbox_xlsx, find_xlsx_for_keyword, find_all_xlsx_for_keyword - blm_ingest_cora, blm_generate_batch, run_cora_backlinks (pipelines) - PipelineResult, IngestResult, GenerateResult (return types) 89 tests, 96% coverage.main
commit
381d51e001
|
|
@ -0,0 +1,14 @@
|
|||
__pycache__/
|
||||
*.py[cod]
|
||||
*.egg-info/
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
.coverage.*
|
||||
htmlcov/
|
||||
coverage.xml
|
||||
.ruff_cache/
|
||||
.venv/
|
||||
dist/
|
||||
build/
|
||||
*.bak
|
||||
uv.lock
|
||||
|
|
@ -0,0 +1,211 @@
|
|||
# Linkman-Paperclip-Wrap
|
||||
|
||||
A standalone Python package wrapping the Big-Link-Man (BLM) CLI for use by
|
||||
Paperclip agents. Extracted from CheddahBot (`cheddahbot/tools/linkbuilding.py`)
|
||||
and simplified for consumption by external callers.
|
||||
|
||||
## What it does
|
||||
|
||||
Given a task keyword, the package can:
|
||||
|
||||
1. **Find a matching CORA `.xlsx`** in an inbox folder (e.g. `Cora-For-Humans/`)
|
||||
using fuzzy keyword matching with singular/plural awareness.
|
||||
2. **Invoke Big-Link-Man** to run `ingest-cora` and `generate-batch` on that
|
||||
xlsx, producing the backlink content.
|
||||
3. **Return a structured result** the caller can use to update task state.
|
||||
|
||||
No folder watching, no task-system coupling, no notifications. The caller owns
|
||||
task state and polling cadence; this package is pure work.
|
||||
|
||||
## Package layout
|
||||
|
||||
```
|
||||
src/link_building_workflow/
|
||||
deps.py -- Deps, BLMConfig, LLMCheck types
|
||||
matching.py -- Keyword normalization and fuzzy matching
|
||||
inbox.py -- Inbox folder scanning (list / find-by-keyword)
|
||||
blm.py -- BLM subprocess wrapper and stdout parsers
|
||||
pipeline.py -- run_cora_backlinks, blm_ingest_cora, blm_generate_batch
|
||||
__init__.py -- Public API re-exports
|
||||
```
|
||||
|
||||
## Installation
|
||||
|
||||
```
|
||||
uv add git+https://git.peninsulaindustries.com/bryanb/Linkman-Paperclip-Wrap.git
|
||||
```
|
||||
|
||||
Big-Link-Man itself is a separate dependency the caller provides. Install it
|
||||
on the same host and point `BLMConfig.blm_dir` at the checkout.
|
||||
|
||||
## Public API
|
||||
|
||||
All imports available from the top level:
|
||||
|
||||
```python
|
||||
from link_building_workflow import (
|
||||
# Dependency types
|
||||
Deps, BLMConfig, LLMCheck,
|
||||
# Matching primitives
|
||||
normalize_for_match, fuzzy_keyword_match, filename_stem_to_keyword,
|
||||
# Inbox scanning
|
||||
InboxMatch, list_inbox_xlsx, find_xlsx_for_keyword, find_all_xlsx_for_keyword,
|
||||
# Pipeline entry points
|
||||
PipelineResult, run_cora_backlinks, blm_ingest_cora, blm_generate_batch,
|
||||
# Low-level BLM (if you need to run a custom BLM command)
|
||||
IngestResult, GenerateResult, build_ingest_args,
|
||||
parse_ingest_output, parse_generate_output, run_blm_command,
|
||||
)
|
||||
```
|
||||
|
||||
## Typical usage (Paperclip)
|
||||
|
||||
The caller decides when a task is eligible to run (all required task fields
|
||||
filled in, xlsx present in the inbox). This package provides the primitives
|
||||
to check the xlsx gate and to execute the work.
|
||||
|
||||
```python
|
||||
from link_building_workflow import (
|
||||
Deps, BLMConfig, find_xlsx_for_keyword, run_cora_backlinks,
|
||||
)
|
||||
|
||||
deps = Deps(
|
||||
blm=BLMConfig(
|
||||
blm_dir="/opt/big-link-man",
|
||||
username="your-blm-user",
|
||||
password="your-blm-pass",
|
||||
timeout_seconds=1800,
|
||||
),
|
||||
llm_check=your_plural_checker, # callable[[str, str], bool]
|
||||
)
|
||||
|
||||
def try_run_link_building(task):
|
||||
# Caller gates 1-4: task-field checks (LB Method, Keyword, IMSURL, ...)
|
||||
if not (task.keyword and task.imsurl):
|
||||
return "blocked: missing task fields"
|
||||
|
||||
# Gate 5: does a matching xlsx exist yet?
|
||||
match = find_xlsx_for_keyword(
|
||||
"/data/Cora-For-Humans",
|
||||
task.keyword,
|
||||
deps.llm_check,
|
||||
)
|
||||
if match is None:
|
||||
return "blocked: no xlsx in Cora-For-Humans"
|
||||
|
||||
# Execute
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(match.path),
|
||||
project_name=task.keyword,
|
||||
money_site_url=task.imsurl,
|
||||
custom_anchors=task.custom_anchors or "",
|
||||
cli_flags=task.cli_flags or "",
|
||||
branded_plus_ratio=task.branded_plus_ratio, # None -> BLMConfig default
|
||||
deps=deps,
|
||||
)
|
||||
|
||||
if result.ok:
|
||||
# result.summary is a multi-line human-readable string
|
||||
# result.ingest.project_id, result.generate.job_moved_to, etc.
|
||||
return f"done: {result.summary}"
|
||||
else:
|
||||
# result.step tells you where it stopped: "ingest" or "generate"
|
||||
# result.error has the details
|
||||
return f"failed at {result.step}: {result.error}"
|
||||
```
|
||||
|
||||
## The `LLMCheck` callable
|
||||
|
||||
Used when the fast-path string equality fails during fuzzy matching. Should
|
||||
return `True` iff two keywords are the same modulo plural form ("shaft" vs
|
||||
"shafts", "company" vs "companies"). Return `False` for any other kind of
|
||||
difference. Implementations should cache -- the workflow may call this
|
||||
repeatedly with the same pair while scanning an inbox.
|
||||
|
||||
Example implementation (the one CheddahBot uses):
|
||||
|
||||
```python
|
||||
import httpx
|
||||
|
||||
_cache = {}
|
||||
|
||||
def openrouter_plural_check(a: str, b: str) -> bool:
|
||||
key = (a, b) if a <= b else (b, a)
|
||||
if key in _cache:
|
||||
return _cache[key]
|
||||
resp = httpx.post(
|
||||
"https://openrouter.ai/api/v1/chat/completions",
|
||||
headers={"Authorization": f"Bearer {OPENROUTER_API_KEY}"},
|
||||
json={
|
||||
"model": "anthropic/claude-haiku-4.5",
|
||||
"max_tokens": 5,
|
||||
"messages": [
|
||||
{"role": "system", "content":
|
||||
"Reply with only 'YES' or 'NO'. YES iff the two keywords "
|
||||
"are identical except for singular/plural form."},
|
||||
{"role": "user", "content": f'A: "{a}"\nB: "{b}"'},
|
||||
],
|
||||
},
|
||||
timeout=15,
|
||||
)
|
||||
result = "YES" in resp.json()["choices"][0]["message"]["content"].upper()
|
||||
_cache[key] = result
|
||||
return result
|
||||
```
|
||||
|
||||
Tests may pass `lambda a, b: False` for the fast-path-only case, or any
|
||||
deterministic fake.
|
||||
|
||||
## The `PipelineResult` dataclass
|
||||
|
||||
Every pipeline entry point returns the same shape:
|
||||
|
||||
| field | meaning |
|
||||
|-----------------|----------------------------------------------------------------|
|
||||
| `ok` | True if the pipeline completed the phase it was asked to do |
|
||||
| `step` | "ingest" / "generate" / "complete" (on success) or where it failed |
|
||||
| `ingest` | `IngestResult` if ingest ran, else None |
|
||||
| `generate` | `GenerateResult` if generate ran, else None |
|
||||
| `error` | Human-readable error message (empty on success) |
|
||||
| `summary` | Multi-line human-readable summary, safe to post as a comment |
|
||||
| `project_name` | The BLM project name |
|
||||
| `job_file` | Path to the final job file (post-move on success) |
|
||||
| `log_lines` | Progress messages captured during the run |
|
||||
|
||||
## What this package does NOT do
|
||||
|
||||
- Does not watch folders. No threads, no polling loops.
|
||||
- Does not know about ClickUp, Linear, or any task system. The caller owns
|
||||
task state and decides what status transitions mean.
|
||||
- Does not sync with shared-folder job queues (the old AutoCora queue).
|
||||
- Does not manage the Cora tool itself. It only consumes xlsx files that
|
||||
Cora has already produced.
|
||||
- Does not pick up where BLM leaves off. When BLM finishes `generate-batch`,
|
||||
the job is done from this package's perspective.
|
||||
|
||||
These were deliberate drops during extraction. CheddahBot had folder-watch
|
||||
threads, ClickUp auto-matching, AutoCora queue submission, and a multi-inbox
|
||||
distribution loop. Paperclip owns that scheduling logic in its own code.
|
||||
|
||||
## Development
|
||||
|
||||
Requires Python 3.11+ and [uv](https://docs.astral.sh/uv/).
|
||||
|
||||
```
|
||||
uv sync # install dev + test deps
|
||||
uv run pytest # run the test suite (89 tests, ~96% coverage)
|
||||
uv run ruff check . # lint
|
||||
```
|
||||
|
||||
## Provenance
|
||||
|
||||
Extracted from the CheddahBot repo, specifically:
|
||||
|
||||
- `cheddahbot/tools/linkbuilding.py` -- pipeline logic and fuzzy matching
|
||||
- `cheddahbot/tools/autocora.py` -- only the fuzzy-match helpers were kept;
|
||||
the shared-folder job queue and result polling were dropped
|
||||
- `cheddahbot/scheduler.py` -- folder-watch loops were dropped; their
|
||||
matching logic was converted to a synchronous `find_xlsx_for_keyword` call
|
||||
|
||||
The BLM invocation parameters, stdout parsing regexes, and default ratios
|
||||
match CheddahBot's production behavior exactly.
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
[project]
|
||||
name = "link-building-workflow"
|
||||
version = "0.1.0"
|
||||
description = "CORA xlsx -> Big-Link-Man link building pipeline, extracted from CheddahBot for Paperclip"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = []
|
||||
|
||||
[build-system]
|
||||
requires = ["uv_build>=0.9,<1"]
|
||||
build-backend = "uv_build"
|
||||
|
||||
[tool.uv.build-backend]
|
||||
module-root = "src"
|
||||
|
||||
[dependency-groups]
|
||||
dev = [{ include-group = "lint" }, { include-group = "test" }]
|
||||
lint = ["ruff"]
|
||||
test = ["pytest", "pytest-cov"]
|
||||
|
||||
[tool.uv]
|
||||
default-groups = ["dev", "test"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
addopts = [
|
||||
"-ra",
|
||||
"--strict-markers",
|
||||
"--strict-config",
|
||||
"--cov=link_building_workflow",
|
||||
"--cov-report=term-missing",
|
||||
]
|
||||
|
||||
[tool.coverage.run]
|
||||
branch = true
|
||||
source = ["link_building_workflow"]
|
||||
|
||||
[tool.coverage.report]
|
||||
exclude_lines = [
|
||||
"pragma: no cover",
|
||||
"if TYPE_CHECKING:",
|
||||
"if __name__ == .__main__.:",
|
||||
]
|
||||
show_missing = true
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
target-version = "py311"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I", "UP", "B", "SIM", "RUF"]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"tests/**/*.py" = ["S101", "PLR2004", "ANN"]
|
||||
|
|
@ -0,0 +1,128 @@
|
|||
"""Link building workflow: CORA xlsx -> Big-Link-Man pipeline.
|
||||
|
||||
A standalone package extracted from CheddahBot for consumption by Paperclip
|
||||
(and anything else that wants to trigger link-building work). The caller
|
||||
owns task state; this package owns the work.
|
||||
|
||||
Public API
|
||||
----------
|
||||
Matching primitives::
|
||||
|
||||
normalize_for_match(text) -> str
|
||||
fuzzy_keyword_match(a, b, llm_check=None) -> bool
|
||||
filename_stem_to_keyword(stem) -> str
|
||||
|
||||
Inbox scanning::
|
||||
|
||||
list_inbox_xlsx(folder) -> list[Path]
|
||||
find_xlsx_for_keyword(folder, keyword, llm_check=None) -> InboxMatch | None
|
||||
find_all_xlsx_for_keyword(folder, keyword, llm_check=None) -> list[InboxMatch]
|
||||
|
||||
Pipeline entry points::
|
||||
|
||||
run_cora_backlinks(xlsx_path, project_name, money_site_url, deps, ...) -> PipelineResult
|
||||
blm_ingest_cora(xlsx_path, project_name, deps, ...) -> PipelineResult
|
||||
blm_generate_batch(job_file, deps, ...) -> PipelineResult
|
||||
|
||||
Dependency types::
|
||||
|
||||
Deps(blm, llm_check)
|
||||
BLMConfig(blm_dir, username, password, timeout_seconds, ...)
|
||||
LLMCheck = Callable[[str, str], bool]
|
||||
|
||||
Typical Paperclip usage
|
||||
-----------------------
|
||||
|
||||
from link_building_workflow import (
|
||||
Deps, BLMConfig,
|
||||
find_xlsx_for_keyword,
|
||||
run_cora_backlinks,
|
||||
)
|
||||
|
||||
deps = Deps(
|
||||
blm=BLMConfig(blm_dir="/opt/blm", username="...", password="..."),
|
||||
llm_check=my_plural_checker,
|
||||
)
|
||||
|
||||
# Gate: does a matching xlsx exist yet?
|
||||
match = find_xlsx_for_keyword(
|
||||
"/data/Cora-For-Humans", task.keyword, deps.llm_check
|
||||
)
|
||||
if match is None:
|
||||
return "waiting for xlsx"
|
||||
|
||||
# All gates passed -- run the work
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(match.path),
|
||||
project_name=task.keyword,
|
||||
money_site_url=task.imsurl,
|
||||
deps=deps,
|
||||
)
|
||||
if result.ok:
|
||||
# success -- post result.summary as a comment, advance task state
|
||||
...
|
||||
else:
|
||||
# failure -- result.error has the reason, result.step is where it stopped
|
||||
...
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from .blm import (
|
||||
GenerateResult,
|
||||
IngestResult,
|
||||
build_ingest_args,
|
||||
parse_generate_output,
|
||||
parse_ingest_output,
|
||||
run_blm_command,
|
||||
)
|
||||
from .deps import BLMConfig, Deps, LLMCheck
|
||||
from .inbox import (
|
||||
InboxMatch,
|
||||
find_all_xlsx_for_keyword,
|
||||
find_xlsx_for_keyword,
|
||||
list_inbox_xlsx,
|
||||
)
|
||||
from .matching import (
|
||||
filename_stem_to_keyword,
|
||||
fuzzy_keyword_match,
|
||||
normalize_for_match,
|
||||
)
|
||||
from .pipeline import (
|
||||
PipelineResult,
|
||||
ProgressFn,
|
||||
blm_generate_batch,
|
||||
blm_ingest_cora,
|
||||
run_cora_backlinks,
|
||||
)
|
||||
|
||||
__all__ = [ # noqa: RUF022 -- grouped by module for readability
|
||||
# deps
|
||||
"Deps",
|
||||
"BLMConfig",
|
||||
"LLMCheck",
|
||||
# matching
|
||||
"normalize_for_match",
|
||||
"fuzzy_keyword_match",
|
||||
"filename_stem_to_keyword",
|
||||
# inbox
|
||||
"InboxMatch",
|
||||
"list_inbox_xlsx",
|
||||
"find_xlsx_for_keyword",
|
||||
"find_all_xlsx_for_keyword",
|
||||
# blm low-level
|
||||
"IngestResult",
|
||||
"GenerateResult",
|
||||
"build_ingest_args",
|
||||
"parse_ingest_output",
|
||||
"parse_generate_output",
|
||||
"run_blm_command",
|
||||
# pipeline
|
||||
"PipelineResult",
|
||||
"ProgressFn",
|
||||
"run_cora_backlinks",
|
||||
"blm_ingest_cora",
|
||||
"blm_generate_batch",
|
||||
]
|
||||
|
||||
__version__ = "0.1.0"
|
||||
|
|
@ -0,0 +1,146 @@
|
|||
"""Big-Link-Man CLI wrapper.
|
||||
|
||||
BLM is an external Python tool. We invoke it via subprocess using whatever
|
||||
Python interpreter the caller configured in `BLMConfig.python_exe`. With
|
||||
BLM installed alongside the caller on the same host (the expected Paperclip
|
||||
setup), this is usually just "python".
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .deps import BLMConfig
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IngestResult:
|
||||
"""Parsed output of an `ingest-cora` run."""
|
||||
|
||||
project_id: str = ""
|
||||
job_file: str = ""
|
||||
project_name: str = ""
|
||||
main_keyword: str = ""
|
||||
|
||||
@property
|
||||
def success(self) -> bool:
|
||||
return bool(self.project_id and self.job_file)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GenerateResult:
|
||||
"""Parsed output of a `generate-batch` run."""
|
||||
|
||||
job_moved_to: str = ""
|
||||
success: bool = False
|
||||
raw_output: str = ""
|
||||
|
||||
|
||||
def build_ingest_args(
|
||||
*,
|
||||
xlsx_path: str,
|
||||
project_name: str,
|
||||
money_site_url: str = "",
|
||||
branded_plus_ratio: float = 0.7,
|
||||
custom_anchors: str = "",
|
||||
cli_flags: str = "",
|
||||
) -> list[str]:
|
||||
"""Construct the argv tail for `main.py ingest-cora ...`.
|
||||
|
||||
Does not include the interpreter or `main.py`. Does not include -u/-p
|
||||
credentials -- those are injected by `run_blm_command`. Only emits
|
||||
-bp when the ratio differs from BLM's own default of 0.7 to keep the
|
||||
command line minimal when defaults are in play.
|
||||
"""
|
||||
args = ["ingest-cora", "-f", xlsx_path, "-n", project_name]
|
||||
if money_site_url:
|
||||
args.extend(["-m", money_site_url])
|
||||
if branded_plus_ratio and branded_plus_ratio != 0.7:
|
||||
args.extend(["-bp", str(branded_plus_ratio)])
|
||||
if custom_anchors:
|
||||
args.extend(["-a", custom_anchors])
|
||||
if cli_flags:
|
||||
args.extend(cli_flags.strip().split())
|
||||
return args
|
||||
|
||||
|
||||
def run_blm_command(
|
||||
args: list[str],
|
||||
blm: BLMConfig,
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""Run a BLM CLI command and return the CompletedProcess.
|
||||
|
||||
Always injects -u/-p from `blm.username`/`blm.password` unless the
|
||||
caller already put them in `args`. cwd is set to `blm.blm_dir` so BLM
|
||||
can find its own relative paths (config files, jobs/ directory, etc.).
|
||||
|
||||
Raises subprocess.TimeoutExpired if the command exceeds
|
||||
`blm.timeout_seconds`. Raises FileNotFoundError if `blm.blm_dir` or
|
||||
`blm.python_exe` can't be found.
|
||||
"""
|
||||
blm_path = Path(blm.blm_dir)
|
||||
if not blm_path.exists():
|
||||
raise FileNotFoundError(f"BLM directory not found: {blm.blm_dir}")
|
||||
|
||||
cmd: list[str] = [blm.python_exe, "main.py", *args]
|
||||
|
||||
if blm.username and "-u" not in args and "--username" not in args:
|
||||
cmd.extend(["-u", blm.username])
|
||||
if blm.password and "-p" not in args and "--password" not in args:
|
||||
cmd.extend(["-p", blm.password])
|
||||
|
||||
log.info("Running BLM: %s (cwd=%s)", " ".join(cmd), blm.blm_dir)
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=blm.blm_dir,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=blm.timeout_seconds,
|
||||
)
|
||||
log.info("BLM exit=%d", result.returncode)
|
||||
if result.stdout:
|
||||
log.debug("BLM stdout: %s", result.stdout[:1000])
|
||||
if result.stderr:
|
||||
log.debug("BLM stderr: %s", result.stderr[:1000])
|
||||
return result
|
||||
|
||||
|
||||
_INGEST_PROJECT_RE = re.compile(r"^Success: Project '(.+)' created \(ID: (\d+)\)$")
|
||||
_INGEST_JOB_RE = re.compile(r"^Job file created: (.+)$")
|
||||
_INGEST_KEYWORD_RE = re.compile(r"^Main Keyword: (.+)$")
|
||||
_GENERATE_MOVED_RE = re.compile(r"^Job file moved to: (.+)$")
|
||||
|
||||
|
||||
def parse_ingest_output(stdout: str) -> IngestResult:
|
||||
"""Extract project id, job file path, and main keyword from ingest stdout."""
|
||||
result = IngestResult()
|
||||
for raw in stdout.splitlines():
|
||||
line = raw.strip()
|
||||
if m := _INGEST_PROJECT_RE.match(line):
|
||||
result.project_name = m.group(1)
|
||||
result.project_id = m.group(2)
|
||||
continue
|
||||
if m := _INGEST_JOB_RE.match(line):
|
||||
result.job_file = m.group(1).strip()
|
||||
continue
|
||||
if m := _INGEST_KEYWORD_RE.match(line):
|
||||
result.main_keyword = m.group(1).strip()
|
||||
continue
|
||||
return result
|
||||
|
||||
|
||||
def parse_generate_output(stdout: str) -> GenerateResult:
|
||||
"""Extract the post-run job file path from generate-batch stdout."""
|
||||
result = GenerateResult(raw_output=stdout)
|
||||
for raw in stdout.splitlines():
|
||||
line = raw.strip()
|
||||
if m := _GENERATE_MOVED_RE.match(line):
|
||||
result.job_moved_to = m.group(1).strip()
|
||||
result.success = True
|
||||
return result
|
||||
|
|
@ -0,0 +1,63 @@
|
|||
"""Dependency injection types for the link building workflow.
|
||||
|
||||
The workflow is LLM-agnostic: the caller (Paperclip, tests, anything) implements
|
||||
the `LLMCheck` callable and passes a `Deps` instance into every public function.
|
||||
|
||||
Nothing in this module touches the network, a task system, or an LLM directly.
|
||||
Task state (status, custom fields, comments) is owned by the caller -- this
|
||||
package only returns structured results and lets the caller decide what to do.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
|
||||
# An LLM-backed equality check for singular/plural keyword matching.
|
||||
# Returns True iff `a` and `b` are the same SEO keyword modulo plural form
|
||||
# (e.g. "shaft" vs "shafts", "company" vs "companies"). Returns False for
|
||||
# any other kind of difference.
|
||||
#
|
||||
# Implementations should cache results; the workflow may call this repeatedly
|
||||
# with the same pair while scanning a folder of xlsx files against a task list.
|
||||
LLMCheck = Callable[[str, str], bool]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BLMConfig:
|
||||
"""Configuration for invoking the Big-Link-Man CLI.
|
||||
|
||||
BLM is an external Python tool; the workflow shells out to it. With BLM
|
||||
installed on the same host as the caller (the expected Paperclip setup),
|
||||
the default of using the system `python` resolves BLM's own dependencies
|
||||
if BLM was installed into the same environment. Override `python_exe` to
|
||||
point at a specific interpreter when BLM has its own virtualenv.
|
||||
"""
|
||||
|
||||
# Path to the Big-Link-Man checkout. Must contain main.py.
|
||||
blm_dir: str
|
||||
# BLM auth, passed as -u / -p on every CLI call. Empty strings are
|
||||
# skipped, so it's safe to leave these unset if BLM doesn't need them.
|
||||
username: str = ""
|
||||
password: str = ""
|
||||
# Subprocess timeout per BLM invocation, in seconds.
|
||||
# Default covers generate-batch runs of ~25 min plus headroom.
|
||||
timeout_seconds: int = 1800
|
||||
# Default branded+ ratio passed to ingest-cora if the caller doesn't
|
||||
# supply one. BLM's own default is 0.7; we match it.
|
||||
default_branded_plus_ratio: float = 0.7
|
||||
# Python interpreter used to run BLM. Defaults to "python" (on PATH).
|
||||
# Set to an absolute path like "/opt/blm/.venv/bin/python" if BLM has
|
||||
# its own venv separate from the caller.
|
||||
python_exe: str = "python"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Deps:
|
||||
"""Container for everything the workflow needs from the outside world.
|
||||
|
||||
Construct this once per run and pass it through.
|
||||
"""
|
||||
|
||||
blm: BLMConfig
|
||||
llm_check: LLMCheck
|
||||
|
|
@ -0,0 +1,116 @@
|
|||
"""Locate CORA .xlsx files in an inbox folder by keyword.
|
||||
|
||||
The caller's task state tells them which keyword they're looking for; this
|
||||
module tells them whether a matching xlsx exists and where. No folder
|
||||
watching, no threading -- it's a one-shot scan that Paperclip calls when
|
||||
evaluating whether a task can run.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .deps import LLMCheck
|
||||
from .matching import filename_stem_to_keyword, fuzzy_keyword_match, normalize_for_match
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class InboxMatch:
|
||||
"""A single xlsx in the inbox that matched the requested keyword."""
|
||||
|
||||
path: Path
|
||||
filename: str
|
||||
stem_keyword: str # the normalized keyword derived from the filename
|
||||
|
||||
|
||||
def list_inbox_xlsx(
|
||||
folder: str | Path,
|
||||
*,
|
||||
skip_processed: bool = True,
|
||||
) -> list[Path]:
|
||||
"""Return all .xlsx files in `folder` that are candidates for matching.
|
||||
|
||||
Skips Office temp/lock files (names starting with "~$"). If
|
||||
`skip_processed` is True (the default), also skips any file whose name
|
||||
appears in `folder/processed/` -- that subfolder is treated as the
|
||||
archive of already-handled files.
|
||||
|
||||
Returns an empty list if the folder doesn't exist. The caller decides
|
||||
whether that's an error or a "no xlsx available yet" gate.
|
||||
"""
|
||||
path = Path(folder)
|
||||
if not path.exists():
|
||||
log.warning("Inbox folder does not exist: %s", path)
|
||||
return []
|
||||
|
||||
processed_names: set[str] = set()
|
||||
if skip_processed:
|
||||
processed_dir = path / "processed"
|
||||
if processed_dir.exists():
|
||||
processed_names = {f.name for f in processed_dir.glob("*.xlsx")}
|
||||
|
||||
candidates: list[Path] = []
|
||||
for f in sorted(path.glob("*.xlsx")):
|
||||
if f.name.startswith("~$"):
|
||||
continue
|
||||
if f.name in processed_names:
|
||||
continue
|
||||
candidates.append(f)
|
||||
return candidates
|
||||
|
||||
|
||||
def find_xlsx_for_keyword(
|
||||
folder: str | Path,
|
||||
keyword: str,
|
||||
llm_check: LLMCheck | None = None,
|
||||
*,
|
||||
skip_processed: bool = True,
|
||||
) -> InboxMatch | None:
|
||||
"""Find a single xlsx in `folder` whose filename matches `keyword`.
|
||||
|
||||
Returns the first match on a filename-stem fuzzy-match against the
|
||||
requested keyword, or None if nothing matches.
|
||||
|
||||
`keyword` is normalized internally, so the caller can pass it in any
|
||||
form (e.g. "Precision CNC Machining" or "precision-cnc-machining").
|
||||
|
||||
Uses `llm_check` for singular/plural equivalence; falls back to exact
|
||||
match if `llm_check` is None. `skip_processed` controls whether files
|
||||
already in `folder/processed/` are considered.
|
||||
"""
|
||||
matches = find_all_xlsx_for_keyword(
|
||||
folder, keyword, llm_check, skip_processed=skip_processed
|
||||
)
|
||||
return matches[0] if matches else None
|
||||
|
||||
|
||||
def find_all_xlsx_for_keyword(
|
||||
folder: str | Path,
|
||||
keyword: str,
|
||||
llm_check: LLMCheck | None = None,
|
||||
*,
|
||||
skip_processed: bool = True,
|
||||
) -> list[InboxMatch]:
|
||||
"""Find every xlsx in `folder` whose filename matches `keyword`.
|
||||
|
||||
Same matching rules as `find_xlsx_for_keyword`, but returns all matches
|
||||
instead of just the first. Useful when a keyword legitimately has
|
||||
several xlsx variants (for example an original and a re-run) and the
|
||||
caller wants to pick the newest by mtime.
|
||||
"""
|
||||
target = normalize_for_match(keyword)
|
||||
if not target:
|
||||
return []
|
||||
|
||||
results: list[InboxMatch] = []
|
||||
for f in list_inbox_xlsx(folder, skip_processed=skip_processed):
|
||||
stem_kw = filename_stem_to_keyword(f.stem)
|
||||
if fuzzy_keyword_match(target, stem_kw, llm_check):
|
||||
results.append(
|
||||
InboxMatch(path=f, filename=f.name, stem_keyword=stem_kw)
|
||||
)
|
||||
return results
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
"""Keyword normalization and fuzzy matching.
|
||||
|
||||
Pure functions with no I/O. The LLM check for singular/plural equivalence
|
||||
is injected, so tests can substitute a deterministic fake and production
|
||||
can plug in any model.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from .deps import LLMCheck
|
||||
|
||||
|
||||
def normalize_for_match(text: str) -> str:
|
||||
"""Normalize text for fuzzy matching.
|
||||
|
||||
Lowercases, replaces runs of non-alphanumeric characters with a single
|
||||
space, and collapses whitespace. The result is suitable as input to
|
||||
`fuzzy_keyword_match`.
|
||||
"""
|
||||
text = text.lower().strip()
|
||||
text = re.sub(r"[^a-z0-9\s]", " ", text)
|
||||
text = re.sub(r"\s+", " ", text).strip()
|
||||
return text
|
||||
|
||||
|
||||
def fuzzy_keyword_match(a: str, b: str, llm_check: LLMCheck | None = None) -> bool:
|
||||
"""Compare two normalized strings for keyword equivalence.
|
||||
|
||||
Fast path: exact match after normalization returns True immediately.
|
||||
Slow path: delegate to `llm_check` to decide if the two keywords differ
|
||||
only in singular vs plural form.
|
||||
|
||||
If `llm_check` is None and the fast path fails, returns False. Empty
|
||||
inputs always return False.
|
||||
|
||||
Inputs are expected to be pre-normalized (call `normalize_for_match`
|
||||
first). Passing un-normalized strings will still work but is wasteful
|
||||
when matching against many candidates.
|
||||
"""
|
||||
if not a or not b:
|
||||
return False
|
||||
if a == b:
|
||||
return True
|
||||
if llm_check is None:
|
||||
return False
|
||||
return llm_check(a, b)
|
||||
|
||||
|
||||
def filename_stem_to_keyword(stem: str) -> str:
|
||||
"""Convert a filename stem to a matchable keyword.
|
||||
|
||||
Example: "precision-cnc_machining" -> "precision cnc machining"
|
||||
|
||||
The returned value is already normalized.
|
||||
"""
|
||||
stem = stem.lower().replace("-", " ").replace("_", " ")
|
||||
return normalize_for_match(stem)
|
||||
|
|
@ -0,0 +1,318 @@
|
|||
"""High-level link building pipelines.
|
||||
|
||||
These functions do the work and return a structured result. They do NOT
|
||||
touch any task system. The caller (Paperclip) reads the returned
|
||||
`PipelineResult` and decides what to do: update task status, post a
|
||||
comment, move the xlsx to a processed folder, alert, etc.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from . import blm as blm_mod
|
||||
from .blm import (
|
||||
GenerateResult,
|
||||
IngestResult,
|
||||
build_ingest_args,
|
||||
parse_generate_output,
|
||||
parse_ingest_output,
|
||||
)
|
||||
from .deps import Deps
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
ProgressFn = Callable[[str], None]
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineResult:
|
||||
"""Outcome of a full or partial link building pipeline run.
|
||||
|
||||
`ok` is the single boolean the caller should branch on. `step` tells
|
||||
them which phase the result is from: on failure it's where the
|
||||
pipeline stopped; on success it's "ingest", "generate", or "complete"
|
||||
depending on which entry point was called.
|
||||
|
||||
`summary` is a multi-line human-readable string safe to post as a
|
||||
task comment or log. `error` is populated only when `ok` is False.
|
||||
"""
|
||||
|
||||
ok: bool
|
||||
step: str # "ingest" | "generate" | "complete"
|
||||
ingest: IngestResult | None = None
|
||||
generate: GenerateResult | None = None
|
||||
error: str = ""
|
||||
summary: str = ""
|
||||
project_name: str = ""
|
||||
job_file: str = ""
|
||||
log_lines: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
def _err(
|
||||
step: str,
|
||||
message: str,
|
||||
ingest: IngestResult | None = None,
|
||||
generate: GenerateResult | None = None,
|
||||
log_lines: list[str] | None = None,
|
||||
) -> PipelineResult:
|
||||
return PipelineResult(
|
||||
ok=False,
|
||||
step=step,
|
||||
ingest=ingest,
|
||||
generate=generate,
|
||||
error=message,
|
||||
summary=f"Error during {step}: {message}",
|
||||
log_lines=log_lines or [],
|
||||
)
|
||||
|
||||
|
||||
def blm_ingest_cora(
|
||||
*,
|
||||
xlsx_path: str,
|
||||
project_name: str,
|
||||
deps: Deps,
|
||||
money_site_url: str = "",
|
||||
branded_plus_ratio: float | None = None,
|
||||
custom_anchors: str = "",
|
||||
cli_flags: str = "",
|
||||
) -> PipelineResult:
|
||||
"""Run only BLM's `ingest-cora` command.
|
||||
|
||||
Use when you want to parse a CORA .xlsx into a BLM project + job file
|
||||
without immediately generating content. The returned result carries
|
||||
the project id and the job file path; pass that path to
|
||||
`blm_generate_batch` later to produce content.
|
||||
"""
|
||||
if not xlsx_path:
|
||||
return _err("ingest", "xlsx_path is required")
|
||||
if not project_name:
|
||||
return _err("ingest", "project_name is required")
|
||||
if not Path(xlsx_path).exists():
|
||||
return _err("ingest", f"CORA file not found: {xlsx_path}")
|
||||
|
||||
bp_ratio = (
|
||||
branded_plus_ratio
|
||||
if branded_plus_ratio is not None
|
||||
else deps.blm.default_branded_plus_ratio
|
||||
)
|
||||
|
||||
args = build_ingest_args(
|
||||
xlsx_path=xlsx_path,
|
||||
project_name=project_name,
|
||||
money_site_url=money_site_url,
|
||||
branded_plus_ratio=bp_ratio,
|
||||
custom_anchors=custom_anchors,
|
||||
cli_flags=cli_flags,
|
||||
)
|
||||
|
||||
try:
|
||||
proc = blm_mod.run_blm_command(args, deps.blm)
|
||||
except subprocess.TimeoutExpired:
|
||||
return _err(
|
||||
"ingest", f"ingest-cora timed out after {deps.blm.timeout_seconds // 60}m"
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
return _err("ingest", str(e))
|
||||
|
||||
ingest = parse_ingest_output(proc.stdout)
|
||||
if proc.returncode != 0 or not ingest.success:
|
||||
return _err(
|
||||
"ingest",
|
||||
f"ingest-cora failed (exit={proc.returncode}). "
|
||||
f"stdout tail: {proc.stdout[-500:]}\nstderr tail: {proc.stderr[-500:]}",
|
||||
ingest=ingest,
|
||||
)
|
||||
|
||||
summary = (
|
||||
f"CORA ingest complete.\n"
|
||||
f"- Project: {ingest.project_name} (ID: {ingest.project_id})\n"
|
||||
f"- Keyword: {ingest.main_keyword}\n"
|
||||
f"- Job file: {ingest.job_file}"
|
||||
)
|
||||
return PipelineResult(
|
||||
ok=True,
|
||||
step="ingest",
|
||||
ingest=ingest,
|
||||
summary=summary,
|
||||
project_name=ingest.project_name,
|
||||
job_file=ingest.job_file,
|
||||
)
|
||||
|
||||
|
||||
def blm_generate_batch(
|
||||
*,
|
||||
job_file: str,
|
||||
deps: Deps,
|
||||
continue_on_error: bool = True,
|
||||
debug: bool = False,
|
||||
) -> PipelineResult:
|
||||
"""Run only BLM's `generate-batch` command on an existing job file.
|
||||
|
||||
`job_file` may be absolute or relative; relative paths are resolved
|
||||
against `deps.blm.blm_dir`.
|
||||
"""
|
||||
if not job_file:
|
||||
return _err("generate", "job_file is required")
|
||||
|
||||
job_path = (
|
||||
Path(job_file)
|
||||
if Path(job_file).is_absolute()
|
||||
else Path(deps.blm.blm_dir) / job_file
|
||||
)
|
||||
if not job_path.exists():
|
||||
return _err("generate", f"Job file not found: {job_path}")
|
||||
|
||||
args = ["generate-batch", "-j", str(job_path)]
|
||||
if continue_on_error:
|
||||
args.append("--continue-on-error")
|
||||
if debug:
|
||||
args.append("--debug")
|
||||
|
||||
try:
|
||||
proc = blm_mod.run_blm_command(args, deps.blm)
|
||||
except subprocess.TimeoutExpired:
|
||||
return _err(
|
||||
"generate",
|
||||
f"generate-batch timed out after {deps.blm.timeout_seconds // 60}m",
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
return _err("generate", str(e))
|
||||
|
||||
gen = parse_generate_output(proc.stdout)
|
||||
if proc.returncode != 0:
|
||||
return _err(
|
||||
"generate",
|
||||
f"generate-batch failed (exit={proc.returncode}). "
|
||||
f"stdout tail: {proc.stdout[-500:]}\nstderr tail: {proc.stderr[-500:]}",
|
||||
generate=gen,
|
||||
)
|
||||
|
||||
parts = ["Content generation complete."]
|
||||
parts.append(f"- Status: {'Success' if gen.success else 'Completed'}")
|
||||
if gen.job_moved_to:
|
||||
parts.append(f"- Job moved to: {gen.job_moved_to}")
|
||||
|
||||
return PipelineResult(
|
||||
ok=True,
|
||||
step="generate",
|
||||
generate=gen,
|
||||
summary="\n".join(parts),
|
||||
job_file=gen.job_moved_to or job_file,
|
||||
)
|
||||
|
||||
|
||||
def run_cora_backlinks(
|
||||
*,
|
||||
xlsx_path: str,
|
||||
project_name: str,
|
||||
money_site_url: str,
|
||||
deps: Deps,
|
||||
branded_plus_ratio: float | None = None,
|
||||
custom_anchors: str = "",
|
||||
cli_flags: str = "",
|
||||
on_progress: ProgressFn | None = None,
|
||||
) -> PipelineResult:
|
||||
"""Full Cora Backlinks pipeline: ingest-cora -> generate-batch.
|
||||
|
||||
Requires `money_site_url` (IMSURL) -- BLM cannot run interactively in
|
||||
subprocess mode, so the URL must be supplied up front.
|
||||
|
||||
`on_progress` is an optional callback invoked with free-form status
|
||||
strings ("Step 1/2: ..." etc.); pass one if you want live updates
|
||||
streamed to a UI, log, or task comment as the pipeline runs. The
|
||||
same strings are also captured in the returned `log_lines`.
|
||||
|
||||
On failure, the returned `PipelineResult` has `ok=False`, `step` set
|
||||
to where it stopped ("ingest" or "generate"), and `error` populated.
|
||||
On success, `step == "complete"` and both `ingest` and `generate` are
|
||||
populated.
|
||||
"""
|
||||
if not xlsx_path:
|
||||
return _err("ingest", "xlsx_path is required")
|
||||
if not project_name:
|
||||
return _err("ingest", "project_name is required")
|
||||
if not money_site_url:
|
||||
return _err(
|
||||
"ingest",
|
||||
"money_site_url (IMSURL) is required; BLM runs non-interactively",
|
||||
)
|
||||
|
||||
log_lines: list[str] = []
|
||||
|
||||
def _progress(msg: str) -> None:
|
||||
log_lines.append(msg)
|
||||
log.info("[LB Pipeline] %s", msg)
|
||||
if on_progress is not None:
|
||||
try:
|
||||
on_progress(msg)
|
||||
except Exception:
|
||||
log.exception("on_progress callback raised; continuing")
|
||||
|
||||
_progress(f"Step 1/2: Ingesting CORA report for {project_name}...")
|
||||
|
||||
ingest = blm_ingest_cora(
|
||||
xlsx_path=xlsx_path,
|
||||
project_name=project_name,
|
||||
deps=deps,
|
||||
money_site_url=money_site_url,
|
||||
branded_plus_ratio=branded_plus_ratio,
|
||||
custom_anchors=custom_anchors,
|
||||
cli_flags=cli_flags,
|
||||
)
|
||||
if not ingest.ok:
|
||||
ingest.log_lines = log_lines
|
||||
return ingest
|
||||
|
||||
assert ingest.ingest is not None
|
||||
ing = ingest.ingest
|
||||
|
||||
_progress(f"Step 2/2: Generating content batch for {project_name}...")
|
||||
|
||||
gen = blm_generate_batch(
|
||||
job_file=ing.job_file,
|
||||
deps=deps,
|
||||
continue_on_error=True,
|
||||
)
|
||||
if not gen.ok:
|
||||
return PipelineResult(
|
||||
ok=False,
|
||||
step="generate",
|
||||
ingest=ing,
|
||||
generate=gen.generate,
|
||||
error=gen.error,
|
||||
summary=ingest.summary + "\n\n" + gen.summary,
|
||||
project_name=project_name,
|
||||
job_file=ing.job_file,
|
||||
log_lines=log_lines,
|
||||
)
|
||||
|
||||
assert gen.generate is not None
|
||||
g = gen.generate
|
||||
|
||||
summary = (
|
||||
f"## Step 1: Ingest CORA Report\n"
|
||||
f"- Project: {project_name} (ID: {ing.project_id})\n"
|
||||
f"- Keyword: {ing.main_keyword}\n"
|
||||
f"- Job file: {ing.job_file}\n"
|
||||
f"\n"
|
||||
f"## Step 2: Generate Content Batch\n"
|
||||
f"- Status: {'Success' if g.success else 'Completed'}\n"
|
||||
+ (f"- Job moved to: {g.job_moved_to}\n" if g.job_moved_to else "")
|
||||
)
|
||||
|
||||
return PipelineResult(
|
||||
ok=True,
|
||||
step="complete",
|
||||
ingest=ing,
|
||||
generate=g,
|
||||
summary=summary,
|
||||
project_name=project_name,
|
||||
job_file=g.job_moved_to or ing.job_file,
|
||||
log_lines=log_lines,
|
||||
)
|
||||
|
|
@ -0,0 +1,80 @@
|
|||
"""Shared test fixtures."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from link_building_workflow import BLMConfig, Deps, LLMCheck
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def blm_dir(tmp_path: Path) -> Path:
|
||||
"""A fake BLM directory on disk so run_blm_command's existence check passes."""
|
||||
d = tmp_path / "blm"
|
||||
d.mkdir()
|
||||
# Touch main.py so any accidental real subprocess call gets further; we
|
||||
# still mock subprocess.run in tests, but this is a harmless safety net.
|
||||
(d / "main.py").write_text("# fake\n")
|
||||
return d
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def blm_config(blm_dir: Path) -> BLMConfig:
|
||||
return BLMConfig(
|
||||
blm_dir=str(blm_dir),
|
||||
username="testuser",
|
||||
password="testpass",
|
||||
timeout_seconds=300,
|
||||
python_exe="python",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def llm_never() -> LLMCheck:
|
||||
"""LLM check that always returns False (fast-path only matches)."""
|
||||
return lambda a, b: False
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def llm_always() -> LLMCheck:
|
||||
"""LLM check that always returns True (treat everything as plural-equiv)."""
|
||||
return lambda a, b: True
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def deps(blm_config: BLMConfig, llm_never) -> Deps:
|
||||
return Deps(blm=blm_config, llm_check=llm_never)
|
||||
|
||||
|
||||
# Canonical ingest stdout, matches the BLM output format the parser is tuned for
|
||||
@pytest.fixture()
|
||||
def ingest_success_stdout() -> str:
|
||||
return (
|
||||
"Authenticated as: testuser (User)\n"
|
||||
"\n"
|
||||
"Parsing CORA file: /tmp/test.xlsx\n"
|
||||
"Main Keyword: precision cnc machining\n"
|
||||
"Word Count: 1500\n"
|
||||
"\n"
|
||||
"Creating project: Test Project\n"
|
||||
"Money Site URL: https://example.com\n"
|
||||
"\n"
|
||||
"Success: Project 'Test Project' created (ID: 42)\n"
|
||||
"Main Keyword: precision cnc machining\n"
|
||||
"Money Site URL: https://example.com\n"
|
||||
"Job file created: jobs/test-project.json\n"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def generate_success_stdout() -> str:
|
||||
return (
|
||||
"Loading job file: jobs/test-project.json\n"
|
||||
"Generating backlink 1 of 10...\n"
|
||||
"Generating backlink 2 of 10...\n"
|
||||
"...\n"
|
||||
"All backlinks generated.\n"
|
||||
"Job file moved to: jobs/done/test-project.json\n"
|
||||
)
|
||||
|
|
@ -0,0 +1,220 @@
|
|||
"""Tests for the BLM CLI subprocess wrapper and output parsers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from link_building_workflow import BLMConfig
|
||||
from link_building_workflow.blm import (
|
||||
build_ingest_args,
|
||||
parse_generate_output,
|
||||
parse_ingest_output,
|
||||
run_blm_command,
|
||||
)
|
||||
|
||||
|
||||
class TestBuildIngestArgs:
|
||||
def test_required_args_only(self):
|
||||
args = build_ingest_args(xlsx_path="/tmp/f.xlsx", project_name="P")
|
||||
assert args == ["ingest-cora", "-f", "/tmp/f.xlsx", "-n", "P"]
|
||||
|
||||
def test_with_money_site_url(self):
|
||||
args = build_ingest_args(
|
||||
xlsx_path="/tmp/f.xlsx",
|
||||
project_name="P",
|
||||
money_site_url="https://example.com",
|
||||
)
|
||||
assert "-m" in args
|
||||
i = args.index("-m")
|
||||
assert args[i + 1] == "https://example.com"
|
||||
|
||||
def test_branded_plus_ratio_default_omitted(self):
|
||||
args = build_ingest_args(
|
||||
xlsx_path="/tmp/f.xlsx", project_name="P", branded_plus_ratio=0.7
|
||||
)
|
||||
assert "-bp" not in args
|
||||
|
||||
def test_branded_plus_ratio_custom_included(self):
|
||||
args = build_ingest_args(
|
||||
xlsx_path="/tmp/f.xlsx", project_name="P", branded_plus_ratio=0.8
|
||||
)
|
||||
assert "-bp" in args
|
||||
assert args[args.index("-bp") + 1] == "0.8"
|
||||
|
||||
def test_custom_anchors(self):
|
||||
args = build_ingest_args(
|
||||
xlsx_path="/tmp/f.xlsx",
|
||||
project_name="P",
|
||||
custom_anchors="a1,a2",
|
||||
)
|
||||
assert "-a" in args
|
||||
assert args[args.index("-a") + 1] == "a1,a2"
|
||||
|
||||
def test_cli_flags_split_on_whitespace(self):
|
||||
args = build_ingest_args(
|
||||
xlsx_path="/tmp/f.xlsx",
|
||||
project_name="P",
|
||||
cli_flags="--foo --bar baz",
|
||||
)
|
||||
assert "--foo" in args
|
||||
assert "--bar" in args
|
||||
assert "baz" in args
|
||||
|
||||
def test_cli_flags_empty_string_no_extra_args(self):
|
||||
args = build_ingest_args(
|
||||
xlsx_path="/tmp/f.xlsx", project_name="P", cli_flags=""
|
||||
)
|
||||
assert args == ["ingest-cora", "-f", "/tmp/f.xlsx", "-n", "P"]
|
||||
|
||||
|
||||
class TestParseIngestOutput:
|
||||
def test_full_success(self, ingest_success_stdout):
|
||||
result = parse_ingest_output(ingest_success_stdout)
|
||||
assert result.project_id == "42"
|
||||
assert result.project_name == "Test Project"
|
||||
assert result.main_keyword == "precision cnc machining"
|
||||
assert result.job_file == "jobs/test-project.json"
|
||||
assert result.success is True
|
||||
|
||||
def test_missing_project_line(self):
|
||||
stdout = "Job file created: jobs/x.json\n"
|
||||
result = parse_ingest_output(stdout)
|
||||
assert result.project_id == ""
|
||||
assert result.project_name == ""
|
||||
assert result.success is False # no project_id
|
||||
|
||||
def test_missing_job_line(self):
|
||||
stdout = "Success: Project 'X' created (ID: 1)\n"
|
||||
result = parse_ingest_output(stdout)
|
||||
assert result.project_id == "1"
|
||||
assert result.job_file == ""
|
||||
assert result.success is False # no job_file
|
||||
|
||||
def test_empty_stdout(self):
|
||||
result = parse_ingest_output("")
|
||||
assert result.project_id == ""
|
||||
assert result.job_file == ""
|
||||
assert result.success is False
|
||||
|
||||
def test_ignores_noise(self):
|
||||
stdout = (
|
||||
"Some random banner\n"
|
||||
"DEBUG: lots of stuff\n"
|
||||
"Success: Project 'Foo Bar' created (ID: 99)\n"
|
||||
"WARNING: meaningless\n"
|
||||
"Main Keyword: foo bar\n"
|
||||
"Job file created: jobs/foo-bar.json\n"
|
||||
"Done.\n"
|
||||
)
|
||||
result = parse_ingest_output(stdout)
|
||||
assert result.project_id == "99"
|
||||
assert result.project_name == "Foo Bar"
|
||||
assert result.main_keyword == "foo bar"
|
||||
assert result.job_file == "jobs/foo-bar.json"
|
||||
|
||||
def test_whitespace_around_job_file(self):
|
||||
stdout = "Job file created: jobs/x.json \n"
|
||||
result = parse_ingest_output(stdout)
|
||||
assert result.job_file == "jobs/x.json"
|
||||
|
||||
|
||||
class TestParseGenerateOutput:
|
||||
def test_success_with_move(self, generate_success_stdout):
|
||||
result = parse_generate_output(generate_success_stdout)
|
||||
assert result.success is True
|
||||
assert result.job_moved_to == "jobs/done/test-project.json"
|
||||
assert "Job file moved to" in result.raw_output
|
||||
|
||||
def test_no_move_line(self):
|
||||
stdout = "Generating backlinks...\nSome error occurred.\n"
|
||||
result = parse_generate_output(stdout)
|
||||
assert result.success is False
|
||||
assert result.job_moved_to == ""
|
||||
assert result.raw_output == stdout
|
||||
|
||||
def test_empty_stdout(self):
|
||||
result = parse_generate_output("")
|
||||
assert result.success is False
|
||||
assert result.job_moved_to == ""
|
||||
|
||||
|
||||
class TestRunBlmCommand:
|
||||
def test_passes_cwd_and_interpreter(self, blm_config: BLMConfig):
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora", "-f", "x.xlsx"], blm_config)
|
||||
call = mock_run.call_args
|
||||
cmd = call[0][0]
|
||||
assert cmd[0] == "python"
|
||||
assert cmd[1] == "main.py"
|
||||
assert "ingest-cora" in cmd
|
||||
assert call[1]["cwd"] == blm_config.blm_dir
|
||||
|
||||
def test_injects_credentials(self, blm_config: BLMConfig):
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora"], blm_config)
|
||||
cmd = mock_run.call_args[0][0]
|
||||
assert "-u" in cmd
|
||||
assert cmd[cmd.index("-u") + 1] == "testuser"
|
||||
assert "-p" in cmd
|
||||
assert cmd[cmd.index("-p") + 1] == "testpass"
|
||||
|
||||
def test_does_not_duplicate_user_flag(self, blm_config: BLMConfig):
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora", "-u", "other"], blm_config)
|
||||
cmd = mock_run.call_args[0][0]
|
||||
# -u should appear once, with the caller's value preserved
|
||||
assert cmd.count("-u") == 1
|
||||
assert cmd[cmd.index("-u") + 1] == "other"
|
||||
|
||||
def test_does_not_duplicate_password_flag(self, blm_config: BLMConfig):
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora", "-p", "otherpw"], blm_config)
|
||||
cmd = mock_run.call_args[0][0]
|
||||
assert cmd.count("-p") == 1
|
||||
assert cmd[cmd.index("-p") + 1] == "otherpw"
|
||||
|
||||
def test_skips_credentials_when_not_configured(self, tmp_path: Path):
|
||||
blm_dir = tmp_path / "blm"
|
||||
blm_dir.mkdir()
|
||||
config = BLMConfig(blm_dir=str(blm_dir)) # no user/pass
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora"], config)
|
||||
cmd = mock_run.call_args[0][0]
|
||||
assert "-u" not in cmd
|
||||
assert "-p" not in cmd
|
||||
|
||||
def test_raises_on_missing_blm_dir(self, tmp_path: Path):
|
||||
config = BLMConfig(blm_dir=str(tmp_path / "nope"))
|
||||
with pytest.raises(FileNotFoundError):
|
||||
run_blm_command(["ingest-cora"], config)
|
||||
|
||||
def test_passes_timeout(self, blm_config: BLMConfig):
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora"], blm_config)
|
||||
assert mock_run.call_args[1]["timeout"] == blm_config.timeout_seconds
|
||||
|
||||
def test_propagates_timeout_expired(self, blm_config: BLMConfig):
|
||||
with patch(
|
||||
"subprocess.run",
|
||||
side_effect=subprocess.TimeoutExpired(cmd="python", timeout=300),
|
||||
), pytest.raises(subprocess.TimeoutExpired):
|
||||
run_blm_command(["ingest-cora"], blm_config)
|
||||
|
||||
def test_custom_python_exe(self, tmp_path: Path):
|
||||
blm_dir = tmp_path / "blm"
|
||||
blm_dir.mkdir()
|
||||
config = BLMConfig(blm_dir=str(blm_dir), python_exe="/opt/venv/bin/python")
|
||||
mock_result = MagicMock(returncode=0, stdout="", stderr="")
|
||||
with patch("subprocess.run", return_value=mock_result) as mock_run:
|
||||
run_blm_command(["ingest-cora"], config)
|
||||
assert mock_run.call_args[0][0][0] == "/opt/venv/bin/python"
|
||||
|
|
@ -0,0 +1,170 @@
|
|||
"""Tests for inbox folder scanning and keyword-based file lookup."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from link_building_workflow.inbox import (
|
||||
find_all_xlsx_for_keyword,
|
||||
find_xlsx_for_keyword,
|
||||
list_inbox_xlsx,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def inbox(tmp_path: Path) -> Path:
|
||||
"""An empty inbox folder."""
|
||||
d = tmp_path / "cora-inbox"
|
||||
d.mkdir()
|
||||
return d
|
||||
|
||||
|
||||
def _touch(folder: Path, name: str) -> Path:
|
||||
p = folder / name
|
||||
p.write_bytes(b"fake xlsx")
|
||||
return p
|
||||
|
||||
|
||||
class TestListInboxXlsx:
|
||||
def test_missing_folder_returns_empty(self, tmp_path: Path):
|
||||
assert list_inbox_xlsx(tmp_path / "does-not-exist") == []
|
||||
|
||||
def test_empty_folder(self, inbox: Path):
|
||||
assert list_inbox_xlsx(inbox) == []
|
||||
|
||||
def test_lists_xlsx_only(self, inbox: Path):
|
||||
_touch(inbox, "a.xlsx")
|
||||
_touch(inbox, "readme.txt")
|
||||
_touch(inbox, "b.xlsx")
|
||||
result = list_inbox_xlsx(inbox)
|
||||
names = [p.name for p in result]
|
||||
assert names == ["a.xlsx", "b.xlsx"]
|
||||
|
||||
def test_skips_office_lock_files(self, inbox: Path):
|
||||
_touch(inbox, "real.xlsx")
|
||||
_touch(inbox, "~$real.xlsx")
|
||||
result = list_inbox_xlsx(inbox)
|
||||
assert [p.name for p in result] == ["real.xlsx"]
|
||||
|
||||
def test_skips_processed_by_default(self, inbox: Path):
|
||||
_touch(inbox, "new.xlsx")
|
||||
processed = inbox / "processed"
|
||||
processed.mkdir()
|
||||
_touch(processed, "old.xlsx")
|
||||
# Also duplicate the name in root to prove it gets filtered
|
||||
_touch(inbox, "old.xlsx")
|
||||
|
||||
result = list_inbox_xlsx(inbox)
|
||||
assert [p.name for p in result] == ["new.xlsx"]
|
||||
|
||||
def test_skip_processed_disabled(self, inbox: Path):
|
||||
_touch(inbox, "new.xlsx")
|
||||
processed = inbox / "processed"
|
||||
processed.mkdir()
|
||||
_touch(inbox, "old.xlsx") # same name as one we "processed"
|
||||
|
||||
result = list_inbox_xlsx(inbox, skip_processed=False)
|
||||
assert sorted(p.name for p in result) == ["new.xlsx", "old.xlsx"]
|
||||
|
||||
def test_sorted_output(self, inbox: Path):
|
||||
_touch(inbox, "c.xlsx")
|
||||
_touch(inbox, "a.xlsx")
|
||||
_touch(inbox, "b.xlsx")
|
||||
result = list_inbox_xlsx(inbox)
|
||||
assert [p.name for p in result] == ["a.xlsx", "b.xlsx", "c.xlsx"]
|
||||
|
||||
|
||||
class TestFindXlsxForKeyword:
|
||||
def test_exact_match(self, inbox: Path, llm_never):
|
||||
_touch(inbox, "precision-cnc-machining.xlsx")
|
||||
match = find_xlsx_for_keyword(inbox, "precision cnc machining", llm_never)
|
||||
assert match is not None
|
||||
assert match.filename == "precision-cnc-machining.xlsx"
|
||||
assert match.stem_keyword == "precision cnc machining"
|
||||
|
||||
def test_no_match(self, inbox: Path, llm_never):
|
||||
_touch(inbox, "other-keyword.xlsx")
|
||||
match = find_xlsx_for_keyword(inbox, "cnc machining", llm_never)
|
||||
assert match is None
|
||||
|
||||
def test_missing_folder(self, tmp_path: Path, llm_never):
|
||||
match = find_xlsx_for_keyword(
|
||||
tmp_path / "no-such-dir", "cnc machining", llm_never
|
||||
)
|
||||
assert match is None
|
||||
|
||||
def test_empty_keyword(self, inbox: Path, llm_never):
|
||||
_touch(inbox, "anything.xlsx")
|
||||
match = find_xlsx_for_keyword(inbox, "", llm_never)
|
||||
assert match is None
|
||||
|
||||
def test_keyword_with_hyphens(self, inbox: Path, llm_never):
|
||||
# Caller may pass the keyword in hyphenated form; should still match
|
||||
_touch(inbox, "precision-cnc-machining.xlsx")
|
||||
match = find_xlsx_for_keyword(
|
||||
inbox, "precision-cnc-machining", llm_never
|
||||
)
|
||||
assert match is not None
|
||||
|
||||
def test_keyword_case_insensitive(self, inbox: Path, llm_never):
|
||||
_touch(inbox, "cnc-machining.xlsx")
|
||||
match = find_xlsx_for_keyword(inbox, "CNC Machining", llm_never)
|
||||
assert match is not None
|
||||
|
||||
def test_plural_match_via_llm(self, inbox: Path):
|
||||
_touch(inbox, "cnc-shafts.xlsx")
|
||||
|
||||
def only_plural_of_shaft(a: str, b: str) -> bool:
|
||||
return {a, b} == {"cnc shaft", "cnc shafts"}
|
||||
|
||||
# Singular keyword should match the plural filename via LLM
|
||||
match = find_xlsx_for_keyword(inbox, "cnc shaft", only_plural_of_shaft)
|
||||
assert match is not None
|
||||
assert match.filename == "cnc-shafts.xlsx"
|
||||
|
||||
def test_first_match_returned(self, inbox: Path, llm_never):
|
||||
# Two xlsx files both match; sorted order picks "a..." first
|
||||
_touch(inbox, "b-cnc-machining.xlsx")
|
||||
_touch(inbox, "a-cnc-machining.xlsx")
|
||||
# These don't fuzzy match the keyword "cnc machining" because of
|
||||
# the a-/b- prefix. So use a real collision:
|
||||
_touch(inbox, "cnc-machining.xlsx")
|
||||
match = find_xlsx_for_keyword(inbox, "cnc machining", llm_never)
|
||||
assert match is not None
|
||||
assert match.filename == "cnc-machining.xlsx"
|
||||
|
||||
def test_processed_files_ignored(self, inbox: Path, llm_never):
|
||||
processed = inbox / "processed"
|
||||
processed.mkdir()
|
||||
_touch(processed, "cnc-machining.xlsx")
|
||||
_touch(inbox, "cnc-machining.xlsx")
|
||||
# Inbox file with same name as processed one is also skipped by
|
||||
# list_inbox_xlsx, so no match available
|
||||
match = find_xlsx_for_keyword(inbox, "cnc machining", llm_never)
|
||||
assert match is None
|
||||
|
||||
|
||||
class TestFindAllXlsxForKeyword:
|
||||
def test_returns_all_matches(self, inbox: Path):
|
||||
_touch(inbox, "cnc-shaft.xlsx")
|
||||
_touch(inbox, "cnc-shafts.xlsx")
|
||||
_touch(inbox, "unrelated.xlsx")
|
||||
|
||||
def plural_ok(a: str, b: str) -> bool:
|
||||
return {a, b} == {"cnc shaft", "cnc shafts"}
|
||||
|
||||
results = find_all_xlsx_for_keyword(inbox, "cnc shaft", plural_ok)
|
||||
names = sorted(r.filename for r in results)
|
||||
assert names == ["cnc-shaft.xlsx", "cnc-shafts.xlsx"]
|
||||
|
||||
def test_empty_when_no_matches(self, inbox: Path, llm_never):
|
||||
_touch(inbox, "unrelated.xlsx")
|
||||
results = find_all_xlsx_for_keyword(inbox, "cnc shaft", llm_never)
|
||||
assert results == []
|
||||
|
||||
def test_empty_keyword_returns_empty(self, inbox: Path, llm_never):
|
||||
_touch(inbox, "anything.xlsx")
|
||||
results = find_all_xlsx_for_keyword(inbox, "", llm_never)
|
||||
assert results == []
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
"""Tests for keyword normalization and fuzzy matching."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from link_building_workflow.matching import (
|
||||
filename_stem_to_keyword,
|
||||
fuzzy_keyword_match,
|
||||
normalize_for_match,
|
||||
)
|
||||
|
||||
|
||||
class TestNormalizeForMatch:
|
||||
def test_lowercases(self):
|
||||
assert normalize_for_match("Hello World") == "hello world"
|
||||
|
||||
def test_strips_punctuation(self):
|
||||
assert normalize_for_match("hello, world!") == "hello world"
|
||||
|
||||
def test_collapses_whitespace(self):
|
||||
assert normalize_for_match("hello world\n\ttest") == "hello world test"
|
||||
|
||||
def test_empty_string(self):
|
||||
assert normalize_for_match("") == ""
|
||||
|
||||
def test_only_punctuation(self):
|
||||
assert normalize_for_match("!!!...,,,") == ""
|
||||
|
||||
def test_numbers_preserved(self):
|
||||
assert normalize_for_match("5-axis cnc") == "5 axis cnc"
|
||||
|
||||
def test_leading_trailing_whitespace(self):
|
||||
assert normalize_for_match(" hello world ") == "hello world"
|
||||
|
||||
|
||||
class TestFuzzyKeywordMatch:
|
||||
def test_exact_match(self, llm_never):
|
||||
assert fuzzy_keyword_match("cnc machining", "cnc machining", llm_never) is True
|
||||
|
||||
def test_different_no_llm(self):
|
||||
assert fuzzy_keyword_match("cnc", "cnc machining") is False
|
||||
|
||||
def test_different_llm_says_no(self, llm_never):
|
||||
assert fuzzy_keyword_match("cnc", "milling", llm_never) is False
|
||||
|
||||
def test_different_llm_says_yes(self, llm_always):
|
||||
# LLM callable gets to decide when exact match fails
|
||||
assert fuzzy_keyword_match("shaft", "shafts", llm_always) is True
|
||||
|
||||
def test_empty_a(self, llm_always):
|
||||
assert fuzzy_keyword_match("", "cnc", llm_always) is False
|
||||
|
||||
def test_empty_b(self, llm_always):
|
||||
assert fuzzy_keyword_match("cnc", "", llm_always) is False
|
||||
|
||||
def test_both_empty(self, llm_always):
|
||||
# Even with llm_always, empty inputs short-circuit to False
|
||||
assert fuzzy_keyword_match("", "", llm_always) is False
|
||||
|
||||
def test_no_llm_check_fast_path_hit(self):
|
||||
# When no llm_check provided, exact matches still work
|
||||
assert fuzzy_keyword_match("same", "same") is True
|
||||
|
||||
def test_no_llm_check_fast_path_miss(self):
|
||||
# When no llm_check and not exact, returns False
|
||||
assert fuzzy_keyword_match("same", "different") is False
|
||||
|
||||
def test_llm_check_only_called_when_needed(self):
|
||||
calls = []
|
||||
|
||||
def tracking_llm(a, b):
|
||||
calls.append((a, b))
|
||||
return True
|
||||
|
||||
# Exact match: LLM should not be called
|
||||
fuzzy_keyword_match("cnc", "cnc", tracking_llm)
|
||||
assert calls == []
|
||||
|
||||
# Different: LLM should be called once
|
||||
fuzzy_keyword_match("shaft", "shafts", tracking_llm)
|
||||
assert calls == [("shaft", "shafts")]
|
||||
|
||||
|
||||
class TestFilenameStemToKeyword:
|
||||
def test_hyphens_to_spaces(self):
|
||||
assert filename_stem_to_keyword("precision-cnc-machining") == "precision cnc machining"
|
||||
|
||||
def test_underscores_to_spaces(self):
|
||||
assert filename_stem_to_keyword("precision_cnc_machining") == "precision cnc machining"
|
||||
|
||||
def test_mixed_separators(self):
|
||||
assert filename_stem_to_keyword("precision-cnc_machining") == "precision cnc machining"
|
||||
|
||||
def test_uppercase(self):
|
||||
assert filename_stem_to_keyword("CNC-Machining") == "cnc machining"
|
||||
|
||||
def test_empty(self):
|
||||
assert filename_stem_to_keyword("") == ""
|
||||
|
|
@ -0,0 +1,460 @@
|
|||
"""Tests for the three pipeline entry points.
|
||||
|
||||
BLM subprocess calls are mocked via `link_building_workflow.blm.run_blm_command`.
|
||||
The pipeline module imports blm as `blm_mod` and calls `blm_mod.run_blm_command(...)`,
|
||||
so we patch there.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from link_building_workflow import (
|
||||
BLMConfig,
|
||||
Deps,
|
||||
blm_generate_batch,
|
||||
blm_ingest_cora,
|
||||
run_cora_backlinks,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def xlsx_file(tmp_path: Path) -> Path:
|
||||
p = tmp_path / "precision-cnc-machining.xlsx"
|
||||
p.write_bytes(b"fake xlsx")
|
||||
return p
|
||||
|
||||
|
||||
def _mock_proc(stdout: str = "", stderr: str = "", returncode: int = 0) -> MagicMock:
|
||||
m = MagicMock()
|
||||
m.stdout = stdout
|
||||
m.stderr = stderr
|
||||
m.returncode = returncode
|
||||
return m
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# blm_ingest_cora
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBlmIngestCora:
|
||||
def test_missing_xlsx_path(self, deps: Deps):
|
||||
result = blm_ingest_cora(xlsx_path="", project_name="P", deps=deps)
|
||||
assert result.ok is False
|
||||
assert "xlsx_path is required" in result.error
|
||||
|
||||
def test_missing_project_name(self, deps: Deps, xlsx_file: Path):
|
||||
result = blm_ingest_cora(
|
||||
xlsx_path=str(xlsx_file), project_name="", deps=deps
|
||||
)
|
||||
assert result.ok is False
|
||||
assert "project_name is required" in result.error
|
||||
|
||||
def test_xlsx_not_found(self, deps: Deps):
|
||||
result = blm_ingest_cora(
|
||||
xlsx_path="/nope/missing.xlsx", project_name="P", deps=deps
|
||||
)
|
||||
assert result.ok is False
|
||||
assert "not found" in result.error
|
||||
|
||||
def test_success(
|
||||
self, deps: Deps, xlsx_file: Path, ingest_success_stdout: str
|
||||
):
|
||||
proc = _mock_proc(stdout=ingest_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
):
|
||||
result = blm_ingest_cora(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="Test Project",
|
||||
deps=deps,
|
||||
)
|
||||
assert result.ok is True
|
||||
assert result.step == "ingest"
|
||||
assert result.ingest is not None
|
||||
assert result.ingest.project_id == "42"
|
||||
assert result.ingest.job_file == "jobs/test-project.json"
|
||||
assert result.job_file == "jobs/test-project.json"
|
||||
assert result.project_name == "Test Project"
|
||||
assert "CORA ingest complete" in result.summary
|
||||
|
||||
def test_nonzero_exit_reports_failure(
|
||||
self, deps: Deps, xlsx_file: Path
|
||||
):
|
||||
proc = _mock_proc(stdout="", stderr="boom", returncode=1)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
):
|
||||
result = blm_ingest_cora(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="P",
|
||||
deps=deps,
|
||||
)
|
||||
assert result.ok is False
|
||||
assert "exit=1" in result.error
|
||||
assert "boom" in result.error
|
||||
|
||||
def test_timeout(self, deps: Deps, xlsx_file: Path):
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command",
|
||||
side_effect=subprocess.TimeoutExpired(cmd="python", timeout=300),
|
||||
):
|
||||
result = blm_ingest_cora(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="P",
|
||||
deps=deps,
|
||||
)
|
||||
assert result.ok is False
|
||||
assert "timed out" in result.error
|
||||
|
||||
def test_uses_config_default_branded_plus_ratio(
|
||||
self, deps: Deps, xlsx_file: Path, ingest_success_stdout: str
|
||||
):
|
||||
# Caller passes None, so Deps default (0.7) should be used
|
||||
proc = _mock_proc(stdout=ingest_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
) as mock_run:
|
||||
blm_ingest_cora(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="P",
|
||||
deps=deps,
|
||||
branded_plus_ratio=None,
|
||||
)
|
||||
args = mock_run.call_args[0][0]
|
||||
# 0.7 is the default, so -bp should NOT appear in args
|
||||
assert "-bp" not in args
|
||||
|
||||
def test_caller_override_branded_plus_ratio(
|
||||
self, deps: Deps, xlsx_file: Path, ingest_success_stdout: str
|
||||
):
|
||||
proc = _mock_proc(stdout=ingest_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
) as mock_run:
|
||||
blm_ingest_cora(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="P",
|
||||
deps=deps,
|
||||
branded_plus_ratio=0.85,
|
||||
)
|
||||
args = mock_run.call_args[0][0]
|
||||
assert "-bp" in args
|
||||
assert args[args.index("-bp") + 1] == "0.85"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# blm_generate_batch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBlmGenerateBatch:
|
||||
def test_missing_job_file_arg(self, deps: Deps):
|
||||
result = blm_generate_batch(job_file="", deps=deps)
|
||||
assert result.ok is False
|
||||
assert "job_file is required" in result.error
|
||||
|
||||
def test_job_file_does_not_exist(self, deps: Deps):
|
||||
result = blm_generate_batch(
|
||||
job_file="/definitely/not/here.json", deps=deps
|
||||
)
|
||||
assert result.ok is False
|
||||
assert "not found" in result.error
|
||||
|
||||
def test_relative_path_resolved_against_blm_dir(
|
||||
self, deps: Deps, generate_success_stdout: str
|
||||
):
|
||||
# Create a relative job file under the fake BLM dir
|
||||
job_rel = "jobs/x.json"
|
||||
(Path(deps.blm.blm_dir) / "jobs").mkdir()
|
||||
(Path(deps.blm.blm_dir) / job_rel).write_text("{}")
|
||||
|
||||
proc = _mock_proc(stdout=generate_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
) as mock_run:
|
||||
result = blm_generate_batch(job_file=job_rel, deps=deps)
|
||||
assert result.ok is True
|
||||
|
||||
# The resolved absolute path should have been passed to BLM
|
||||
args = mock_run.call_args[0][0]
|
||||
j_index = args.index("-j")
|
||||
passed_path = args[j_index + 1]
|
||||
assert passed_path.endswith("x.json")
|
||||
assert Path(passed_path).is_absolute()
|
||||
|
||||
def test_continue_on_error_flag_default(
|
||||
self, deps: Deps, tmp_path: Path, generate_success_stdout: str
|
||||
):
|
||||
job = tmp_path / "job.json"
|
||||
job.write_text("{}")
|
||||
|
||||
proc = _mock_proc(stdout=generate_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
) as mock_run:
|
||||
blm_generate_batch(job_file=str(job), deps=deps)
|
||||
args = mock_run.call_args[0][0]
|
||||
assert "--continue-on-error" in args
|
||||
|
||||
def test_continue_on_error_disabled(
|
||||
self, deps: Deps, tmp_path: Path, generate_success_stdout: str
|
||||
):
|
||||
job = tmp_path / "job.json"
|
||||
job.write_text("{}")
|
||||
|
||||
proc = _mock_proc(stdout=generate_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
) as mock_run:
|
||||
blm_generate_batch(
|
||||
job_file=str(job), deps=deps, continue_on_error=False
|
||||
)
|
||||
args = mock_run.call_args[0][0]
|
||||
assert "--continue-on-error" not in args
|
||||
|
||||
def test_debug_flag(self, deps: Deps, tmp_path: Path, generate_success_stdout: str):
|
||||
job = tmp_path / "job.json"
|
||||
job.write_text("{}")
|
||||
proc = _mock_proc(stdout=generate_success_stdout)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
) as mock_run:
|
||||
blm_generate_batch(job_file=str(job), deps=deps, debug=True)
|
||||
assert "--debug" in mock_run.call_args[0][0]
|
||||
|
||||
def test_nonzero_exit(self, deps: Deps, tmp_path: Path):
|
||||
job = tmp_path / "job.json"
|
||||
job.write_text("{}")
|
||||
|
||||
proc = _mock_proc(stdout="", stderr="fail", returncode=2)
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", return_value=proc
|
||||
):
|
||||
result = blm_generate_batch(job_file=str(job), deps=deps)
|
||||
assert result.ok is False
|
||||
assert "exit=2" in result.error
|
||||
|
||||
def test_timeout(self, deps: Deps, tmp_path: Path):
|
||||
job = tmp_path / "job.json"
|
||||
job.write_text("{}")
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command",
|
||||
side_effect=subprocess.TimeoutExpired(cmd="python", timeout=300),
|
||||
):
|
||||
result = blm_generate_batch(job_file=str(job), deps=deps)
|
||||
assert result.ok is False
|
||||
assert "timed out" in result.error
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_cora_backlinks (full pipeline)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRunCoraBacklinks:
|
||||
def test_missing_money_site_url(self, deps: Deps, xlsx_file: Path):
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="P",
|
||||
money_site_url="",
|
||||
deps=deps,
|
||||
)
|
||||
assert result.ok is False
|
||||
assert "IMSURL" in result.error
|
||||
|
||||
def test_full_success(
|
||||
self,
|
||||
deps: Deps,
|
||||
xlsx_file: Path,
|
||||
ingest_success_stdout: str,
|
||||
generate_success_stdout: str,
|
||||
):
|
||||
# ingest stdout must reference a job file that then exists on disk
|
||||
# for blm_generate_batch's existence check to pass.
|
||||
job_rel = "jobs/test-project.json"
|
||||
(Path(deps.blm.blm_dir) / "jobs").mkdir()
|
||||
(Path(deps.blm.blm_dir) / job_rel).write_text("{}")
|
||||
|
||||
procs = [
|
||||
_mock_proc(stdout=ingest_success_stdout),
|
||||
_mock_proc(stdout=generate_success_stdout),
|
||||
]
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", side_effect=procs
|
||||
) as mock_run:
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="Test Project",
|
||||
money_site_url="https://example.com",
|
||||
deps=deps,
|
||||
)
|
||||
|
||||
assert result.ok is True
|
||||
assert result.step == "complete"
|
||||
assert result.ingest is not None
|
||||
assert result.generate is not None
|
||||
assert result.ingest.project_id == "42"
|
||||
assert result.generate.job_moved_to == "jobs/done/test-project.json"
|
||||
assert result.job_file == "jobs/done/test-project.json"
|
||||
assert "Step 1" in result.summary and "Step 2" in result.summary
|
||||
|
||||
# BLM was invoked twice (ingest, generate)
|
||||
assert mock_run.call_count == 2
|
||||
ingest_args = mock_run.call_args_list[0][0][0]
|
||||
generate_args = mock_run.call_args_list[1][0][0]
|
||||
assert "ingest-cora" in ingest_args
|
||||
assert "generate-batch" in generate_args
|
||||
|
||||
def test_ingest_failure_skips_generate(
|
||||
self, deps: Deps, xlsx_file: Path
|
||||
):
|
||||
procs = [_mock_proc(stdout="", stderr="fail", returncode=1)]
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", side_effect=procs
|
||||
) as mock_run:
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="P",
|
||||
money_site_url="https://example.com",
|
||||
deps=deps,
|
||||
)
|
||||
assert result.ok is False
|
||||
assert result.step == "ingest"
|
||||
assert mock_run.call_count == 1 # generate not called
|
||||
|
||||
def test_generate_failure_preserves_ingest(
|
||||
self, deps: Deps, xlsx_file: Path, ingest_success_stdout: str
|
||||
):
|
||||
job_rel = "jobs/test-project.json"
|
||||
(Path(deps.blm.blm_dir) / "jobs").mkdir()
|
||||
(Path(deps.blm.blm_dir) / job_rel).write_text("{}")
|
||||
|
||||
procs = [
|
||||
_mock_proc(stdout=ingest_success_stdout),
|
||||
_mock_proc(stdout="", stderr="gen fail", returncode=3),
|
||||
]
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", side_effect=procs
|
||||
):
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="Test Project",
|
||||
money_site_url="https://example.com",
|
||||
deps=deps,
|
||||
)
|
||||
assert result.ok is False
|
||||
assert result.step == "generate"
|
||||
# Ingest succeeded; its data is still on the result
|
||||
assert result.ingest is not None
|
||||
assert result.ingest.project_id == "42"
|
||||
assert "gen fail" in result.error
|
||||
|
||||
def test_on_progress_callback_invoked(
|
||||
self,
|
||||
deps: Deps,
|
||||
xlsx_file: Path,
|
||||
ingest_success_stdout: str,
|
||||
generate_success_stdout: str,
|
||||
):
|
||||
job_rel = "jobs/test-project.json"
|
||||
(Path(deps.blm.blm_dir) / "jobs").mkdir()
|
||||
(Path(deps.blm.blm_dir) / job_rel).write_text("{}")
|
||||
|
||||
progress_calls: list[str] = []
|
||||
procs = [
|
||||
_mock_proc(stdout=ingest_success_stdout),
|
||||
_mock_proc(stdout=generate_success_stdout),
|
||||
]
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", side_effect=procs
|
||||
):
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="Test Project",
|
||||
money_site_url="https://example.com",
|
||||
deps=deps,
|
||||
on_progress=progress_calls.append,
|
||||
)
|
||||
assert result.ok is True
|
||||
assert len(progress_calls) >= 2
|
||||
assert any("Step 1" in m for m in progress_calls)
|
||||
assert any("Step 2" in m for m in progress_calls)
|
||||
# log_lines mirrors progress_calls
|
||||
assert result.log_lines == progress_calls
|
||||
|
||||
def test_on_progress_exception_does_not_break_pipeline(
|
||||
self,
|
||||
deps: Deps,
|
||||
xlsx_file: Path,
|
||||
ingest_success_stdout: str,
|
||||
generate_success_stdout: str,
|
||||
):
|
||||
job_rel = "jobs/test-project.json"
|
||||
(Path(deps.blm.blm_dir) / "jobs").mkdir()
|
||||
(Path(deps.blm.blm_dir) / job_rel).write_text("{}")
|
||||
|
||||
def broken(_msg: str) -> None:
|
||||
raise RuntimeError("progress callback failed")
|
||||
|
||||
procs = [
|
||||
_mock_proc(stdout=ingest_success_stdout),
|
||||
_mock_proc(stdout=generate_success_stdout),
|
||||
]
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", side_effect=procs
|
||||
):
|
||||
result = run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="Test Project",
|
||||
money_site_url="https://example.com",
|
||||
deps=deps,
|
||||
on_progress=broken,
|
||||
)
|
||||
# Pipeline still completed successfully despite broken callback
|
||||
assert result.ok is True
|
||||
|
||||
def test_uses_config_default_ratio_when_none(
|
||||
self,
|
||||
deps: Deps,
|
||||
xlsx_file: Path,
|
||||
ingest_success_stdout: str,
|
||||
generate_success_stdout: str,
|
||||
):
|
||||
# Verify the Deps-level default flows into build_ingest_args
|
||||
blm_cfg = BLMConfig(
|
||||
blm_dir=deps.blm.blm_dir,
|
||||
username=deps.blm.username,
|
||||
password=deps.blm.password,
|
||||
timeout_seconds=deps.blm.timeout_seconds,
|
||||
default_branded_plus_ratio=0.9, # non-default
|
||||
python_exe=deps.blm.python_exe,
|
||||
)
|
||||
new_deps = Deps(blm=blm_cfg, llm_check=deps.llm_check)
|
||||
|
||||
job_rel = "jobs/test-project.json"
|
||||
(Path(blm_cfg.blm_dir) / "jobs").mkdir()
|
||||
(Path(blm_cfg.blm_dir) / job_rel).write_text("{}")
|
||||
|
||||
procs = [
|
||||
_mock_proc(stdout=ingest_success_stdout),
|
||||
_mock_proc(stdout=generate_success_stdout),
|
||||
]
|
||||
with patch(
|
||||
"link_building_workflow.blm.run_blm_command", side_effect=procs
|
||||
) as mock_run:
|
||||
run_cora_backlinks(
|
||||
xlsx_path=str(xlsx_file),
|
||||
project_name="Test Project",
|
||||
money_site_url="https://example.com",
|
||||
deps=new_deps,
|
||||
branded_plus_ratio=None, # should pick up 0.9 default
|
||||
)
|
||||
ingest_args = mock_run.call_args_list[0][0][0]
|
||||
assert "-bp" in ingest_args
|
||||
assert ingest_args[ingest_args.index("-bp") + 1] == "0.9"
|
||||
Loading…
Reference in New Issue