CheddahBot/clickup_runner/__main__.py

642 lines
19 KiB
Python

"""ClickUp + Claude Code automation runner -- entry point.
Usage:
uv run python -m clickup_runner
"""
from __future__ import annotations
import logging
import signal
import sys
import time
from datetime import datetime, timezone
from .autocora import archive_result, scan_results, submit_job
from .claude_runner import (
RunResult,
build_prompt,
copy_to_nas,
notify,
read_skill_file,
run_claude,
)
from .clickup_client import ClickUpClient, ClickUpTask
from .config import Config, load_config
from .skill_map import SkillRoute, get_route, get_supported_task_types, get_valid_stages
from .state import StateDB
# Package-level logger used throughout this module; handlers are attached
# by _setup_logging().
log = logging.getLogger("clickup_runner")
# Flag for graceful shutdown: set to True by _handle_signal() and checked by
# the polling loop in main() between cycles and between one-second sleeps.
_shutdown = False
def _handle_signal(signum, frame):
    """Signal handler that requests a graceful stop of the runner.

    Flips the module-level ``_shutdown`` flag so the polling loop in
    ``main()`` exits once the cycle in progress has finished.

    Args:
        signum: Number of the signal that was received (logged).
        frame: Current stack frame; unused, present only because the
            ``signal.signal`` handler signature requires it.
    """
    global _shutdown
    _shutdown = True
    log.info("Received signal %d -- shutting down after current cycle", signum)
