Filter Cora distribution to eligible statuses and reduce poll interval to 40m

- Add _CORA_ELIGIBLE_STATUSES filter so only "running cora" and "error" tasks
  get matched during xlsx distribution (prevents accidental "to do" matches)
- Reduce watch_interval_minutes from 60 to 40 for faster Cora file pickup
- Add .txt output to test block generator
- Include docs, scripts, and reference files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
master
PeninsulaInd 2026-03-18 12:29:02 -05:00
parent ad7094e8a5
commit 5cb15756fc
21 changed files with 2560 additions and 1 deletions

View File

@ -0,0 +1,160 @@
# Brand Voice & Tone Guidelines
Reference for maintaining consistent voice across all written content. These are defaults — override with client-specific guidelines when available.
---
## Voice Archetypes
Start with Expert but also work in Guide when appliciable.
### Expert
- **Sounds like:** A senior practitioner sharing hard-won knowledge.
- **Characteristics:** Precise, evidence-backed, confident without arrogance. Cites data, references real-world experience, and isn't afraid to say "it depends."
- **Typical vocabulary:** "In practice," "the tradeoff is," "based on our benchmarks," "here's why this matters."
- **Risk to avoid:** Coming across as condescending or overly academic.
- **Best for:** Technical audiences, B2B SaaS, engineering blogs, whitepapers.
### Guide
- **Sounds like:** A patient teacher walking you through something step by step.
- **Characteristics:** Clear, encouraging, anticipates confusion. Breaks complex ideas into digestible pieces. Uses analogies.
- **Typical vocabulary:** "Let's start with," "think of it like," "the key thing to remember," "don't worry if this seems complex."
- **Risk to avoid:** Being patronizing or oversimplifying for an advanced audience.
- **Best for:** Tutorials, onboarding content, documentation, beginner-to-intermediate audiences.
---
## Core Writing Principles
These apply regardless of archetype.
### 1. Clarity First
- If a sentence can be misread, rewrite it.
- Use the simplest word that conveys the precise meaning. "Use" over "utilize." "Start" over "commence."
- One idea per paragraph. One purpose per section.
- Define jargon on first use, or skip it entirely.
### 2. Customer-Centric
- Frame everything from the reader's perspective, not the company's.
- **Instead of:** "We built a new feature that enables real-time collaboration."
- **Write:** "You can now edit documents with your team in real time."
- Lead with the reader's problem or goal, not the product or solution.
### 3. Active Voice
- Active voice is the default. Passive voice is acceptable only when the actor is unknown or irrelevant.
- **Active:** "The script generates a report every morning."
- **Passive (acceptable):** "The logs are rotated every 24 hours." (The actor doesn't matter.)
- **Passive (avoid):** "A decision was made to deprecate the endpoint." (Who decided?)
### 4. Show, Don't Claim
- Replace vague claims with specific evidence.
- **Claim:** "Our platform is incredibly fast."
- **Show:** "Queries return in under 50ms at the 99th percentile."
- If you can't provide evidence, soften the language or cut the sentence.
---
## Tone Attributes
Tone shifts based on content type and audience. Use these spectrums to calibrate.
### Formality Spectrum
```
Casual -------|-------|-------|-------|------- Formal
1 2 3 4 5
```
| Level | Description | Use When |
|-------|-------------|----------|
| 1 | Slang OK, sentence fragments, first person | Internal team comms, very informal blogs |
| 2 | Conversational, contractions, direct address | Newsletters, community posts, most blog content |
| 3 | Professional but approachable, minimal contractions | Product announcements, mid-funnel content |
| 4 | Polished, structured, no contractions | Whitepapers, enterprise case studies, executive briefs |
| 5 | Formal, third person, precise terminology | Legal, compliance, academic partnerships |
**Default for most blog/article content: Level 2-3.**
### Technical Depth Spectrum
```
General -------|-------|-------|-------|------- Deep Technical
1 2 3 4 5
```
| Level | Description | Use When |
|-------|-------------|----------|
| 1 | No jargon, analogy-heavy, conceptual | Non-technical stakeholders, general audience |
| 2 | Light jargon (defined inline), practical focus | Business audience with some domain familiarity |
| 3 | Industry-standard terminology, code snippets OK | Practitioners who do the work daily |
| 4 | Assumes working knowledge, implementation details | Developers, engineers, technical decision-makers |
| 5 | Deep internals, performance analysis, tradeoff math | Senior engineers, architects, researchers |
**Default: Match the audience. When unsure, aim at what you think the audience can handle. We are mostly B2B.**
---
## Language Preferences
### Use Action Verbs
Lead sentences — especially headings and CTAs — with strong verbs.
| Weak | Strong |
|------|--------|
| There is a way to improve | Improve |
| This section is a discussion of | This section covers |
| You should consider using | Use |
| It is important to note that | Note: |
| We are going to walk through | Let's walk through |
### Be Concrete and Specific
Vague language erodes trust. Replace generalities with specifics.
| Vague | Concrete |
|-------|----------|
| "significantly faster" | "3x faster" or "reduced from 12s to 2s" |
| "a large number of users" | "over 40,000 monthly active users" |
| "best-in-class" | describe the specific advantage |
| "seamless integration" | "connects via a single API call" |
| "in the near future" | "by Q2" or "in the next release" |
### Avoid These Patterns
- **Weasel words:** "very," "really," "extremely," "quite," "somewhat" — cut them or replace with data.
- **Nominalizations:** "implementation" when you mean "implement," "utilization" when you mean "use."
- **Hedge stacking:** "It might potentially be possible to perhaps consider..." — commit to a position or state the uncertainty once, clearly.
- **Buzzword chains:** "AI-powered next-gen synergistic platform" — describe what it actually does.
---
## Pre-Publication Checklist
Run through this before publishing any piece of content.
### Voice Consistency
- [ ] Does the piece sound like one person wrote it, beginning to end?
- [ ] Does it match the target voice archetype?
- [ ] Are there jarring shifts in tone between sections?
### Clarity
- [ ] Can a reader in the target audience understand every sentence on the first read?
- [ ] Is jargon defined or avoided?
- [ ] Are all acronyms expanded on first use?
- [ ] Do headings accurately describe the content beneath them?
- [ ] Is the article scannable? (subheadings every 2-4 paragraphs, short paragraphs, lists where appropriate)
### Value
- [ ] Does the introduction make clear what the reader will gain?
- [ ] Does every section earn its place? (Cut anything that doesn't serve the reader's goal.)
- [ ] Are claims supported by evidence, examples, or data?
- [ ] Is the advice actionable — can the reader do something with it today?
- [ ] Does the conclusion provide a clear next step?
### Formatting
- [ ] Title includes the core keyword or topic and at least 2 closely related keyword's/topics.
- [ ] Meta description summarizes the value proposition.
- [ ] Code blocks, tables, and images have context (a sentence before them explaining what the reader is looking at).
- [ ] Links use descriptive anchor text, not "click here."
- [ ] No walls of text — maximum 5 sentences per paragraph for web content. Use a minimum of 2 sentences.

View File

@ -433,6 +433,7 @@ def main():
md_path = out_dir / "test_block.md"
html_path = out_dir / "test_block.html"
txt_path = out_dir / "test_block.txt"
stats_path = out_dir / "test_block_stats.json"
md_content = format_markdown(result["sentences"])
@ -440,6 +441,7 @@ def main():
md_path.write_text(md_content, encoding="utf-8")
html_path.write_text(html_content, encoding="utf-8")
txt_path.write_text(html_content, encoding="utf-8")
stats_path.write_text(
json.dumps(result["stats"], indent=2, default=str), encoding="utf-8"
)
@ -459,6 +461,7 @@ def main():
print(f"\nFiles written:")
print(f" {md_path}")
print(f" {html_path}")
print(f" {txt_path}")
print(f" {stats_path}")

View File

@ -25,6 +25,11 @@ log = logging.getLogger(__name__)
HEARTBEAT_OK = "HEARTBEAT_OK"
# Only tasks in these statuses are eligible for xlsx → ClickUp matching.
# "to do" is excluded to prevent accidental matches and AutoCora race conditions.
# To force-reuse an xlsx for a "to do" task, set status to "running cora" first.
_CORA_ELIGIBLE_STATUSES = frozenset({"running cora", "error"})
class Scheduler:
# Tasks due within this window are eligible for execution
@ -837,6 +842,8 @@ class Scheduler:
return None
for task in tasks:
if task.status not in _CORA_ELIGIBLE_STATUSES:
continue
if task.task_type != "Link Building":
continue
@ -1000,6 +1007,8 @@ class Scheduler:
content_types = ("Content Creation", "On Page Optimization")
for task in tasks:
if task.status not in _CORA_ELIGIBLE_STATUSES:
continue
if task.task_type not in content_types:
continue
@ -1087,6 +1096,8 @@ class Scheduler:
matched_names = []
for task in tasks:
if task.status not in _CORA_ELIGIBLE_STATUSES:
continue
keyword = task.custom_fields.get("Keyword", "")
if not keyword:
continue

View File

@ -99,7 +99,7 @@ clickup:
link_building:
blm_dir: "E:/dev/Big-Link-Man"
watch_folder: "//PennQnap1/SHARE1/cora-inbox"
watch_interval_minutes: 60
watch_interval_minutes: 40
default_branded_plus_ratio: 0.7
# AutoCora job submission

287
cora-link.md 100644
View File

@ -0,0 +1,287 @@
# Link Building Agent Plan
## Context
CheddahBot needs a link building agent that orchestrates the external Big-Link-Man CLI tool (`E:/dev/Big-Link-Man/`). The current workflow is manual: run Cora on another machine → get .xlsx → manually run `main.py ingest-cora` → manually run `main.py generate-batch`. This agent automates steps 2 and 3, triggered by folder watching, ClickUp tasks, or chat commands. It must be expandable for future link building methods (MCP server path, ingest-simple, etc.).
## Decisions Made
- **Watch folder**: `Z:/cora-inbox` (network drive, Cora machine accessible)
- **File→task matching**: Fuzzy match .xlsx filename stem against ClickUp task's `Keyword` custom field
- **New ClickUp field "LB Method"**: Dropdown with initial option "Cora Backlinks" (more added later)
- **Dashboard**: API endpoint + NotificationBus events only (no frontend work — separate project)
- **Sidecar files**: Not needed — all metadata comes from the matching ClickUp task
- **Tool naming**: Orchestrator pattern — `run_link_building` is a thin dispatcher that reads `LB Method` and routes to the specific pipeline tool (e.g., `run_cora_backlinks`). Future link building methods get their own tools and slot into the orchestrator.
## Files to Create
### 1. `cheddahbot/tools/linkbuilding.py` — Main tool module
Four `@tool`-decorated functions + private helpers:
**`run_link_building(lb_method="", xlsx_path="", project_name="", money_site_url="", branded_plus_ratio=0.7, custom_anchors="", cli_flags="", ctx=None)`**
- **Orchestrator/dispatcher** — reads `lb_method` (from ClickUp "LB Method" field or chat) and routes to the correct pipeline tool
- If `lb_method` is "Cora Backlinks" or empty (default): calls `run_cora_backlinks()`
- Future: if `lb_method` is "MCP Link Building": calls `run_mcp_link_building()` (not yet implemented)
- Passes all other args through to the sub-tool
- This is what the ClickUp skill_map always routes to
**`run_cora_backlinks(xlsx_path, project_name, money_site_url, branded_plus_ratio=0.7, custom_anchors="", cli_flags="", ctx=None)`**
- The actual Cora pipeline — runs ingest-cora → generate-batch
- Step 1: Build CLI args, call `_run_blm_command(["ingest-cora", ...])`, parse stdout for job file path
- Step 2: Call `_run_blm_command(["generate-batch", "-j", job_file, "--continue-on-error"])`
- Updates KV store state and posts ClickUp comments at each step (following press_release.py pattern)
- Returns `## ClickUp Sync` in output to signal scheduler that sync was handled internally
- Can also be called directly from chat for explicit Cora runs
**`blm_ingest_cora(xlsx_path, project_name, money_site_url, branded_plus_ratio=0.7, custom_anchors="", cli_flags="", ctx=None)`**
- Standalone ingest — runs ingest-cora only, returns project ID and job file path
- For cases where user wants to ingest but not generate yet
**`blm_generate_batch(job_file, continue_on_error=True, debug=False, ctx=None)`**
- Standalone generate — runs generate-batch only on an existing job file
- For re-running generation or running a manually-created job
**Private helpers:**
- `_run_blm_command(args, timeout=1800)` — subprocess wrapper, runs `uv run python main.py <args>` from BLM_DIR, injects `-u`/`-p` from `BLM_USERNAME`/`BLM_PASSWORD` env vars
- `_parse_ingest_output(stdout)` — regex extract project_id + job_file path
- `_parse_generate_output(stdout)` — extract completion stats
- `_build_ingest_args(...)` — construct CLI argument list from tool params
- `_set_status(ctx, message)` — write pipeline status to KV store (for UI polling)
- `_sync_clickup(ctx, task_id, step, message)` — post comment + update state
**Critical: always pass `-m` flag** to ingest-cora to prevent interactive stdin prompt from blocking the subprocess.
### 2. `skills/linkbuilding.md` — Skill file
YAML frontmatter linking to `[run_link_building, run_cora_backlinks, blm_ingest_cora, blm_generate_batch, scan_cora_folder]` tools and `[link_builder, default]` agents. Markdown body describes when to use, default flags, workflow steps.
### 3. `tests/test_linkbuilding.py` — Test suite (~40 tests)
All tests mock `subprocess.run` — never call Big-Link-Man. Categories:
- Output parser unit tests (`_parse_ingest_output`, `_parse_generate_output`)
- CLI arg builder tests (all flag combinations, missing required params)
- Full pipeline integration (happy path, ingest failure, generate failure)
- ClickUp state machine (executing → completed, executing → failed)
- Folder watcher scan logic (new files, skip processed, missing ClickUp match)
## Files to Modify
### 4. `cheddahbot/config.py` — Add LinkBuildingConfig
```python
@dataclass
class LinkBuildingConfig:
blm_dir: str = "E:/dev/Big-Link-Man"
watch_folder: str = "" # empty = disabled
watch_interval_minutes: int = 60
default_branded_plus_ratio: float = 0.7
```
Add `link_building: LinkBuildingConfig` field to `Config` dataclass. Add YAML loading block in `load_config()` (same pattern as memory/scheduler/shell). Add env var override for `BLM_DIR`.
### 5. `config.yaml` — Three additions
**New top-level section:**
```yaml
link_building:
blm_dir: "E:/dev/Big-Link-Man"
watch_folder: "Z:/cora-inbox"
watch_interval_minutes: 60
default_branded_plus_ratio: 0.7
```
**New skill_map entry under clickup:**
```yaml
"Link Building":
tool: "run_link_building"
auto_execute: false # Cora Backlinks triggered by folder watcher, not scheduler
complete_status: "complete" # Override: use "complete" instead of "internal review"
error_status: "internal review" # On failure, move to internal review
field_mapping:
lb_method: "LB Method"
project_name: "task_name"
money_site_url: "IMSURL"
custom_anchors: "CustomAnchors"
branded_plus_ratio: "BrandedPlusRatio"
cli_flags: "CLIFlags"
xlsx_path: "CoraFile"
```
**New agent:**
```yaml
- name: link_builder
display_name: Link Builder
tools: [run_link_building, run_cora_backlinks, blm_ingest_cora, blm_generate_batch, scan_cora_folder, delegate_task, remember, search_memory]
memory_scope: ""
```
### 6. `cheddahbot/scheduler.py` — Add folder watcher (4th daemon thread)
**New thread `_folder_watch_loop`** alongside existing poll, heartbeat, and ClickUp threads:
- Starts if `config.link_building.watch_folder` is non-empty
- Runs every `watch_interval_minutes` (default 60)
- `_scan_watch_folder()` globs `*.xlsx` in watch folder
- For each file, checks KV store `linkbuilding:watched:{filename}` — skip if already processed
- **Fuzzy-matches filename stem against ClickUp tasks** with `LB Method = "Cora Backlinks"` and status "to do":
- Queries ClickUp for Link Building tasks
- Compares normalized filename stem against each task's `Keyword` custom field
- If match found: extracts money_site_url from IMSURL field, cli_flags from CLIFlags field, etc.
- If no match: logs warning, marks as "unmatched" in KV store, sends notification asking user to create/link a ClickUp task
- On match: executes `run_link_building` tool with args from the ClickUp task fields
- On completion: moves .xlsx to `Z:/cora-inbox/processed/` subfolder, updates KV state
- On failure: updates KV state with error, notifies via NotificationBus
**File handling after pipeline:**
- On success: .xlsx moved from `Z:/cora-inbox/``Z:/cora-inbox/processed/`
- On failure: .xlsx stays in `Z:/cora-inbox/` (KV store marks it as failed so watcher doesn't retry automatically; user can reset KV entry to retry)
**Also adds `scan_cora_folder` tool** (can live in linkbuilding.py):
- Chat-invocable utility for the agent to check what's in the watch folder
- Returns list of unprocessed .xlsx files with ClickUp match status
- Internal agent tool, not a dashboard concern
### 7. `cheddahbot/clickup.py` — Add field creation method
Add `create_custom_field(list_id, name, field_type, type_config=None)` method that calls `POST /list/{list_id}/field`. Used by the setup tool to auto-create custom fields across lists.
### 8. `cheddahbot/__main__.py` — Add API endpoint
Add before Gradio mount:
```python
@fastapi_app.get("/api/linkbuilding/status")
async def linkbuilding_status():
"""Return link building status for dashboard consumption."""
# Returns:
# {
# "pending_cora_runs": [
# {"keyword": "precision cnc machining", "url": "https://...", "client": "Chapter 2", "task_id": "abc123"},
# ...
# ],
# "in_progress": [...], # Currently executing pipelines
# "completed": [...], # Recently completed (last 7 days)
# "failed": [...] # Failed tasks needing attention
# }
```
The `pending_cora_runs` section is the key dashboard data: queries ClickUp for "to do" tasks with Work Category="Link Building" and LB Method="Cora Backlinks", returns each task's `Keyword` field and `IMSURL` (copiable URL) so the user can see exactly which Cora reports need to be run.
Also push link building events to NotificationBus (category="linkbuilding") at each pipeline step for future real-time dashboard support.
No other `__main__.py` changes needed — agent wiring is automatic from config.yaml.
## ClickUp Custom Fields (Auto-Created)
New custom fields to be created programmatically:
| Field | Type | Purpose |
|-------|------|---------|
| `LB Method` | Dropdown | Link building subtype. Initial option: "Cora Backlinks" |
| `Keyword` | Short Text | Target keyword (used for file matching) |
| `CoraFile` | Short Text | Path to .xlsx file (optional, set by agent after file match) |
| `CustomAnchors` | Short Text | Comma-separated anchor text overrides |
| `BrandedPlusRatio` | Short Text | Override for `-bp` flag (e.g., "0.7") |
| `CLIFlags` | Short Text | Raw additional CLI flags (e.g., "-r 5 -t 0.3") |
Fields that already exist and will be reused: `Client`, `IMSURL`, `Work Category` (add "Link Building" option).
### Auto-creation approach
- Add `create_custom_field(list_id, name, type, type_config=None)` method to `cheddahbot/clickup.py` — calls `POST /list/{list_id}/field`
- Add a `setup_linkbuilding_fields` tool (category="linkbuilding") that:
1. Gets all list IDs in the space
2. For each list, checks if fields already exist (via `get_custom_fields`)
3. Creates missing fields via the new API method
4. For `LB Method` dropdown, creates with `type_config` containing "Cora Backlinks" option
5. For `Work Category`, adds "Link Building" option if missing
- This tool runs once during initial setup, or can be re-run if new lists are added
- Also add "Link Building" as an option to the existing `Work Category` dropdown if not present
## Data Flow & Status Lifecycle
### Primary Trigger: Folder Watcher (Cora Backlinks)
The folder watcher is the main trigger for Cora Backlinks. The ClickUp scheduler does NOT auto-execute these — it can't, because the .xlsx doesn't exist until the user runs Cora.
```
1. ClickUp task created:
Work Category="Link Building", LB Method="Cora Backlinks", status="to do"
Fields filled: Client, IMSURL, Keyword, CLIFlags, BrandedPlusRatio, etc.
→ Appears on dashboard as "needs Cora run"
2. User runs Cora manually, drops .xlsx in Z:/cora-inbox
3. Folder watcher (_scan_watch_folder, runs every 60 min):
→ Finds precision-cnc-machining.xlsx
→ Fuzzy matches "precision cnc machining" against Keyword field on ClickUp "to do" Link Building tasks
→ Match found → extracts metadata from ClickUp task (IMSURL, CLIFlags, etc.)
→ Sets CoraFile field on the ClickUp task to the file path
→ Moves task to "in progress"
→ Posts comment: "Starting Cora Backlinks pipeline..."
4. Pipeline runs:
→ Step 1: ingest-cora → comment: "CORA report ingested. Job file: jobs/xxx.json"
→ Step 2: generate-batch → comment: "Content generation complete. X articles across Y tiers."
5. On success:
→ Move task to "complete"
→ Post summary comment with stats
→ Move .xlsx to Z:/cora-inbox/processed/
6. On failure:
→ Move task to "internal review"
→ Post error comment with details
→ .xlsx stays in Z:/cora-inbox (can retry)
```
### Secondary Trigger: Chat
```
User: "Run link building for Z:/cora-inbox/precision-cnc-machining.xlsx"
→ Chat brain calls run_cora_backlinks (or run_link_building with explicit lb_method)
→ Tool auto-looks up matching ClickUp task via Keyword field (if exists)
→ Same pipeline + ClickUp sync as above
→ If no ClickUp match: runs pipeline without ClickUp tracking, returns results to chat only
```
### Future Trigger: ClickUp Scheduler (other LB Methods)
Future link building methods (MCP, etc.) that don't need a .xlsx CAN be auto-executed by the ClickUp scheduler. The `run_link_building` orchestrator checks `lb_method`:
- "Cora Backlinks" → requires xlsx_path, skips if empty (folder watcher handles these)
- Future methods → can execute directly from ClickUp task data
### ClickUp Skill Map Note
The skill_map entry for "Link Building" exists primarily for **field mapping reference** (so the folder watcher and chat know which ClickUp fields map to which tool params). The ClickUp scheduler will discover these tasks but `run_link_building` will skip Cora Backlinks that have no xlsx_path — they're waiting for the folder watcher.
## Implementation Order
1. **Config** — Add `LinkBuildingConfig` to config.py, add `link_building:` section to config.yaml, add `link_builder` agent to config.yaml
2. **Core tools** — Create `cheddahbot/tools/linkbuilding.py` with `_run_blm_command`, parsers, `run_link_building` orchestrator, and `run_cora_backlinks` pipeline
3. **Standalone tools** — Add `blm_ingest_cora` and `blm_generate_batch`
4. **Tests** — Create `tests/test_linkbuilding.py`, verify with `uv run pytest tests/test_linkbuilding.py -v`
5. **ClickUp field creation** — Add `create_custom_field` to clickup.py, add `setup_linkbuilding_fields` tool
6. **ClickUp integration** — Add skill_map entry, add ClickUp state tracking to tools
7. **Folder watcher** — Add `_folder_watch_loop` to scheduler.py, add `scan_cora_folder` tool
8. **API endpoint** — Add `/api/linkbuilding/status` to `__main__.py`
9. **Skill file** — Create `skills/linkbuilding.md`
10. **ClickUp setup** — Run `setup_linkbuilding_fields` to auto-create custom fields across all lists
11. **Full test run**`uv run pytest -v --no-cov`
## Verification
1. **Unit tests**: `uv run pytest tests/test_linkbuilding.py -v` — all pass with mocked subprocess
2. **Full suite**: `uv run pytest -v --no-cov` — no regressions
3. **Lint**: `uv run ruff check .` + `uv run ruff format .`
4. **Manual e2e**: Drop a real .xlsx in Z:/cora-inbox, verify ingest-cora runs, job JSON created, generate-batch runs
5. **ClickUp e2e**: Create a Link Building task in ClickUp with proper fields, wait for scheduler poll, verify execution
6. **Chat e2e**: Ask CheddahBot to "run link building for [keyword]" via chat UI
7. **API check**: Hit `http://localhost:7860/api/linkbuilding/status` and verify data returned
## Key Reference Files
- `cheddahbot/tools/press_release.py` — Reference pattern for multi-step pipeline tool
- `cheddahbot/scheduler.py:55-76` — Where to add 4th daemon thread
- `cheddahbot/config.py:108-200` — load_config() pattern for new config sections
- `E:/dev/Big-Link-Man/docs/CLI_COMMAND_REFERENCE.md` — Full CLI reference
- `E:/dev/Big-Link-Man/src/cli/commands.py` — Exact output formats to parse

View File

@ -0,0 +1,721 @@
# CheddahBot Architecture
## System Overview
CheddahBot is a personal AI assistant built in Python. It exposes a Gradio-based
web UI, routes user messages through an agent loop backed by a model-agnostic LLM
adapter, persists conversations in SQLite, maintains a 4-layer memory system with
optional semantic search, and provides an extensible tool registry that the LLM
can invoke mid-conversation. A background scheduler handles cron-based tasks and
periodic heartbeat checks.
### Data Flow Diagram
```
User (browser)
|
v
+-----------+ +------------+ +--------------+
| Gradio UI | ---> | Agent | ---> | LLM Adapter |
| (ui.py) | | (agent.py) | | (llm.py) |
+-----------+ +-----+------+ +------+-------+
| |
+------------+-------+ +-------+--------+
| | | | Claude CLI |
v v v | OpenRouter |
+---------+ +---------+ +---+ | Ollama |
| Router | | Tools | | DB| | LM Studio |
|(router) | |(tools/) | |(db| +----------------+
+----+----+ +----+----+ +---+
| |
+-------+--+ +----+----+
| Identity | | Memory |
| SOUL.md | | System |
| USER.md | |(memory) |
+----------+ +---------+
```
1. The user submits text (or voice / files) through the Gradio interface.
2. `ui.py` hands the message to `Agent.respond()`.
3. The agent stores the user message in SQLite, builds a system prompt via
`router.py` (loading identity files and memory context), and formats the
conversation history.
4. The agent sends messages to `LLMAdapter.chat()` which dispatches to the
correct provider backend.
5. The LLM response streams back. If it contains tool-call requests, the agent
executes them through `ToolRegistry.execute()`, appends the results, and loops
back to step 4 (up to 10 iterations).
6. The final assistant response is stored in the database and streamed to the UI.
7. After responding, the agent checks whether the conversation has exceeded the
flush threshold; if so, the memory system summarizes older messages into the
daily log.
---
## Module-by-Module Breakdown
### `__main__.py` -- Entry Point
**File:** `cheddahbot/__main__.py`
Orchestrates startup in this order:
1. `load_config()` -- loads configuration from env vars / YAML / defaults.
2. `Database(config.db_path)` -- opens (or creates) the SQLite database.
3. `LLMAdapter(...)` -- initializes the model-agnostic LLM client.
4. `Agent(config, db, llm)` -- creates the core agent.
5. `MemorySystem(config, db)` -- initializes the memory system and injects it
into the agent via `agent.set_memory()`.
6. `ToolRegistry(config, db, agent)` -- auto-discovers and loads all tool
modules, then injects via `agent.set_tools()`.
7. `Scheduler(config, db, agent)` -- starts two daemon threads (task poller and
heartbeat).
8. `create_ui(agent, config, llm)` -- builds the Gradio Blocks app and launches
it on the configured host/port.
Each subsystem (memory, tools, scheduler) is wrapped in a try/except so the
application degrades gracefully if optional dependencies are missing.
---
### `config.py` -- Configuration
**File:** `cheddahbot/config.py`
Defines four dataclasses:
| Dataclass | Key Fields |
|------------------|---------------------------------------------------------------|
| `Config` | `default_model`, `host`, `port`, `ollama_url`, `lmstudio_url`, `openrouter_api_key`, plus derived paths (`root_dir`, `data_dir`, `identity_dir`, `memory_dir`, `skills_dir`, `db_path`) |
| `MemoryConfig` | `max_context_messages` (50), `flush_threshold` (40), `embedding_model` ("all-MiniLM-L6-v2"), `search_top_k` (5) |
| `SchedulerConfig` | `heartbeat_interval_minutes` (30), `poll_interval_seconds` (60) |
| `ShellConfig` | `blocked_commands`, `require_approval` (False) |
`load_config()` applies three layers of configuration in priority order:
1. Dataclass defaults (lowest priority).
2. `config.yaml` at the project root (middle priority).
3. Environment variables with the `CHEDDAH_` prefix, plus `OPENROUTER_API_KEY`
(highest priority).
The function also ensures required data directories exist on disk.
---
### `db.py` -- Database Layer
**File:** `cheddahbot/db.py`
A thin wrapper around SQLite using thread-local connections (one connection per
thread), WAL journal mode, and foreign keys.
**Key methods:**
- `create_conversation(conv_id, title)` -- insert a new conversation row.
- `list_conversations(limit)` -- return recent conversations ordered by
`updated_at`.
- `add_message(conv_id, role, content, ...)` -- insert a message and touch the
conversation's `updated_at`.
- `get_messages(conv_id, limit)` -- return messages in chronological order.
- `count_messages(conv_id)` -- count messages for flush-threshold checks.
- `add_scheduled_task(name, prompt, schedule)` -- persist a scheduled task.
- `get_due_tasks()` -- return tasks whose `next_run` is in the past or NULL.
- `update_task_next_run(task_id, next_run)` -- update the next execution time.
- `log_task_run(task_id, result, error)` -- record the outcome of a task run.
- `kv_set(key, value)` / `kv_get(key)` -- generic key-value store.
---
### `agent.py` -- Core Agent Loop
**File:** `cheddahbot/agent.py`
Contains the `Agent` class, the central coordinator.
**Key members:**
- `conv_id` -- current conversation ID (a 12-character hex string).
- `_memory` -- optional `MemorySystem` reference.
- `_tools` -- optional `ToolRegistry` reference.
**Primary method: `respond(user_input, files)`**
This is a Python generator that yields text chunks for streaming. The detailed
flow is described in the next section.
**Helper: `respond_to_prompt(prompt)`**
Non-streaming wrapper that collects all chunks and returns a single string. Used
by the scheduler and heartbeat for internal prompts.
---
### `router.py` -- System Prompt Builder
**File:** `cheddahbot/router.py`
Two functions:
1. `build_system_prompt(identity_dir, memory_context, tools_description)` --
assembles the full system prompt by concatenating these sections separated by
horizontal rules:
- Contents of `identity/SOUL.md`
- Contents of `identity/USER.md`
- Memory context string (from the memory system)
- Tools description listing (from the tool registry)
- A fixed "Instructions" section with core behavioral directives.
2. `format_messages_for_llm(system_prompt, history, max_messages)` --
converts raw database rows into the `[{role, content}]` format expected by
the LLM. The system prompt becomes the first message. Tool results are
converted to user messages prefixed with `[Tool Result]`. History is trimmed
to the most recent `max_messages` entries.
---
### `llm.py` -- LLM Adapter
**File:** `cheddahbot/llm.py`
Described in detail in a dedicated section below.
---
### `memory.py` -- Memory System
**File:** `cheddahbot/memory.py`
Described in detail in a dedicated section below.
---
### `media.py` -- Audio/Video Processing
**File:** `cheddahbot/media.py`
Three utility functions:
- `transcribe_audio(path)` -- Speech-to-text. Tries local Whisper first, then
falls back to the OpenAI Whisper API.
- `text_to_speech(text, output_path, voice)` -- Text-to-speech via `edge-tts`
(free, no API key). Defaults to the `en-US-AriaNeural` voice.
- `extract_video_frames(video_path, max_frames)` -- Extracts key frames from
video using `ffprobe` (to get duration) and `ffmpeg` (to extract JPEG frames).
---
### `scheduler.py` -- Scheduler and Heartbeat
**File:** `cheddahbot/scheduler.py`
Described in detail in a dedicated section below.
---
### `ui.py` -- Gradio Web Interface
**File:** `cheddahbot/ui.py`
Builds a Gradio Blocks application with:
- A model dropdown (populated from `llm.list_available_models()`) with a refresh
button and a "New Chat" button.
- A `gr.Chatbot` widget for the conversation (500px height, copy buttons).
- A `gr.MultimodalTextbox` supporting text, file upload, and microphone input.
- A "Voice Chat" accordion for record-and-respond audio interaction.
- A "Conversation History" accordion showing past conversations from the
database.
- A "Settings" accordion with guidance on editing identity and config files.
**Event wiring:**
- Model dropdown change calls `llm.switch_model()`.
- Refresh button re-discovers local models.
- Message submit calls `agent.respond()` in streaming mode, updating the chatbot
widget with each chunk.
- Audio files attached to messages are transcribed via `media.transcribe_audio()`
before being sent to the agent.
- Voice Chat records audio, transcribes it, gets a text response from the agent,
converts it to speech via `media.text_to_speech()`, and plays it back.
---
### `tools/__init__.py` -- Tool Registry
**File:** `cheddahbot/tools/__init__.py`
Described in detail in a dedicated section below.
---
### `skills/__init__.py` -- Skill Registry
**File:** `cheddahbot/skills/__init__.py`
Defines a parallel registry for "skills" (multi-step operations). Key pieces:
- `SkillDef` -- dataclass holding `name`, `description`, `func`.
- `@skill(name, description)` -- decorator that registers a skill in the global
`_SKILLS` dict.
- `load_skill(path)` -- dynamically loads a `.py` file as a module (triggering
any `@skill` decorators inside it).
- `discover_skills(skills_dir)` -- loads all `.py` files from the skills
directory.
- `list_skills()` / `run_skill(name, **kwargs)` -- query and execute skills.
---
### `providers/__init__.py` -- Provider Extensions
**File:** `cheddahbot/providers/__init__.py`
Reserved for future custom provider implementations. Currently empty.
---
## The Agent Loop in Detail
When `Agent.respond(user_input)` is called, the following sequence occurs:
```
1. ensure_conversation()
|-- Creates a new conversation in the DB if one doesn't exist
|
2. db.add_message(conv_id, "user", user_input)
|-- Persists the user's message
|
3. Build system prompt
|-- memory.get_context(user_input) --> memory context string
|-- tools.get_tools_schema() --> OpenAI-format JSON schemas
|-- tools.get_tools_description() --> human-readable tool list
|-- router.build_system_prompt(identity_dir, memory_context, tools_description)
|
4. Load conversation history from DB
|-- db.get_messages(conv_id, limit=max_context_messages)
|-- router.format_messages_for_llm(system_prompt, history, max_messages)
|
5. AGENT LOOP (up to MAX_TOOL_ITERATIONS = 10):
|
|-- llm.chat(messages, tools=tools_schema, stream=True)
| |-- Yields {"type":"text","content":"..."} chunks --> streamed to user
| |-- Yields {"type":"tool_use","name":"...","input":{...}} chunks
|
|-- If no tool_calls: store assistant message, BREAK
|
|-- If tool_calls present:
| |-- Store assistant message with tool_calls metadata
| |-- For each tool call:
| | |-- yield "Using tool: <name>" indicator
| | |-- tools.execute(name, input) --> result string
| | |-- yield tool result (truncated to 2000 chars)
| | |-- db.add_message(conv_id, "tool", result)
| | |-- Append result to messages as user message
| |-- Continue loop (LLM sees tool results and can respond or call more tools)
|
6. After loop: check if memory flush is needed
|-- If message count > flush_threshold:
| |-- memory.auto_flush(conv_id)
```
The loop allows the LLM to chain up to 10 consecutive tool calls before being
cut off. Each tool result is injected back into the conversation as a user
message so the LLM can reason about it in the next iteration.
---
## LLM Adapter Design
**File:** `cheddahbot/llm.py`
### Provider Routing
The `LLMAdapter` supports four provider paths. The active provider is determined
by examining the current model ID:
| Model ID Pattern | Provider | Backend |
|-----------------------------|---------------|----------------------------------|
| `claude-*` | `claude` | Claude Code CLI (subprocess) |
| `local/ollama/<model>` | `ollama` | Ollama HTTP API (OpenAI-compat) |
| `local/lmstudio/<model>` | `lmstudio` | LM Studio HTTP API (OpenAI-compat) |
| Anything else | `openrouter` | OpenRouter API (OpenAI-compat) |
### The `chat()` Method
This is the single entry point. It accepts a list of messages, an optional tools
schema, and a stream flag. It returns a generator yielding dictionaries:
- `{"type": "text", "content": "..."}` -- a text chunk to display.
- `{"type": "tool_use", "id": "...", "name": "...", "input": {...}}` -- a tool
invocation request.
### Claude Code CLI Path (`_chat_claude_sdk`)
For Claude models, CheddahBot shells out to the `claude` CLI binary (the Claude
Code SDK):
1. Separates system prompt, conversation history, and the latest user message
from the messages list.
2. Builds a full system prompt by appending conversation history under a
"Conversation So Far" heading.
3. Invokes `claude -p <prompt> --model <model> --output-format json --system-prompt <system>`.
4. The `CLAUDECODE` environment variable is stripped from the subprocess
environment to avoid nested-session errors.
5. Parses the JSON output and yields the `result` field as a text chunk.
6. On Windows, `shell=True` is used for compatibility with npm-installed
binaries.
### OpenAI-Compatible Path (`_chat_openai_sdk`)
For OpenRouter, Ollama, and LM Studio, the adapter uses the `openai` Python SDK:
1. `_resolve_endpoint(provider)` returns the base URL and API key:
- OpenRouter: `https://openrouter.ai/api/v1` with the configured API key.
- Ollama: `http://localhost:11434/v1` with dummy key `"ollama"`.
- LM Studio: `http://localhost:1234/v1` with dummy key `"lm-studio"`.
2. `_resolve_model_id(provider)` strips the `local/ollama/` or
`local/lmstudio/` prefix from the model ID.
3. Creates an `openai.OpenAI` client with the resolved base URL and API key.
4. In streaming mode: iterates over `client.chat.completions.create(stream=True)`,
accumulates tool call arguments across chunks (indexed by `tc.index`), yields
text deltas immediately, and yields completed tool calls at the end of the
stream.
5. In non-streaming mode: makes a single call and yields text and tool calls from
the response.
### Model Discovery
- `discover_local_models()` -- probes the Ollama tags endpoint and LM Studio
models endpoint (3-second timeout each) and returns `ModelInfo` objects.
- `list_available_models()` -- returns a combined list of hardcoded Claude
models, hardcoded OpenRouter models (if an API key is configured), and
dynamically discovered local models.
### Model Switching
`switch_model(model_id)` updates `current_model`. The `provider` property
re-evaluates on every access, so switching models also implicitly switches
providers.
---
## Memory System
**File:** `cheddahbot/memory.py`
### The 4 Layers
```
Layer 1: Identity -- identity/SOUL.md, identity/USER.md
(loaded by router.py into the system prompt)
Layer 2: Long-term -- memory/MEMORY.md
(persisted facts and instructions, appended over time)
Layer 3: Daily logs -- memory/YYYY-MM-DD.md
(timestamped entries per day, including auto-flush summaries)
Layer 4: Semantic -- memory/embeddings.db
(SQLite with vector embeddings for similarity search)
```
### How Memory Context is Built
`MemorySystem.get_context(query)` is called once per agent turn. It assembles a
string from:
1. **Long-term memory** -- the last 2000 characters of `MEMORY.md`.
2. **Today's log** -- the last 1500 characters of today's date file.
3. **Semantic search results** -- the top-k most similar entries to the user's
query, formatted as a bulleted list.
This string is injected into the system prompt by `router.py` under the heading
"Relevant Memory".
### Embedding and Search
- The embedding model is `all-MiniLM-L6-v2` from `sentence-transformers` (lazy
loaded, thread-safe via a lock).
- `_index_text(text, doc_id)` -- encodes the text into a vector and stores it in
`memory/embeddings.db` (table: `embeddings` with columns `id TEXT`, `text TEXT`,
`vector BLOB`).
- `search(query, top_k)` -- encodes the query, loads all vectors from the
database, computes cosine similarity against each one, sorts by score, and
returns the top-k results.
- If `sentence-transformers` is not installed, `_fallback_search()` performs
simple case-insensitive substring matching across all `.md` files in the memory
directory.
### Writing to Memory
- `remember(text)` -- appends a timestamped entry to `memory/MEMORY.md` and
indexes it for semantic search. Exposed to the LLM via the `remember_this`
tool.
- `log_daily(text)` -- appends a timestamped entry to today's daily log file and
indexes it. Exposed via the `log_note` tool.
### Auto-Flush
When `Agent.respond()` finishes, it checks `db.count_messages(conv_id)`. If the
count exceeds `config.memory.flush_threshold` (default 40):
1. `auto_flush(conv_id)` loads up to 200 messages.
2. All but the last 10 are selected for summarization.
3. A summary string is built from the selected messages (truncated to 1000
chars).
4. The summary is appended to the daily log via `log_daily()`.
This prevents conversations from growing unbounded while preserving context in
the daily log for future semantic search.
### Reindexing
`reindex_all()` clears all embeddings and re-indexes every line (longer than 10
characters) from every `.md` file in the memory directory. This can be called
to rebuild the search index from scratch.
---
## Tool System
**File:** `cheddahbot/tools/__init__.py` (registry) and `cheddahbot/tools/*.py`
(tool modules)
### The `@tool` Decorator
```python
from cheddahbot.tools import tool
@tool("my_tool_name", "Description of what this tool does", category="general")
def my_tool_name(param1: str, param2: int = 10) -> str:
return f"Result: {param1}, {param2}"
```
The decorator:
1. Creates a `ToolDef` object containing the function, name, description,
category, and auto-extracted parameter schema.
2. Registers it in the global `_TOOLS` dictionary keyed by name.
3. Attaches the `ToolDef` as `func._tool_def` on the original function.
### Parameter Schema Generation
`_extract_params(func)` inspects the function signature using `inspect`:
- Skips parameters named `self` or `ctx`.
- Maps type annotations to JSON Schema types: `str` -> `"string"`, `int` ->
`"integer"`, `float` -> `"number"`, `bool` -> `"boolean"`, `list` ->
`"array"`. Unannotated parameters default to `"string"`.
- Parameters without defaults are marked as required.
### Schema Output
`ToolDef.to_openai_schema()` returns the tool definition in OpenAI
function-calling format:
```json
{
"type": "function",
"function": {
"name": "tool_name",
"description": "...",
"parameters": {
"type": "object",
"properties": { ... },
"required": [ ... ]
}
}
}
```
### Auto-Discovery
When `ToolRegistry.__init__()` is called, `_discover_tools()` uses
`pkgutil.iter_modules` to find every `.py` file in `cheddahbot/tools/` (skipping
files starting with `_`). Each module is imported via `importlib.import_module`,
which triggers the `@tool` decorators and populates the global registry.
### Tool Execution
`ToolRegistry.execute(name, args)`:
1. Looks up the `ToolDef` in the global `_TOOLS` dict.
2. Inspects the function signature for a `ctx` parameter. If present, injects a
context dictionary containing `config`, `db`, `agent`, and `memory`.
3. Calls the function with the provided arguments.
4. Returns the result as a string (or `"Done."` if the function returns `None`).
5. Catches all exceptions and returns `"Tool error: ..."`.
### Meta-Tools
Two special tools enable runtime extensibility:
**`build_tool`** (in `cheddahbot/tools/build_tool.py`):
- Accepts `name`, `description`, and `code` (Python source using the `@tool`
decorator).
- Writes a new `.py` file into `cheddahbot/tools/`.
- Hot-imports the module via `importlib.import_module`, which triggers the
`@tool` decorator and registers the new tool immediately.
- If the import fails, the file is deleted.
**`build_skill`** (in `cheddahbot/tools/build_skill.py`):
- Accepts `name`, `description`, and `steps` (Python source using the `@skill`
decorator).
- Writes a new `.py` file into the configured `skills/` directory.
- Calls `skills.load_skill()` to dynamically import it.
---
## Scheduler and Heartbeat Design
**File:** `cheddahbot/scheduler.py`
The `Scheduler` class starts two daemon threads at application boot.
### Task Poller Thread
- Runs in `_poll_loop()`, sleeping for `poll_interval_seconds` (default 60)
between iterations.
- Each iteration calls `_run_due_tasks()`:
1. Queries `db.get_due_tasks()` for tasks where `next_run` is NULL or in the
past.
2. For each due task, calls `agent.respond_to_prompt(task["prompt"])` to
generate a response.
3. Logs the result via `db.log_task_run()`.
4. If the schedule is `"once:<datetime>"`, the task is disabled.
5. Otherwise, the schedule is treated as a cron expression: `croniter` is used
to calculate the next run time, which is saved via
`db.update_task_next_run()`.
### Heartbeat Thread
- Runs in `_heartbeat_loop()`, sleeping for `heartbeat_interval_minutes`
(default 30) between iterations.
- Waits 60 seconds before the first heartbeat to let the system initialize.
- Each iteration calls `_run_heartbeat()`:
1. Reads `identity/HEARTBEAT.md`.
2. Sends the checklist to the agent as a prompt: "HEARTBEAT CHECK. Review this
checklist and take action if needed."
3. If the response contains `"HEARTBEAT_OK"`, no action is logged.
4. Otherwise, the response is logged to the daily log via
`memory.log_daily()`.
### Thread Safety
Both threads are daemon threads (they die when the main process exits). The
`_stop_event` threading event can be set to gracefully shut down both loops. The
database layer uses thread-local connections, so concurrent access from the
scheduler threads and the Gradio request threads is safe.
---
## Database Schema
The SQLite database (`data/cheddahbot.db`) contains five tables:
### `conversations`
| Column | Type | Notes |
|--------------|------|--------------------|
| `id` | TEXT | Primary key (hex) |
| `title` | TEXT | Display title |
| `created_at` | TEXT | ISO 8601 UTC |
| `updated_at` | TEXT | ISO 8601 UTC |
### `messages`
| Column | Type | Notes |
|---------------|---------|--------------------------------------------|
| `id` | INTEGER | Autoincrement primary key |
| `conv_id` | TEXT | Foreign key to `conversations.id` |
| `role` | TEXT | `"user"`, `"assistant"`, or `"tool"` |
| `content` | TEXT | Message body |
| `tool_calls` | TEXT | JSON array of `{name, input}` (nullable) |
| `tool_result` | TEXT | Name of the tool that produced this result (nullable) |
| `model` | TEXT | Model ID used for this response (nullable) |
| `created_at` | TEXT | ISO 8601 UTC |
Index: `idx_messages_conv` on `(conv_id, created_at)`.
### `scheduled_tasks`
| Column | Type | Notes |
|--------------|---------|---------------------------------------|
| `id` | INTEGER | Autoincrement primary key |
| `name` | TEXT | Human-readable task name |
| `prompt` | TEXT | The prompt to send to the agent |
| `schedule` | TEXT | Cron expression or `"once:<datetime>"`|
| `enabled` | INTEGER | 1 = active, 0 = disabled |
| `next_run` | TEXT | ISO 8601 UTC (nullable) |
| `created_at` | TEXT | ISO 8601 UTC |
### `task_run_logs`
| Column | Type | Notes |
|---------------|---------|------------------------------------|
| `id` | INTEGER | Autoincrement primary key |
| `task_id` | INTEGER | Foreign key to `scheduled_tasks.id`|
| `started_at` | TEXT | ISO 8601 UTC |
| `finished_at` | TEXT | ISO 8601 UTC (nullable) |
| `result` | TEXT | Agent response (nullable) |
| `error` | TEXT | Error message if failed (nullable) |
### `kv_store`
| Column | Type | Notes |
|---------|------|-----------------|
| `key` | TEXT | Primary key |
| `value` | TEXT | Arbitrary value |
### Embeddings Database
A separate SQLite file at `memory/embeddings.db` holds one table:
### `embeddings`
| Column | Type | Notes |
|----------|------|--------------------------------------|
| `id` | TEXT | Primary key (e.g. `"daily:2026-02-14:08:30"`) |
| `text` | TEXT | The original text that was embedded |
| `vector` | BLOB | Raw float32 bytes of the embedding vector |
---
## Identity Files
Three Markdown files in the `identity/` directory define the agent's personality,
user context, and background behavior.
### `identity/SOUL.md`
Defines the agent's personality, communication style, boundaries, and quirks.
This is loaded first into the system prompt, making it the most prominent
identity influence on every response.
Contents are read by `router.build_system_prompt()` at the beginning of each
agent turn.
### `identity/USER.md`
Contains a user profile template: name, technical level, primary language,
current projects, and communication preferences. The user edits this file to
customize how the agent addresses them and what context it assumes.
Loaded by `router.build_system_prompt()` immediately after SOUL.md.
### `identity/HEARTBEAT.md`
A checklist of items to review on each heartbeat cycle. The scheduler reads this
file and sends it to the agent as a prompt every `heartbeat_interval_minutes`
(default 30 minutes). The agent processes the checklist and either confirms
"HEARTBEAT_OK" or takes action and logs it.
### Loading Order in the System Prompt
The system prompt assembled by `router.build_system_prompt()` concatenates these
sections, separated by `\n\n---\n\n`:
1. SOUL.md contents
2. USER.md contents
3. Memory context (long-term + daily log + semantic search results)
4. Tools description (categorized list of available tools)
5. Core instructions (hardcoded behavioral directives)

View File

@ -0,0 +1,61 @@
# ClickUp Task Creation
## CLI Script
```bash
uv run python scripts/create_clickup_task.py --name "LINKS - keyword" --client "Client Name" \
--category "Link Building" --due-date 2026-03-18 --tag mar26 --time-estimate 2h \
--field "Keyword=keyword" --field "IMSURL=https://example.com" --field "LB Method=Cora Backlinks"
```
## Defaults
- Priority: High (2)
- Assignee: Bryan (10765627)
- Status: "to do"
- Due date format: YYYY-MM-DD
- Tag format: mmmYY (e.g. feb26, mar26)
## Custom Fields
Any field can be set via `--field "Name=Value"`. Dropdowns are auto-resolved by name (case-insensitive).
## Task Types
### Link Building
- **Prefix**: `LINKS - {keyword}`
- **Work Category**: "Link Building"
- **Required fields**: Keyword, IMSURL
- **LB Method**: default "Cora Backlinks"
- **CLIFlags**: only add `--tier1-count N` when count is specified
- **BrandedPlusRatio**: default to 0.7
- **CustomAnchors**: only if given a list of custom anchors
- **time estimate**: 2.5h
### On Page Optimization
- **Prefix**: `OPT - {keyword}`
- **Work Category**: "On Page Optimization"
- **Required fields**: Keyword, IMSURL
- **time estimate**: 3h
-
### Content Creation
- **Prefix**: `CREATE - {keyword}`
- **Work Category**: "Content Creation"
- **Required fields**: Keyword
- **time estimate**: 4h
### Press Release
- **Prefix**: `PR - {keyword}`
- **Required fields**: Keyword, IMSURL
- **Work Category**: "Press Release"
- **PR Topic**: if not provided, ask if there is a topic. it can be blank if they respond with none.
- **time estimate**: 1.5h
## Chat Tool
The `clickup_create_task` tool provides the same capabilities via CheddahBot UI. Arbitrary custom fields are passed as JSON via `custom_fields_json`.
## Client Folder Lookup
Tasks are created in the "Overall" list inside the client's folder. Folder name is matched case-insensitively.

110
docs/ntfy-setup.md 100644
View File

@ -0,0 +1,110 @@
# ntfy.sh Push Notifications Setup
CheddahBot sends push notifications to your phone and desktop via [ntfy.sh](https://ntfy.sh) when tasks complete, reports are ready, or errors occur.
## 1. Install the ntfy App
- **Android:** [Play Store](https://play.google.com/store/apps/details?id=io.heckel.ntfy)
- **iOS:** [App Store](https://apps.apple.com/us/app/ntfy/id1625396347)
- **Desktop:** Open [ntfy.sh](https://ntfy.sh) in your browser and enable browser notifications when prompted
## 2. Pick Topic Names
Topics are like channels. Anyone who knows the topic name can subscribe, so use random strings:
```
cheddahbot-a8f3k9x2m7
cheddahbot-errors-p4w2j6n8
```
Generate your own — any random string works. No account or registration needed.
## 3. Subscribe to Your Topics
**Phone app:**
1. Open the ntfy app
2. Tap the + button
3. Enter your topic name (e.g. `cheddahbot-a8f3k9x2m7`)
4. Server: `https://ntfy.sh` (default)
5. Repeat for your errors topic
**Browser:**
1. Go to [ntfy.sh](https://ntfy.sh)
2. Click "Subscribe to topic"
3. Enter the same topic names
4. Allow browser notifications when prompted
## 4. Add Topics to .env
Add these lines to your `.env` file in the CheddahBot root:
```
NTFY_TOPIC_HUMAN_ACTION=cheddahbot-a8f3k9x2m7
NTFY_TOPIC_ERRORS=cheddahbot-errors-p4w2j6n8
```
Replace with your actual topic names.
## 5. Restart CheddahBot
Kill the running instance and restart:
```bash
uv run python -m cheddahbot
```
You should see in the startup logs:
```
ntfy notifier initialized with 2 channel(s): human_action, errors
ntfy notifier subscribed to notification bus
```
## What Gets Notified
### human_action channel (high priority)
Notifications where you need to do something:
- Cora report finished and ready
- Press release completed
- Content outline ready for review
- Content optimization completed
- Link building pipeline finished
- Cora report distributed to inbox
### errors channel (urgent priority)
Notifications when something went wrong:
- ClickUp task failed or was skipped
- AutoCora job failed
- Link building pipeline error
- Content pipeline error
- Missing ClickUp field matches
- File copy failures
## Configuration
Channel routing is configured in `config.yaml` under the `ntfy:` section. Each channel has:
- `topic_env_var` — which env var holds the topic name
- `categories` — notification categories to listen to (`clickup`, `autocora`, `linkbuilding`, `content`)
- `include_patterns` — regex patterns the message must match (at least one)
- `exclude_patterns` — regex patterns that reject the message (takes priority over include)
- `priority` — ntfy priority level: `min`, `low`, `default`, `high`, `urgent`
- `tags` — emoji shortcodes shown on the notification (e.g. `white_check_mark`, `rotating_light`)
### Adding a New Channel
1. Add a new entry under `ntfy.channels` in `config.yaml`
2. Add the topic env var to `.env`
3. Subscribe to the topic in your ntfy app
4. Restart CheddahBot
### Privacy
The public ntfy.sh server has no authentication by default. Your topic name is the only security — use a long random string to make it unguessable. Alternatively:
- Create a free ntfy.sh account and set read/write ACLs on your topics
- Self-host ntfy (single binary) and set `server: http://localhost:8080` in config.yaml
### Disabling
Set `enabled: false` in the `ntfy:` section of `config.yaml`, or remove the env vars from `.env`.

View File

@ -0,0 +1,43 @@
# Scheduler Refactor Notes
## Issue: AutoCora Single-Day Window (found 2026-02-27)
**Symptom:** Task `86b8grf16` ("LINKS - anti vibration rubber mounts", due Feb 18) has been sitting in "to do" forever with no Cora report generated.
**Root cause:** `_find_qualifying_tasks()` in `tools/autocora.py` filters tasks to **exactly one calendar day** (the `target_date`, which defaults to today). The scheduler calls this daily with `today`:
```python
today = datetime.now(UTC).strftime("%Y-%m-%d")
result = submit_autocora_jobs(target_date=today, ctx=ctx)
```
If CheddahBot isn't running on the task's due date (or the DB is empty/wiped), the task is **permanently orphaned** — no catch-up, no retry, no visibility.
**Affected task types:** All three `cora_categories` — Link Building, On Page Optimization, Content Creation.
**What needs to change:** Auto-submit should also pick up overdue tasks (due date in the past, still "to do", no existing AutoCora job in KV store).
---
## Empty Database State (found 2026-02-27)
`cheddahbot.db` has zero rows in all tables (kv_store, notifications, scheduled_tasks, etc.). Either fresh DB or wiped. This means:
- No task state tracking is happening
- No AutoCora job submissions are recorded
- Folder watcher has no history
- All loops show no `last_run` timestamps
---
## Context: Claude Scheduled Tasks
Claude released scheduled tasks (2026-02-26). Need to evaluate whether parts of CheddahBot's scheduler (heartbeat, poll loop, ClickUp polling, folder watchers, AutoCora) could be replaced or augmented by Claude's native scheduling.
---
## Additional Issues to Investigate
- [ ] `auto_execute: false` on Link Building — is this intentional given the folder-watcher pipeline?
- [ ] Folder watcher at `Z:/cora-inbox` — does this path stay accessible?
- [ ] No dashboard/UI surfacing "tasks waiting for action" — stuck tasks are invisible
- [ ] AutoCora loop waits 30s before first poll, then runs every 5min — but auto-submit only checks today's tasks each cycle (redundant repeated calls)

View File

@ -0,0 +1,94 @@
"""Query ClickUp 'to do' tasks tagged feb26 in OPT/LINKS/Content categories."""
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.stdout.reconfigure(line_buffering=True)
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from cheddahbot.config import load_config
from cheddahbot.clickup import ClickUpClient
CATEGORY_PREFIXES = ("opt", "link", "content", "ai content")
TAG_FILTER = "feb26"
def ms_to_date(ms_str: str) -> str:
if not ms_str:
return ""
try:
ts = int(ms_str) / 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%m/%d")
except (ValueError, OSError):
return ""
def main():
cfg = load_config()
if not cfg.clickup.api_token or not cfg.clickup.space_id:
print("ERROR: CLICKUP_API_TOKEN or CLICKUP_SPACE_ID not set.")
return
client = ClickUpClient(
api_token=cfg.clickup.api_token,
workspace_id=cfg.clickup.workspace_id,
task_type_field_name=cfg.clickup.task_type_field_name,
)
try:
# Fetch all 'to do' tasks across the space
tasks = client.get_tasks_from_space(cfg.clickup.space_id, statuses=["to do"])
# Filter by feb26 tag
tagged = [t for t in tasks if TAG_FILTER in [tag.lower() for tag in t.tags]]
if not tagged:
all_tags = set()
for t in tasks:
all_tags.update(t.tags)
print(f"No tasks with tag '{TAG_FILTER}'. Tags seen: {sorted(all_tags)}")
print(f"Total 'to do' tasks found: {len(tasks)}")
return
# Filter to OPT/LINKS/Content categories (by task name, Work Category, or list name)
def is_target_category(t):
name_lower = t.name.lower().strip()
wc = (t.custom_fields.get("Work Category") or "").lower()
ln = (t.list_name or "").lower()
for prefix in CATEGORY_PREFIXES:
if name_lower.startswith(prefix) or prefix in wc or prefix in ln:
return True
return False
filtered = [t for t in tagged if is_target_category(t)]
skipped = [t for t in tagged if not is_target_category(t)]
# Sort by due date (oldest first), tasks with no due date go last
filtered.sort(key=lambda t: int(t.due_date) if t.due_date else float("inf"))
top = filtered[:10]
# Build table
print(f"feb26-tagged 'to do' tasks — OPT / LINKS / Content (top 10, oldest first)")
print(f"\n{'#':>2} | {'ID':<11} | {'Keyword/Name':<50} | {'Due':<6} | {'Customer':<25} | Tags")
print("-" * 120)
for i, t in enumerate(top, 1):
customer = t.custom_fields.get("Customer", "") or ""
due = ms_to_date(t.due_date)
tags = ", ".join(t.tags)
name = t.name[:50]
print(f"{i:>2} | {t.id:<11} | {name:<50} | {due:<6} | {customer:<25} | {tags}")
print(f"\nShowing {len(top)} of {len(filtered)} OPT/LINKS/Content tasks ({len(tagged)} total feb26-tagged).")
if skipped:
print(f"\nSkipped {len(skipped)} non-OPT/LINKS/Content tasks:")
for t in skipped:
print(f" - {t.name} ({t.id})")
finally:
client.close()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,120 @@
"""Query ClickUp 'to do' tasks tagged feb26 in OPT/LINKS/Content categories."""
import sys
from pathlib import Path
from datetime import datetime, timezone
# Add project root to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from cheddahbot.config import load_config
from cheddahbot.clickup import ClickUpClient
def ms_to_date(ms_str: str) -> str:
"""Convert Unix-ms timestamp string to YYYY-MM-DD."""
if not ms_str:
return ""
try:
ts = int(ms_str) / 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
except (ValueError, OSError):
return ""
def main():
cfg = load_config()
if not cfg.clickup.api_token or not cfg.clickup.space_id:
print("ERROR: CLICKUP_API_TOKEN or CLICKUP_SPACE_ID not set.")
return
client = ClickUpClient(
api_token=cfg.clickup.api_token,
workspace_id=cfg.clickup.workspace_id,
task_type_field_name=cfg.clickup.task_type_field_name,
)
# Step 1: Get folders, find OPT/LINKS/Content
target_folders = {"opt", "links", "content"}
try:
folders = client.get_folders(cfg.clickup.space_id)
except Exception as e:
print(f"ERROR fetching folders: {e}")
client.close()
return
print(f"All folders: {[f['name'] for f in folders]}")
matched_lists = [] # (list_id, list_name, folder_name)
for folder in folders:
if folder["name"].lower() in target_folders:
for lst in folder["lists"]:
matched_lists.append((lst["id"], lst["name"], folder["name"]))
if not matched_lists:
print(f"No folders matching {target_folders}. Falling back to full space scan.")
try:
tasks = client.get_tasks_from_space(cfg.clickup.space_id, statuses=["to do"])
finally:
client.close()
else:
print(f"Querying lists: {[(ln, fn) for _, ln, fn in matched_lists]}")
tasks = []
for list_id, list_name, folder_name in matched_lists:
try:
batch = client.get_tasks(list_id, statuses=["to do"])
# Stash folder name on each task for display
for t in batch:
t._folder = folder_name
tasks.extend(batch)
except Exception as e:
print(f" Error fetching {list_name}: {e}")
client.close()
print(f"Total 'to do' tasks from target folders: {len(tasks)}")
# Filter by "feb26" tag (case-insensitive)
tagged = [t for t in tasks if any(tag.lower() == "feb26" for tag in t.tags)]
if not tagged:
print(f"No 'to do' tasks with 'feb26' tag found.")
all_tags = set()
for t in tasks:
all_tags.update(t.tags)
print(f"Tags found across all to-do tasks: {sorted(all_tags)}")
return
filtered = tagged
# Sort by due date (oldest first), tasks without due date go last
def sort_key(t):
if t.due_date:
return (0, int(t.due_date))
return (1, 0)
filtered.sort(key=sort_key)
# Take top 10
top10 = filtered[:10]
# Build table
print(f"\n## ClickUp 'to do' — feb26 tag — OPT/LINKS/Content ({len(filtered)} total, showing top 10)\n")
print(f"{'#':<3} | {'ID':<12} | {'Keyword/Name':<40} | {'Due':<12} | {'Customer':<20} | Tags")
print(f"{''*3} | {''*12} | {''*40} | {''*12} | {''*20} | {''*15}")
for i, t in enumerate(top10, 1):
customer = t.custom_fields.get("Customer", "") or ""
due = ms_to_date(t.due_date)
tags = ", ".join(t.tags) if t.tags else ""
name = t.name[:38] + ".." if len(t.name) > 40 else t.name
print(f"{i:<3} | {t.id:<12} | {name:<40} | {due:<12} | {customer:<20} | {tags}")
print(f"\nCategory breakdown:")
from collections import Counter
cats = Counter(t.task_type for t in filtered)
for cat, count in cats.most_common():
print(f" {cat or '(none)'}: {count}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,97 @@
"""Query ClickUp for feb26-tagged to-do tasks in OPT/LINKS/Content categories."""
from datetime import datetime, UTC
from cheddahbot.config import load_config
from cheddahbot.clickup import ClickUpClient
cfg = load_config()
client = ClickUpClient(
api_token=cfg.clickup.api_token,
workspace_id=cfg.clickup.workspace_id,
task_type_field_name=cfg.clickup.task_type_field_name,
)
tasks = client.get_tasks_from_overall_lists(cfg.clickup.space_id, statuses=["to do"])
client.close()
# Filter: tagged feb26
feb26 = [t for t in tasks if "feb26" in t.tags]
# Filter: OPT / LINKS / Content categories (by Work Category or name prefix)
def is_target(t):
cat = (t.task_type or "").lower()
name = t.name.upper()
if cat in ("on page optimization", "link building", "content creation"):
return True
if name.startswith("OPT") or name.startswith("LINKS") or name.startswith("NEW -"):
return True
return False
filtered = [t for t in feb26 if is_target(t)]
# Sort by due date ascending (no due date = sort last)
def sort_key(t):
if t.due_date:
return int(t.due_date)
return float("inf")
filtered.sort(key=sort_key)
top10 = filtered[:10]
def fmt_due(ms_str):
if not ms_str:
return "No due"
ts = int(ms_str) / 1000
return datetime.fromtimestamp(ts, tz=UTC).strftime("%b %d")
def fmt_customer(t):
c = t.custom_fields.get("Customer", "")
if c and str(c) != "None":
return str(c)
return t.list_name
def fmt_cat(t):
cat = t.task_type
name = t.name.upper()
if not cat or cat.strip() == "":
if name.startswith("LINKS"):
return "LINKS"
elif name.startswith("OPT"):
return "OPT"
elif name.startswith("NEW"):
return "Content"
return "?"
mapping = {
"On Page Optimization": "OPT",
"Link Building": "LINKS",
"Content Creation": "Content",
}
return mapping.get(cat, cat)
def fmt_tags(t):
return ", ".join(t.tags) if t.tags else ""
print(f"## feb26 To-Do: OPT / LINKS / Content ({len(filtered)} total, showing top 10 oldest)")
print()
print("| # | ID | Keyword/Name | Due | Customer | Tags |")
print("|---|-----|-------------|-----|----------|------|")
for i, t in enumerate(top10, 1):
name = t.name[:55]
tid = t.id
due = fmt_due(t.due_date)
cust = fmt_customer(t)
tags = fmt_tags(t)
print(f"| {i} | {tid} | {name} | {due} | {cust} | {tags} |")
if len(filtered) > 10:
print()
remaining = filtered[10:]
print(f"### Remaining {len(remaining)} tasks:")
print("| # | ID | Keyword/Name | Due | Customer | Tags |")
print("|---|-----|-------------|-----|----------|------|")
for i, t in enumerate(remaining, 11):
name = t.name[:55]
print(f"| {i} | {t.id} | {name} | {fmt_due(t.due_date)} | {fmt_customer(t)} | {fmt_tags(t)} |")
print()
print(f"*{len(filtered)} matching tasks, {len(feb26)} total feb26 tasks, {len(tasks)} total to-do*")

View File

@ -0,0 +1,87 @@
"""Query ClickUp 'to do' tasks tagged feb26 in OPT/LINKS/Content categories."""
import os
import sys
from datetime import datetime, timezone
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), "..", ".env"))
from cheddahbot.clickup import ClickUpClient
TOKEN = os.getenv("CLICKUP_API_TOKEN", "")
SPACE_ID = os.getenv("CLICKUP_SPACE_ID", "")
if not TOKEN or not SPACE_ID:
print("ERROR: CLICKUP_API_TOKEN and CLICKUP_SPACE_ID must be set in .env")
sys.exit(1)
CATEGORIES = {"On Page Optimization", "Content Creation", "Link Building"}
TAG_FILTER = "feb26"
client = ClickUpClient(api_token=TOKEN, workspace_id="", task_type_field_name="Work Category")
print(f"Querying ClickUp space {SPACE_ID} for 'to do' tasks...")
tasks = client.get_tasks_from_space(SPACE_ID, statuses=["to do"])
client.close()
print(f"Total 'to do' tasks found: {len(tasks)}")
# Filter by feb26 tag
tagged = [t for t in tasks if TAG_FILTER in [tag.lower() for tag in t.tags]]
print(f"Tasks with '{TAG_FILTER}' tag: {len(tagged)}")
# Filter by Work Category (OPT / LINKS / Content)
filtered = []
for t in tagged:
cat = (t.custom_fields.get("Work Category") or t.task_type or "").strip()
if cat in CATEGORIES:
filtered.append(t)
if not filtered and tagged:
# Show what categories exist so we can refine
cats_found = set()
for t in tagged:
cats_found.add(t.custom_fields.get("Work Category") or t.task_type or "(none)")
print(f"\nNo tasks matched categories {CATEGORIES}.")
print(f"Work Categories found on feb26-tagged tasks: {cats_found}")
print("\nShowing ALL feb26-tagged tasks instead:\n")
filtered = tagged
# Sort by due date (oldest first), tasks without due date go last
def sort_key(t):
if t.due_date:
return int(t.due_date)
return float("inf")
filtered.sort(key=sort_key)
# Take top 10
top = filtered[:10]
# Format table
def fmt_due(raw_due: str) -> str:
if not raw_due:
return ""
try:
ts = int(raw_due) / 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%m/%d")
except (ValueError, OSError):
return raw_due
def fmt_customer(t) -> str:
return t.custom_fields.get("Customer", "") or ""
print(f"\n{'#':<3} | {'ID':<12} | {'Keyword/Name':<45} | {'Cat':<15} | {'Due':<6} | {'Customer':<20} | Tags")
print("-" * 120)
for i, t in enumerate(top, 1):
tags_str = ", ".join(t.tags)
name = t.name[:45]
cat = t.custom_fields.get("Work Category") or t.task_type or ""
print(f"{i:<3} | {t.id:<12} | {name:<45} | {cat:<15} | {fmt_due(t.due_date):<6} | {fmt_customer(t):<20} | {tags_str}")
print(f"\nTotal shown: {len(top)} of {len(filtered)} matching tasks")

View File

@ -0,0 +1,64 @@
"""Find all Press Release tasks due in February 2026, any status."""
import logging
from datetime import UTC, datetime
logging.basicConfig(level=logging.WARNING)
from cheddahbot.config import load_config
from cheddahbot.clickup import ClickUpClient
import json
config = load_config()
client = ClickUpClient(
api_token=config.clickup.api_token,
workspace_id=config.clickup.workspace_id,
task_type_field_name=config.clickup.task_type_field_name,
)
space_id = config.clickup.space_id
list_ids = client.get_list_ids_from_space(space_id)
field_filter = client.discover_field_filter(
next(iter(list_ids)), config.clickup.task_type_field_name
)
pr_opt_id = field_filter["options"]["Press Release"]
custom_fields_filter = json.dumps(
[{"field_id": field_filter["field_id"], "operator": "ANY", "value": [pr_opt_id]}]
)
# February 2026 window
feb_start = int(datetime(2026, 2, 1, tzinfo=UTC).timestamp() * 1000)
feb_end = int(datetime(2026, 3, 1, tzinfo=UTC).timestamp() * 1000)
# Query with broad statuses, include closed
tasks = client.get_tasks_from_space(
space_id,
custom_fields=custom_fields_filter,
)
# Filter for due in February 2026
feb_prs = []
for t in tasks:
if t.task_type != "Press Release":
continue
if not t.due_date:
continue
try:
due_ms = int(t.due_date)
if feb_start <= due_ms < feb_end:
feb_prs.append(t)
except (ValueError, TypeError):
continue
print(f"\nPress Release tasks due in February 2026: {len(feb_prs)}\n")
for t in feb_prs:
due_dt = datetime.fromtimestamp(int(t.due_date) / 1000, tz=UTC)
due = due_dt.strftime("%Y-%m-%d")
tags_str = ", ".join(t.tags) if t.tags else "(none)"
customer = t.custom_fields.get("Customer", "?")
imsurl = t.custom_fields.get("IMSURL", "")
print(f" [{t.status:20s}] {t.name}")
print(f" id={t.id} due={due} tags={tags_str}")
print(f" customer={customer} imsurl={imsurl or '(none)'}")
print()

View File

@ -0,0 +1,61 @@
"""Find all feb26-tagged Press Release tasks regardless of due date or status."""
import logging
from datetime import UTC, datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%H:%M:%S")
from cheddahbot.config import load_config
from cheddahbot.clickup import ClickUpClient
config = load_config()
client = ClickUpClient(
api_token=config.clickup.api_token,
workspace_id=config.clickup.workspace_id,
task_type_field_name=config.clickup.task_type_field_name,
)
space_id = config.clickup.space_id
# Query ALL statuses (no status filter, no due date filter) but filter by Press Release
list_ids = client.get_list_ids_from_space(space_id)
field_filter = client.discover_field_filter(
next(iter(list_ids)), config.clickup.task_type_field_name
)
import json
pr_opt_id = field_filter["options"]["Press Release"]
custom_fields_filter = json.dumps(
[{"field_id": field_filter["field_id"], "operator": "ANY", "value": [pr_opt_id]}]
)
# Get tasks with NO status filter and NO due date filter
tasks = client.get_tasks_from_space(
space_id,
statuses=["to do", "outline approved", "in progress", "automation underway"],
custom_fields=custom_fields_filter,
)
# Filter for feb26 tag
feb26_tasks = [t for t in tasks if "feb26" in t.tags]
all_pr = [t for t in tasks if t.task_type == "Press Release"]
print(f"\n{'='*70}")
print(f"Total tasks returned: {len(tasks)}")
print(f"Press Release tasks: {len(all_pr)}")
print(f"feb26-tagged PR tasks: {len(feb26_tasks)}")
print(f"{'='*70}\n")
for t in all_pr:
due = ""
if t.due_date:
try:
due_dt = datetime.fromtimestamp(int(t.due_date) / 1000, tz=UTC)
due = due_dt.strftime("%Y-%m-%d")
except (ValueError, TypeError):
due = t.due_date
tags_str = ", ".join(t.tags) if t.tags else "(no tags)"
customer = t.custom_fields.get("Customer", "?")
print(f" [{t.status:20s}] {t.name}")
print(f" id={t.id} due={due or '(none)'} tags={tags_str} customer={customer}")
print()

View File

@ -0,0 +1,102 @@
"""Query ClickUp 'to do' tasks tagged 'feb26' in OPT/LINKS/Content categories."""
from __future__ import annotations
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_root))
from dotenv import load_dotenv
load_dotenv(_root / ".env")
from cheddahbot.clickup import ClickUpClient
API_TOKEN = os.environ.get("CLICKUP_API_TOKEN", "")
SPACE_ID = os.environ.get("CLICKUP_SPACE_ID", "")
if not API_TOKEN:
sys.exit("ERROR: CLICKUP_API_TOKEN env var is required")
if not SPACE_ID:
sys.exit("ERROR: CLICKUP_SPACE_ID env var is required")
# Work Category values to include (case-insensitive partial match)
CATEGORY_FILTERS = ["opt", "link", "content"]
TAG_FILTER = "feb26"
def ms_to_date(ms_str: str) -> str:
"""Convert Unix-ms timestamp string to YYYY-MM-DD."""
if not ms_str:
return ""
try:
ts = int(ms_str) / 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
except (ValueError, OSError):
return ms_str
def main() -> None:
client = ClickUpClient(api_token=API_TOKEN, task_type_field_name="Work Category")
print(f"Fetching 'to do' tasks from space {SPACE_ID} ...")
tasks = client.get_tasks_from_overall_lists(SPACE_ID, statuses=["to do"])
print(f"Total 'to do' tasks: {len(tasks)}")
# Filter by feb26 tag
tagged = [t for t in tasks if TAG_FILTER in [tag.lower() for tag in t.tags]]
print(f"Tasks with '{TAG_FILTER}' tag: {len(tagged)}")
# Show all Work Category values for debugging
categories = set()
for t in tagged:
wc = t.custom_fields.get("Work Category", "") or ""
categories.add(wc)
print(f"Work Categories found: {categories}")
# Filter by OPT/LINKS/Content categories
filtered = []
for t in tagged:
wc = str(t.custom_fields.get("Work Category", "") or "").lower()
if any(cat in wc for cat in CATEGORY_FILTERS):
filtered.append(t)
print(f"After category filter (OPT/LINKS/Content): {len(filtered)}")
# Sort by due date (oldest first), tasks with no due date go last
def sort_key(t):
if t.due_date:
try:
return (0, int(t.due_date))
except ValueError:
return (1, 0)
return (2, 0)
filtered.sort(key=sort_key)
# Top 10
top10 = filtered[:10]
# Print table
print(f"\n{'#':>3} | {'ID':>11} | {'Keyword/Name':<45} | {'Due':>10} | {'Customer':<20} | Tags")
print("-" * 120)
for i, t in enumerate(top10, 1):
customer = t.custom_fields.get("Customer", "") or ""
due = ms_to_date(t.due_date)
wc = t.custom_fields.get("Work Category", "") or ""
tags_str = ", ".join(t.tags)
name_display = t.name[:45] if len(t.name) > 45 else t.name
print(f"{i:>3} | {t.id:>11} | {name_display:<45} | {due:>10} | {customer:<20} | {tags_str}")
if not top10:
print(" (no matching tasks found)")
print(f"\n--- {len(filtered)} total matching tasks, showing top {len(top10)} (oldest first) ---")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,149 @@
"""One-time script: rebuild the 'Customer' dropdown custom field in ClickUp.
Steps:
1. Fetch all folders from the PII-Agency-SEO space
2. Filter out non-client folders
3. Create a 'Customer' dropdown field with folder names as options
4. For each client folder, find the 'Overall' list and set Customer on all tasks
Usage:
DRY_RUN=1 uv run python scripts/rebuild_customer_field.py # preview only
uv run python scripts/rebuild_customer_field.py # live run
"""
from __future__ import annotations
import os
import sys
import time
from pathlib import Path
# Allow running from repo root
_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_root))
from dotenv import load_dotenv
load_dotenv(_root / ".env")
from cheddahbot.clickup import ClickUpClient
# ── Config ──────────────────────────────────────────────────────────────────
DRY_RUN = os.environ.get("DRY_RUN", "0") not in ("0", "false", "")
EXCLUDED_FOLDERS = {"SEO Audits", "SEO Projects", "Business Related"}
FIELD_NAME = "Customer"
API_TOKEN = os.environ.get("CLICKUP_API_TOKEN", "")
SPACE_ID = os.environ.get("CLICKUP_SPACE_ID", "")
if not API_TOKEN:
sys.exit("ERROR: CLICKUP_API_TOKEN env var is required")
if not SPACE_ID:
sys.exit("ERROR: CLICKUP_SPACE_ID env var is required")
def main() -> None:
client = ClickUpClient(api_token=API_TOKEN)
# 1. Get folders
print(f"\n{'=' * 60}")
print(f" Rebuild '{FIELD_NAME}' field -- Space {SPACE_ID}")
print(f" Mode: {'DRY RUN' if DRY_RUN else 'LIVE'}")
print(f"{'=' * 60}\n")
folders = client.get_folders(SPACE_ID)
print(f"Found {len(folders)} folders:\n")
client_folders = []
for f in folders:
excluded = f["name"] in EXCLUDED_FOLDERS
marker = " [SKIP]" if excluded else ""
list_names = [lst["name"] for lst in f["lists"]]
print(f" {f['name']}{marker} (lists: {', '.join(list_names) or 'none'})")
if not excluded:
client_folders.append(f)
if not client_folders:
sys.exit("\nNo client folders found -- nothing to do.")
option_names = sorted(f["name"] for f in client_folders)
print(f"\nDropdown options ({len(option_names)}): {', '.join(option_names)}")
# 2. Build a plan: folder → Overall list → tasks
plan: list[dict] = [] # {folder_name, list_id, tasks: [ClickUpTask]}
first_list_id = None
for f in client_folders:
overall = next((lst for lst in f["lists"] if lst["name"] == "Overall"), None)
if overall is None:
print(f"\n WARNING: '{f['name']}' has no 'Overall' list -- skipping task update")
continue
if first_list_id is None:
first_list_id = overall["id"]
tasks = client.get_tasks(overall["id"])
plan.append({"folder_name": f["name"], "list_id": overall["id"], "tasks": tasks})
# 3. Print summary
total_tasks = sum(len(p["tasks"]) for p in plan)
print("\n--- Update Plan ---")
for p in plan:
print(f" {p['folder_name']:30s} -> {len(p['tasks']):3d} tasks in list {p['list_id']}")
print(f" {'TOTAL':30s} -> {total_tasks:3d} tasks")
if DRY_RUN:
print("\n** DRY RUN -- no changes made. Unset DRY_RUN to execute. **\n")
return
if first_list_id is None:
sys.exit("\nNo 'Overall' list found in any client folder -- cannot create field.")
# 4. Create the dropdown field
print(f"\nCreating '{FIELD_NAME}' dropdown on list {first_list_id} ...")
type_config = {
"options": [{"name": name, "color": None} for name in option_names],
}
client.create_custom_field(first_list_id, FIELD_NAME, "drop_down", type_config)
print(" Field created.")
# Brief pause for ClickUp to propagate
time.sleep(2)
# 5. Discover the field UUID + option IDs
print("Discovering field UUID and option IDs ...")
field_info = client.discover_field_filter(first_list_id, FIELD_NAME)
if field_info is None:
sys.exit(f"\nERROR: Could not find '{FIELD_NAME}' field after creation!")
field_id = field_info["field_id"]
option_map = field_info["options"] # {name: uuid}
print(f" Field ID: {field_id}")
print(f" Options: {option_map}")
# 6. Set Customer field on each task
updated = 0
failed = 0
for p in plan:
folder_name = p["folder_name"]
opt_id = option_map.get(folder_name)
if not opt_id:
print(f"\n WARNING: No option ID for '{folder_name}' -- skipping")
continue
print(f"\nUpdating {len(p['tasks'])} tasks in '{folder_name}' ...")
for task in p["tasks"]:
ok = client.set_custom_field_value(task.id, field_id, opt_id)
if ok:
updated += 1
else:
failed += 1
print(f" FAILED: task {task.id} ({task.name})")
# Light rate-limit courtesy
time.sleep(0.15)
print(f"\n{'=' * 60}")
print(f" Done! Updated: {updated} | Failed: {failed}")
print(f"{'=' * 60}\n")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,144 @@
"""Re-run press release pipeline for specific tasks that are missing attachments."""
import logging
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
handlers=[logging.StreamHandler(stream=io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8"))],
)
log = logging.getLogger("pr_rerun")
from cheddahbot.config import load_config
from cheddahbot.db import Database
from cheddahbot.llm import LLMAdapter
from cheddahbot.agent import Agent
from cheddahbot.clickup import ClickUpClient
TASKS_TO_RERUN = [
("86b8ebfk9", "Advanced Industrial highlights medical grade plastic expertise", "Advanced Industrial"),
]
def bootstrap():
config = load_config()
db = Database(config.db_path)
llm = LLMAdapter(
default_model=config.chat_model,
openrouter_key=config.openrouter_api_key,
ollama_url=config.ollama_url,
lmstudio_url=config.lmstudio_url,
)
agent_cfg = config.agents[0] if config.agents else None
agent = Agent(config, db, llm, agent_config=agent_cfg)
try:
from cheddahbot.memory import MemorySystem
scope = agent_cfg.memory_scope if agent_cfg else ""
memory = MemorySystem(config, db, scope=scope)
agent.set_memory(memory)
except Exception as e:
log.warning("Memory not available: %s", e)
from cheddahbot.tools import ToolRegistry
tools = ToolRegistry(config, db, agent)
agent.set_tools(tools)
try:
from cheddahbot.skills import SkillRegistry
skills = SkillRegistry(config.skills_dir)
agent.set_skills_registry(skills)
except Exception as e:
log.warning("Skills not available: %s", e)
return config, db, agent, tools
def run_task(agent, tools, config, client, task_id, task_name, customer):
"""Execute write_press_releases for a specific task."""
# Build args matching the field_mapping from config
args = {
"topic": task_name,
"company_name": customer,
"clickup_task_id": task_id,
}
# Also fetch IMSURL from the task
import httpx as _httpx
resp = _httpx.get(
f"https://api.clickup.com/api/v2/task/{task_id}",
headers={"Authorization": config.clickup.api_token},
timeout=30.0,
)
task_data = resp.json()
for cf in task_data.get("custom_fields", []):
if cf["name"] == "IMSURL":
val = cf.get("value")
if val:
args["url"] = val
elif cf["name"] == "SocialURL":
val = cf.get("value")
if val:
args["branded_url"] = val
log.info("=" * 70)
log.info("EXECUTING: %s", task_name)
log.info(" Task ID: %s", task_id)
log.info(" Customer: %s", customer)
log.info(" Args: %s", {k: v for k, v in args.items() if k != "clickup_task_id"})
log.info("=" * 70)
try:
result = tools.execute("write_press_releases", args)
if result.startswith("Skipped:") or result.startswith("Error:"):
log.error("Task skipped/errored: %s", result[:500])
return False
log.info("Task completed!")
# Print first 500 chars of result
print(f"\n--- Result for {task_name} ---")
print(result[:1000])
print("--- End ---\n")
return True
except Exception as e:
log.error("Task failed: %s", e, exc_info=True)
return False
def main():
log.info("Bootstrapping CheddahBot...")
config, db, agent, tools = bootstrap()
client = ClickUpClient(
api_token=config.clickup.api_token,
workspace_id=config.clickup.workspace_id,
task_type_field_name=config.clickup.task_type_field_name,
)
log.info("Will re-run %d tasks", len(TASKS_TO_RERUN))
results = []
for i, (task_id, name, customer) in enumerate(TASKS_TO_RERUN):
log.info("\n>>> Task %d/%d <<<", i + 1, len(TASKS_TO_RERUN))
success = run_task(agent, tools, config, client, task_id, name, customer)
results.append((name, success))
print(f"\n{'=' * 70}")
print("RESULTS SUMMARY")
print(f"{'=' * 70}")
for name, success in results:
status = "OK" if success else "FAILED"
print(f" [{status}] {name}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,241 @@
"""Run the press-release pipeline for up to N ClickUp tasks.
Usage:
uv run python scripts/run_pr_pipeline.py # discover + execute up to 3
uv run python scripts/run_pr_pipeline.py --dry-run # discover only, don't execute
uv run python scripts/run_pr_pipeline.py --max 1 # execute only 1 task
"""
import argparse
import logging
import sys
from datetime import UTC, datetime
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("pr_pipeline")
# ── Bootstrap CheddahBot (config, db, agent, tools) ─────────────────────
from cheddahbot.config import load_config
from cheddahbot.db import Database
from cheddahbot.llm import LLMAdapter
from cheddahbot.agent import Agent
from cheddahbot.clickup import ClickUpClient
def bootstrap():
"""Set up config, db, agent, and tool registry — same as __main__.py."""
config = load_config()
db = Database(config.db_path)
llm = LLMAdapter(
default_model=config.chat_model,
openrouter_key=config.openrouter_api_key,
ollama_url=config.ollama_url,
lmstudio_url=config.lmstudio_url,
)
agent_cfg = config.agents[0] if config.agents else None
agent = Agent(config, db, llm, agent_config=agent_cfg)
# Memory
try:
from cheddahbot.memory import MemorySystem
scope = agent_cfg.memory_scope if agent_cfg else ""
memory = MemorySystem(config, db, scope=scope)
agent.set_memory(memory)
except Exception as e:
log.warning("Memory not available: %s", e)
# Tools
from cheddahbot.tools import ToolRegistry
tools = ToolRegistry(config, db, agent)
agent.set_tools(tools)
# Skills
try:
from cheddahbot.skills import SkillRegistry
skills = SkillRegistry(config.skills_dir)
agent.set_skills_registry(skills)
except Exception as e:
log.warning("Skills not available: %s", e)
return config, db, agent, tools
def discover_pr_tasks(config):
"""Poll ClickUp for Press Release tasks — same logic as scheduler._poll_clickup()."""
client = ClickUpClient(
api_token=config.clickup.api_token,
workspace_id=config.clickup.workspace_id,
task_type_field_name=config.clickup.task_type_field_name,
)
space_id = config.clickup.space_id
skill_map = config.clickup.skill_map
if not space_id:
log.error("No space_id configured")
return [], client
# Discover field filter (Work Category UUID + options)
list_ids = client.get_list_ids_from_space(space_id)
if not list_ids:
log.error("No lists found in space %s", space_id)
return [], client
first_list = next(iter(list_ids))
field_filter = client.discover_field_filter(
first_list, config.clickup.task_type_field_name
)
# Build custom fields filter for API query
custom_fields_filter = None
if field_filter and field_filter.get("options"):
import json
field_id = field_filter["field_id"]
options = field_filter["options"]
# Only Press Release
pr_opt_id = options.get("Press Release")
if pr_opt_id:
custom_fields_filter = json.dumps(
[{"field_id": field_id, "operator": "ANY", "value": [pr_opt_id]}]
)
log.info("Filtering for Press Release option ID: %s", pr_opt_id)
else:
log.warning("'Press Release' not found in Work Category options: %s", list(options.keys()))
return [], client
# Due date window (3 weeks)
now_ms = int(datetime.now(UTC).timestamp() * 1000)
due_date_lt = now_ms + (3 * 7 * 24 * 60 * 60 * 1000)
tasks = client.get_tasks_from_space(
space_id,
statuses=config.clickup.poll_statuses,
due_date_lt=due_date_lt,
custom_fields=custom_fields_filter,
)
# Client-side filter: must be Press Release + have due date in window
pr_tasks = []
for task in tasks:
if task.task_type != "Press Release":
continue
if not task.due_date:
continue
try:
if int(task.due_date) > due_date_lt:
continue
except (ValueError, TypeError):
continue
pr_tasks.append(task)
return pr_tasks, client
def execute_task(agent, tools, config, client, task):
"""Execute a single PR task — same logic as scheduler._execute_task()."""
skill_map = config.clickup.skill_map
mapping = skill_map.get("Press Release", {})
tool_name = mapping.get("tool", "write_press_releases")
task_id = task.id
# Build tool args from field mapping
field_mapping = mapping.get("field_mapping", {})
args = {}
for tool_param, source in field_mapping.items():
if source == "task_name":
args[tool_param] = task.name
elif source == "task_description":
args[tool_param] = task.custom_fields.get("description", "")
else:
args[tool_param] = task.custom_fields.get(source, "")
args["clickup_task_id"] = task_id
log.info("=" * 70)
log.info("EXECUTING: %s", task.name)
log.info(" Task ID: %s", task_id)
log.info(" Tool: %s", tool_name)
log.info(" Args: %s", {k: v for k, v in args.items() if k != "clickup_task_id"})
log.info("=" * 70)
# Move to "automation underway"
client.update_task_status(task_id, config.clickup.automation_status)
try:
result = tools.execute(tool_name, args)
if result.startswith("Skipped:") or result.startswith("Error:"):
log.error("Task skipped/errored: %s", result[:500])
client.add_comment(
task_id,
f"⚠️ CheddahBot could not execute this task.\n\n{result[:2000]}",
)
client.update_task_status(task_id, config.clickup.error_status)
return False
log.info("Task completed successfully!")
log.info("Result preview:\n%s", result[:1000])
return True
except Exception as e:
log.error("Task failed with exception: %s", e, exc_info=True)
client.add_comment(
task_id,
f"❌ CheddahBot failed to complete this task.\n\nError: {str(e)[:2000]}",
)
client.update_task_status(task_id, config.clickup.error_status)
return False
def main():
parser = argparse.ArgumentParser(description="Run PR pipeline from ClickUp")
parser.add_argument("--dry-run", action="store_true", help="Discover only, don't execute")
parser.add_argument("--max", type=int, default=3, help="Max tasks to execute (default: 3)")
args = parser.parse_args()
log.info("Bootstrapping CheddahBot...")
config, db, agent, tools = bootstrap()
log.info("Polling ClickUp for Press Release tasks...")
pr_tasks, client = discover_pr_tasks(config)
if not pr_tasks:
log.info("No Press Release tasks found in statuses %s", config.clickup.poll_statuses)
return
log.info("Found %d Press Release task(s):", len(pr_tasks))
for i, task in enumerate(pr_tasks):
status_str = f"status={task.status}" if hasattr(task, "status") else ""
log.info(" %d. %s (id=%s) %s", i + 1, task.name, task.id, status_str)
log.info(" Custom fields: %s", task.custom_fields)
if args.dry_run:
log.info("Dry run — not executing. Use without --dry-run to execute.")
return
# Execute up to --max tasks
to_run = pr_tasks[: args.max]
log.info("Will execute %d task(s) (max=%d)", len(to_run), args.max)
results = []
for i, task in enumerate(to_run):
log.info("\n>>> Task %d/%d <<<", i + 1, len(to_run))
success = execute_task(agent, tools, config, client, task)
results.append((task.name, success))
log.info("\n" + "=" * 70)
log.info("RESULTS SUMMARY")
log.info("=" * 70)
for name, success in results:
status = "OK" if success else "FAILED"
log.info(" [%s] %s", status, name)
if __name__ == "__main__":
main()

3
start.sh 100644
View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
cd "$(dirname "$0")"
exec uv run python -m cheddahbot

View File

@ -16,6 +16,7 @@ class FakeTask:
id: str = "fake_id"
name: str = ""
task_type: str = ""
status: str = "running cora"
custom_fields: dict = field(default_factory=dict)