def _setup_logging():
    """Configure logging: INFO and above to the console, DEBUG and above to a file.

    A stdout handler (INFO) is always attached to the root logger. A file
    handler (DEBUG) writing ``logs/clickup_runner.log`` next to the package
    directory is attached on a best-effort basis: if the directory or file
    cannot be created, a warning is logged and the runner continues with
    console logging only.

    The root logger stays at INFO so third-party libraries do not flood the
    output. Once the file handler is attached, the ``clickup_runner`` logger
    itself is lowered to DEBUG; otherwise DEBUG records would be discarded
    at the logger before ever reaching the file handler, and the handler's
    DEBUG level would have no effect.
    """
    from pathlib import Path

    runner_log = logging.getLogger("clickup_runner")
    fmt = logging.Formatter(
        "[%(asctime)s] %(levelname)-7s %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(fmt)
    console.setLevel(logging.INFO)
    root.addHandler(console)

    # File handler for persistent logs (best-effort).
    try:
        log_dir = Path(__file__).resolve().parent.parent / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(
            log_dir / "clickup_runner.log", encoding="utf-8"
        )
        file_handler.setFormatter(fmt)
        file_handler.setLevel(logging.DEBUG)
        root.addHandler(file_handler)
    except Exception as e:
        # Deliberately broad: file logging must never prevent startup.
        runner_log.warning("Could not set up file logging: %s", e)
    else:
        # Let this package's DEBUG records propagate to the root handlers.
        # The console handler still filters at INFO; only the file sees DEBUG.
        runner_log.setLevel(logging.DEBUG)
def _due_date_cutoff_ms() -> int:
    """Return the last microsecond of the current UTC day as Unix milliseconds.

    The value is used as the ``due_date_lt`` filter and as the upper bound
    when deciding whether a task is due today or earlier. "Today" is
    evaluated in UTC, not in the local timezone.
    """
    today_utc = datetime.now(timezone.utc).date()
    # datetime.max.time() is 23:59:59.999999, i.e. the final instant of a day.
    last_instant = datetime.combine(
        today_utc, datetime.max.time(), tzinfo=timezone.utc
    )
    return int(last_instant.timestamp() * 1000)
def _is_due_today_or_earlier(task: ClickUpTask) -> bool:
    """Tell whether ``task`` is due by the end of the current (UTC) day.

    Returns False when the task has no due date or when the due date cannot
    be interpreted as an integer millisecond timestamp.
    """
    if not task.due_date:
        return False
    try:
        return int(task.due_date) <= _due_date_cutoff_ms()
    except (ValueError, TypeError):
        # Unparseable due date: treat the task as not due.
        return False
def poll_cycle(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
) -> int:
    """Run one poll cycle. Returns the number of tasks dispatched.

    The cycle first processes any finished AutoCora jobs, then fetches the
    tasks from the Overall lists whose due date is today or earlier and
    hands each one to ``_process_delegated_task``.

    Failures are isolated per task: an exception raised while handling one
    task is logged with its traceback and the loop continues with the
    remaining tasks, so a single bad task cannot starve the rest of the
    cycle. Errors raised before the loop (result scanning, task fetch) still
    propagate to the caller.

    Args:
        client: ClickUp API client.
        cfg: Runner configuration.
        db: State database used for run logging and job bookkeeping.

    Returns:
        Number of tasks for which a handler was dispatched this cycle.
    """
    space_id = cfg.clickup.space_id
    if not space_id:
        log.error("No space_id configured -- skipping poll cycle")
        return 0

    # Check for completed AutoCora jobs before dispatching new tasks
    _check_autocora_results(client, cfg, db)

    # Fetch all tasks from Overall lists with due date <= today
    cutoff_ms = _due_date_cutoff_ms()
    tasks = client.get_tasks_from_overall_lists(space_id, due_date_lt=cutoff_ms)

    dispatched = 0
    for task in tasks:
        try:
            if _process_delegated_task(client, cfg, db, task):
                dispatched += 1
        except Exception:
            # Keep going: one failing task must not block the others.
            log.exception(
                "Error processing task %s -- continuing with remaining tasks",
                getattr(task, "id", "?"),
            )
    return dispatched


def _process_delegated_task(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
    task: ClickUpTask,
) -> bool:
    """Filter, route and dispatch one task; return True if it was dispatched.

    Returns False when the task is skipped (not delegated, not yet due) or
    when it cannot be routed, in which case the problem is reported on the
    task via ``_handle_no_mapping``.
    """
    # 1. Check "Delegate to Claude" checkbox
    if not client.is_checkbox_checked(task, cfg.clickup.delegate_field_name):
        return False

    # 2. Verify due date <= today
    if not _is_due_today_or_earlier(task):
        return False

    # 3. Read task type and stage
    task_type = task.task_type
    stage = client.get_stage(task, cfg.clickup.stage_field_name)
    log.info(
        "Found delegated task: %s (id=%s, type=%s, stage=%s)",
        task.name,
        task.id,
        task_type,
        stage,
    )

    # 4. Look up skill route
    if not task_type:
        _handle_no_mapping(
            client, cfg, task,
            "Task has no Work Category set. "
            "Set the Work Category field, then re-check Delegate to Claude.",
        )
        return False
    if not stage:
        _handle_no_mapping(
            client, cfg, task,
            "Task has no Stage set. "
            "Valid stages for %s: %s. "
            "Set the Stage field, then re-check Delegate to Claude."
            % (task_type, ", ".join(get_valid_stages(task_type)) or "none"),
        )
        return False

    # 5. Check for .xlsx attachment on run_cora stage
    route = get_route(task_type, stage)
    if route and route.handler == "autocora":
        # If .xlsx is already attached, skip Cora and advance
        attachments = client.get_task_attachments(task.id)
        task.attachments = attachments
        if task.has_xlsx_attachment():
            log.info(
                "Task %s has .xlsx attached -- skipping run_cora, advancing to %s",
                task.id,
                route.next_stage,
            )
            client.set_stage(
                task.id,
                task.list_id,
                route.next_stage,
                cfg.clickup.stage_field_name,
            )
            # Continue with the stage we just advanced to and re-route
            stage = route.next_stage
            route = get_route(task_type, stage)

    if route is None:
        valid = get_valid_stages(task_type)
        if not valid:
            msg = (
                "Task type '%s' is not supported. "
                "Supported types: %s. "
                "Fix the Work Category field, then re-check Delegate to Claude."
                % (task_type, ", ".join(get_supported_task_types()))
            )
        else:
            msg = (
                "Stage '%s' is not valid for task type '%s'. "
                "Valid stages: %s. "
                "Fix the Stage field, then re-check Delegate to Claude."
                % (stage, task_type, ", ".join(valid))
            )
        _handle_no_mapping(client, cfg, task, msg)
        return False

    # 6. Dispatch
    log.info(
        "Dispatching task %s: type=%s, stage=%s, handler=%s",
        task.id,
        task_type,
        stage,
        route.handler,
    )
    run_id = db.log_run_start(task.id, task.name, task_type, stage)
    if route.handler == "autocora":
        _dispatch_autocora(client, cfg, db, task, route, run_id)
    else:
        _dispatch_claude(client, cfg, db, task, route, run_id)
    return True
def _handle_no_mapping(
    client: ClickUpClient,
    cfg: Config,
    task: ClickUpTask,
    message: str,
):
    """Report a task that cannot be routed.

    Posts an ``[ERROR]`` comment containing ``message``, checks the error
    checkbox, unchecks the delegate checkbox, and logs a warning.
    """
    task_id = task.id
    list_id = task.list_id

    client.add_comment(task_id, "[ERROR] Cannot process task\n--\n%s" % message)

    # Flag the error first, then release the task from delegation.
    client.set_checkbox(task_id, list_id, cfg.clickup.error_field_name, True)
    client.set_checkbox(task_id, list_id, cfg.clickup.delegate_field_name, False)

    log.warning("Task %s: %s", task_id, message)
def _check_autocora_results(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
):
    """Process finished AutoCora jobs and reflect their outcome in ClickUp.

    For every result found in the results directory, the affected task ids
    are taken from the state DB entry for the job when one exists, otherwise
    from the result itself. Each task is passed to the success or failure
    handler according to ``result.status``. Afterwards the state DB entry
    (if any) is deleted and the result is archived. Results that cannot be
    tied to any task are logged and archived without further action.
    """
    results = scan_results(cfg.autocora.results_dir)
    if not results:
        return
    log.info("Found %d AutoCora result(s) to process", len(results))

    for result in results:
        state_key = "autocora:job:%s" % result.job_id
        job_data = db.kv_get_json(state_key)

        # Start from the ids recorded in the result file; the state DB entry
        # wins when present because it always carries the task_id.
        task_ids = result.task_ids
        if job_data:
            task_ids = [job_data["task_id"]]

        if task_ids:
            for task_id in task_ids:
                if result.status == "SUCCESS":
                    handler = _handle_autocora_success
                else:
                    handler = _handle_autocora_failure
                handler(client, cfg, db, task_id, result, job_data)
            # Clean up state DB entry
            if job_data:
                db.kv_delete(state_key)
        else:
            log.warning(
                "Result %s has no task_ids and no matching state DB entry -- skipping",
                result.job_id,
            )

        archive_result(result)
def _handle_autocora_success(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
    task_id: str,
    result,
    job_data: dict | None,
):
    """Apply a successful AutoCora result to a single task.

    Advances the task to the next stage of its route (when a route exists
    for its current type and stage), moves it to the review status, posts a
    comment, clears the error checkbox, closes the run log entry when the
    job metadata carries a ``run_id``, and sends a notification.

    If the task cannot be fetched, the error is logged and nothing else is
    done.
    """
    meta = job_data or {}
    keyword = result.keyword or meta.get("keyword", "unknown")

    # The task is needed for its list_id and current type/stage.
    try:
        task = client.get_task(task_id)
    except Exception as e:
        log.error("Failed to fetch task %s for AutoCora result: %s", task_id, e)
        return

    stage_field = cfg.clickup.stage_field_name
    current_stage = client.get_stage(task, stage_field)
    route = get_route(task.task_type, current_stage)

    # Without a route the stage is left alone and the comment says so.
    next_stage = "(unknown)"
    if route:
        client.set_stage(task_id, task.list_id, route.next_stage, stage_field)
        next_stage = route.next_stage

    client.update_task_status(task_id, cfg.clickup.review_status)
    client.add_comment(
        task_id,
        "Cora report generated for \"%s\". Stage advanced to %s.\n"
        "Review the .xlsx in %s, then re-check Delegate to Claude for the next stage."
        % (keyword, next_stage, cfg.autocora.xlsx_dir),
    )
    client.set_checkbox(
        task_id, task.list_id, cfg.clickup.error_field_name, False
    )

    # Finish the run log if we have a run_id
    run_id = meta.get("run_id")
    if run_id:
        db.log_run_finish(run_id, "completed", result="Cora report ready")

    notify(cfg, "Cora done: %s" % keyword, "Task %s ready for review" % task_id)
    log.info("AutoCora SUCCESS for task %s (keyword=%s)", task_id, keyword)
def _handle_autocora_failure(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
    task_id: str,
    result,
    job_data: dict | None,
):
    """Apply a failed AutoCora result to a single task.

    Posts an ``[ERROR]`` comment with the failure reason, checks the error
    checkbox, moves the task to the review status, marks the run as failed
    when the job metadata carries a ``run_id``, and sends an error
    notification.

    If the task cannot be fetched, the error is logged and nothing else is
    done.
    """
    meta = job_data or {}
    keyword = result.keyword or meta.get("keyword", "unknown")
    reason = result.reason or "Unknown error"

    # The task is needed for its list_id when setting the checkbox.
    try:
        task = client.get_task(task_id)
    except Exception as e:
        log.error("Failed to fetch task %s for AutoCora result: %s", task_id, e)
        return

    client.add_comment(
        task_id,
        (
            "[ERROR] Cora report failed for keyword: \"%s\"\n"
            "--\n"
            "What failed: %s\n"
            "\n"
            "How to fix: Check the AutoCora worker logs, fix the issue, "
            "then re-check Delegate to Claude."
        )
        % (keyword, reason),
    )
    client.set_checkbox(
        task_id, task.list_id, cfg.clickup.error_field_name, True
    )
    client.update_task_status(task_id, cfg.clickup.review_status)

    run_id = meta.get("run_id")
    if run_id:
        db.log_run_finish(run_id, "failed", error="Cora failed: %s" % reason)

    notify(
        cfg,
        "Cora FAILED: %s" % keyword,
        "Task %s -- %s" % (task_id, reason),
        is_error=True,
    )
    log.error("AutoCora FAILURE for task %s: %s", task_id, reason)
def _dispatch_autocora(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
    task: ClickUpTask,
    route: SkillRoute,
    run_id: int,
):
    """Queue an AutoCora job for ``task``.

    Requires the task's ``Keyword`` custom field; without it the task is
    flagged as unroutable and the run is marked failed. Otherwise the task
    is moved to the "ai working" status, a job is submitted to the jobs
    directory, the job metadata is stored in the state DB so the result can
    be matched later, a comment is posted, the delegate checkbox is
    unchecked, and the run is logged as ``submitted``. Completion is
    recorded later, when the result is picked up.

    ``route`` is accepted for signature parity with ``_dispatch_claude`` and
    is not used here.
    """
    keyword = task.get_field_value("Keyword") or ""
    url = task.get_field_value("IMSURL") or ""

    if not keyword:
        _handle_no_mapping(
            client, cfg, task,
            "Task has no Keyword field set. "
            "Set the Keyword custom field, then re-check Delegate to Claude.",
        )
        db.log_run_finish(run_id, "failed", error="Missing Keyword field")
        return

    # 1. Set status to "ai working"
    client.update_task_status(task.id, cfg.clickup.ai_working_status)

    # 2. Submit the job to the NAS queue
    jobs_dir = cfg.autocora.jobs_dir
    job_id = submit_job(keyword, url, task.id, jobs_dir)
    if not job_id:
        _handle_dispatch_error(
            client, cfg, db, task, run_id,
            error="Failed to write AutoCora job file to %s" % cfg.autocora.jobs_dir,
            fix="Check that the NAS is mounted and accessible, "
                "then re-check Delegate to Claude.",
        )
        return

    # 3. Store job metadata in state DB for result polling
    job_record = {
        "task_id": task.id,
        "task_name": task.name,
        "keyword": keyword,
        "url": url,
        "run_id": run_id,
    }
    db.kv_set_json("autocora:job:%s" % job_id, job_record)

    # 4. Post comment + uncheck delegate
    submitted_note = (
        "Cora job submitted for keyword: \"%s\" (job: %s).\n"
        "The runner will check for results automatically."
        % (keyword, job_id)
    )
    client.add_comment(task.id, submitted_note)
    client.set_checkbox(
        task.id, task.list_id, cfg.clickup.delegate_field_name, False
    )

    # 5. Log as submitted (not completed -- that happens when results arrive)
    db.log_run_finish(run_id, "submitted", result="Job: %s" % job_id)
    notify(cfg, "Cora submitted: %s" % keyword, "Task: %s" % task.name)
    log.info(
        "AutoCora job submitted: %s (task=%s, keyword=%s)",
        job_id, task.id, keyword,
    )
def _dispatch_claude(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
    task: ClickUpTask,
    route: SkillRoute,
    run_id: int,
):
    """Run Claude Code headless for a task and publish the outcome.

    Steps: set the "ai working" status, load the skill file for ``route``,
    build the prompt (including URLs of any attached .xlsx files), run
    Claude, then on success upload the output files, copy them to the NAS
    (only when both a ``Client`` field value and a NAS directory are set),
    advance stage and status, post a summary comment, reset the delegate and
    error checkboxes, finish the run log and notify.

    A missing skill file or an unsuccessful Claude run is reported through
    ``_handle_dispatch_error``. The temporary work directory produced by the
    run is removed in all cases, including when a later step raises.

    Args:
        client: ClickUp API client.
        cfg: Runner configuration.
        db: State database used for run logging.
        task: The task being processed.
        route: Skill route selected for the task's type and stage.
        run_id: Identifier of the run log entry opened for this dispatch.
    """
    # 1. Set status to "ai working"
    client.update_task_status(task.id, cfg.clickup.ai_working_status)

    # 2. Read skill file
    try:
        skill_content = read_skill_file(route, cfg.skills_dir)
    except FileNotFoundError as e:
        _handle_dispatch_error(
            client, cfg, db, task, run_id,
            error=str(e),
            fix="Create the skill file at skills/%s, then re-check Delegate to Claude."
            % route.skill_file,
        )
        return

    # 3. Gather .xlsx attachment URLs for the prompt.
    # ``or ""`` guards against attachments whose title/url is present but None.
    xlsx_urls = [
        (a.get("url") or "")
        for a in (task.attachments or [])
        if (a.get("title") or "").lower().endswith(".xlsx")
        or (a.get("url") or "").lower().endswith(".xlsx")
    ]

    # 4. Build prompt
    prompt = build_prompt(task, route, skill_content, xlsx_urls or None)

    # 5. Run Claude
    log.info("Starting Claude for task %s (%s)", task.id, task.name)
    result = run_claude(prompt, route, cfg)

    # From here on the work dir exists; the finally clause guarantees it is
    # removed even if an upload or ClickUp call below raises.
    try:
        if not result.success:
            _handle_dispatch_error(
                client, cfg, db, task, run_id,
                error=result.error or "Unknown error",
                fix="Check logs/clickup_runner.log for details. "
                    "Fix the issue, then re-check Delegate to Claude.",
            )
            return

        # 6. Upload output files to ClickUp
        uploaded = sum(
            1 for f in result.output_files if client.upload_attachment(task.id, f)
        )

        # 7. Copy to NAS (best-effort)
        customer = task.get_field_value("Client") or ""
        if customer and cfg.nas.generated_dir:
            copy_to_nas(result.output_files, customer, cfg.nas.generated_dir)

        # 8. Advance stage + status
        client.set_stage(
            task.id, task.list_id, route.next_stage, cfg.clickup.stage_field_name
        )
        client.update_task_status(task.id, route.next_status)

        # 9. Post success comment
        summary = "Stage complete. %d file(s) attached." % uploaded
        if result.output:
            # Include first 500 chars of Claude's output as context
            truncated = result.output[:500]
            if len(result.output) > 500:
                truncated += "..."
            summary += "\n\n---\nClaude output:\n%s" % truncated
        client.add_comment(task.id, summary)

        # 10. Uncheck delegate + clear error
        client.set_checkbox(
            task.id, task.list_id, cfg.clickup.delegate_field_name, False
        )
        client.set_checkbox(
            task.id, task.list_id, cfg.clickup.error_field_name, False
        )

        # 11. Log success
        db.log_run_finish(
            run_id, "completed",
            result="%d files uploaded" % uploaded,
        )

        # 12. Notify
        notify(cfg, "Task complete: %s" % task.name, summary)
        log.info(
            "Task %s completed: stage -> %s, %d file(s) uploaded",
            task.id, route.next_stage, uploaded,
        )
    finally:
        # 13. Clean up temp dir
        _cleanup_work_dir(result.work_dir)
def _handle_dispatch_error(
    client: ClickUpClient,
    cfg: Config,
    db: StateDB,
    task: ClickUpTask,
    run_id: int,
    error: str,
    fix: str,
):
    """Put a task into the error state after a failed dispatch.

    Posts an ``[ERROR]`` comment describing what failed and how to fix it,
    checks the error checkbox, unchecks the delegate checkbox, moves the
    task to the review status, marks the run as failed, sends an error
    notification and logs the failure.

    Args:
        client: ClickUp API client.
        cfg: Runner configuration.
        db: State database holding the run log.
        task: The task whose dispatch failed.
        run_id: Identifier of the run log entry to close.
        error: Description of what went wrong.
        fix: Instructions for resolving the problem.
    """
    task_id = task.id
    list_id = task.list_id

    error_comment = (
        "[ERROR] Claude processing failed\n"
        "--\n"
        "What failed: %s\n"
        "\n"
        "How to fix: %s"
    ) % (error, fix)
    client.add_comment(task_id, error_comment)

    client.set_checkbox(task_id, list_id, cfg.clickup.error_field_name, True)
    client.set_checkbox(task_id, list_id, cfg.clickup.delegate_field_name, False)
    client.update_task_status(task_id, cfg.clickup.review_status)

    db.log_run_finish(run_id, "failed", error=error)

    notify(
        cfg,
        "FAILED: %s" % task.name,
        "Error: %s\nFix: %s" % (error, fix),
        is_error=True,
    )
    log.error("Task %s failed: %s", task_id, error)
def _cleanup_work_dir(work_dir):
    """Delete a temporary work directory tree, ignoring any failure.

    Does nothing when ``work_dir`` is None. Removal is best-effort: errors
    while deleting (including a directory that no longer exists) are
    suppressed and never propagate to the caller.
    """
    if work_dir is not None:
        try:
            import shutil

            target = str(work_dir)
            shutil.rmtree(target, ignore_errors=True)
        except Exception:
            # Cleanup must never break the caller.
            pass
def main():
    """Entry point: configure the runner and poll until asked to stop.

    Sets up logging, loads the configuration, and exits with status 1 when
    the ClickUp API token or space id is missing. Otherwise it creates the
    ClickUp client and state DB, installs SIGINT/SIGTERM handlers, and runs
    poll cycles separated by the configured interval until the shutdown
    flag is set. Exceptions from a poll cycle are logged and do not stop
    the loop. The client is closed on the way out.
    """
    _setup_logging()
    log.info("ClickUp Runner starting up")

    cfg = load_config()
    required_settings = (
        (cfg.clickup.api_token, "CLICKUP_API_TOKEN not set -- exiting"),
        (cfg.clickup.space_id, "CLICKUP_SPACE_ID not set -- exiting"),
    )
    for value, complaint in required_settings:
        if not value:
            log.error(complaint)
            sys.exit(1)

    client = ClickUpClient(
        api_token=cfg.clickup.api_token,
        task_type_field_name=cfg.clickup.task_type_field_name,
    )
    db = StateDB(cfg.db_path)

    # Graceful shutdown on SIGINT/SIGTERM
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _handle_signal)

    log.info(
        "Runner ready. Polling every %ds. Space: %s",
        cfg.runner.poll_interval_seconds,
        cfg.clickup.space_id,
    )

    try:
        while not _shutdown:
            try:
                count = poll_cycle(client, cfg, db)
                if count:
                    log.info("Dispatched %d task(s) this cycle", count)
            except Exception:
                log.exception("Error in poll cycle")
            _wait_for_next_cycle(cfg.runner.poll_interval_seconds)
    finally:
        client.close()
        log.info("ClickUp Runner shut down")


def _wait_for_next_cycle(seconds):
    """Sleep up to ``seconds`` in one-second steps, stopping early on shutdown."""
    # Small increments keep the runner responsive to the shutdown signal.
    for _tick in range(seconds):
        if _shutdown:
            break
        time.sleep(1)
# Start the runner only when this module is executed as a script
# (e.g. ``python -m clickup_runner``), not when it is imported.
if __name__ == "__main__":
    main()