"""Poll Gmail for client replies and update ClickUp tasks. Searches for email threads containing "Ref: CU-" where the latest message is from someone other than the bot. Posts the reply text as a ClickUp comment and moves the task to "After Client Feedback" status. Usage: uv run python scripts/check_gmail_replies.py # normal run uv run python scripts/check_gmail_replies.py --dry-run # preview only uv run python scripts/check_gmail_replies.py --since 48 # look back 48 hours """ from __future__ import annotations import argparse import base64 import json import logging import os import re import sys from datetime import UTC, datetime, timedelta from email.utils import parseaddr from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from dotenv import load_dotenv load_dotenv() from googleapiclient.discovery import build # noqa: E402 from cheddahbot.clickup import ClickUpClient # noqa: E402 from cheddahbot.google_auth import get_credentials # noqa: E402 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") log = logging.getLogger(__name__) STATE_FILE = Path("data/gmail_sync_state.json") REF_PATTERN = re.compile(r"Ref:\s*CU-(\w+)") AFTER_FEEDBACK_STATUS = "after client feedback" def _load_state() -> dict: """Load sync state from disk.""" if STATE_FILE.exists(): try: return json.loads(STATE_FILE.read_text()) except (json.JSONDecodeError, OSError): log.warning("Corrupt state file, starting fresh") return {} def _save_state(state: dict) -> None: """Persist sync state to disk.""" STATE_FILE.parent.mkdir(parents=True, exist_ok=True) STATE_FILE.write_text(json.dumps(state, indent=2)) def _get_message_body(payload: dict) -> str: """Extract plain text body from a Gmail message payload.""" # Simple single-part message if payload.get("mimeType") == "text/plain" and "data" in payload.get("body", {}): return base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace") # Multipart message -- find the text/plain part for part in payload.get("parts", []): if part.get("mimeType") == "text/plain" and "data" in part.get("body", {}): return base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace") # Nested multipart if part.get("parts"): result = _get_message_body(part) if result: return result return "" def _extract_reply_text(body: str) -> str: """Extract just the new reply content (above the quoted text). Looks for common reply markers like "On ... wrote:" and takes everything above. Falls back to the full body if no marker is found. """ # Common reply markers markers = [ r"\nOn .+ wrote:\s*\n", # Gmail-style r"\n-{3,}\s*Original Message\s*-{3,}", # Outlook-style r"\nFrom:\s+.+\nSent:\s+", # Outlook-style ] for marker in markers: match = re.search(marker, body, re.IGNORECASE) if match: reply = body[: match.start()].strip() if reply: return reply return body.strip() def main(): parser = argparse.ArgumentParser(description="Check Gmail for client replies") parser.add_argument("--dry-run", action="store_true", help="Preview without making changes") parser.add_argument("--since", type=int, help="Look back N hours (overrides state file)") parser.add_argument("--verbose", action="store_true", help="Enable debug logging") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Determine lookback time state = _load_state() if args.since: since = datetime.now(UTC) - timedelta(hours=args.since) elif "last_check" in state: since = datetime.fromisoformat(state["last_check"]) else: since = datetime.now(UTC) - timedelta(hours=24) processed_ids = set(state.get("processed_message_ids", [])) since_str = since.strftime("%Y/%m/%d") log.info("Checking for replies since %s", since.isoformat()) # Authenticate and build Gmail service creds = get_credentials() gmail = build("gmail", "v1", credentials=creds) # Get the authenticated user's email for filtering out our own messages profile = gmail.users().getProfile(userId="me").execute() my_email = profile.get("emailAddress", "").lower() log.info("Authenticated as %s", my_email) # Search for threads with Ref: CU- pattern query = 'in:inbox "Ref: CU-" after:%s' % since_str log.info("Gmail search query: %s", query) threads_result = gmail.users().threads().list(userId="me", q=query).execute() threads = threads_result.get("threads", []) log.info("Found %d candidate threads", len(threads)) # Initialize ClickUp client clickup_token = os.environ.get("CLICKUP_API_TOKEN", "") cu_client = ClickUpClient(api_token=clickup_token) if clickup_token else None stats = {"replies_found": 0, "comments_posted": 0, "statuses_updated": 0, "errors": 0} for thread_info in threads: thread_id = thread_info["id"] thread = gmail.users().threads().get(userId="me", id=thread_id).execute() messages = thread.get("messages", []) if not messages: continue # Get the latest message in the thread latest = messages[-1] msg_id = latest["id"] # Skip if already processed if msg_id in processed_ids: continue # Check if the latest message is from someone else (not us) headers = {h["name"].lower(): h["value"] for h in latest.get("payload", {}).get("headers", [])} from_addr = parseaddr(headers.get("from", ""))[1].lower() if from_addr == my_email: continue # Our own message, skip # Extract the body and find the Ref tag body = _get_message_body(latest.get("payload", {})) if not body: # Check earlier messages in thread for the Ref tag for msg in reversed(messages[:-1]): body_check = _get_message_body(msg.get("payload", {})) ref_match = REF_PATTERN.search(body_check) if ref_match: body = _get_message_body(latest.get("payload", {})) task_id = ref_match.group(1) break else: continue else: # Search the entire thread for the Ref tag (might be in quoted text or earlier message) task_id = None for msg in messages: msg_body = _get_message_body(msg.get("payload", {})) ref_match = REF_PATTERN.search(msg_body) if ref_match: task_id = ref_match.group(1) break if not task_id: continue reply_text = _extract_reply_text(body) reply_date = headers.get("date", "unknown") subject = headers.get("subject", "(no subject)") stats["replies_found"] += 1 print("\n--- Reply Found ---") print(" From: %s" % from_addr) print(" Subject: %s" % subject) print(" Date: %s" % reply_date) print(" Task: CU-%s" % task_id) print(" Reply preview: %s..." % reply_text[:200]) if not args.dry_run and cu_client: # Post comment to ClickUp try: comment = "Email reply from %s (%s):\n\n%s" % (from_addr, reply_date, reply_text) cu_client.add_comment(task_id, comment) stats["comments_posted"] += 1 print(" -> Comment posted to ClickUp") except Exception as e: log.error("Failed to post comment for task %s: %s", task_id, e) stats["errors"] += 1 continue # Don't update status or mark processed if comment failed # Update task status try: cu_client.update_task_status(task_id, AFTER_FEEDBACK_STATUS) stats["statuses_updated"] += 1 print(" -> Status set to '%s'" % AFTER_FEEDBACK_STATUS) except Exception as e: log.error("Failed to update status for task %s: %s", task_id, e) stats["errors"] += 1 # Mark as processed processed_ids.add(msg_id) # Save state if not args.dry_run: state["last_check"] = datetime.now(UTC).isoformat() state["processed_message_ids"] = list(processed_ids)[-500:] # Keep last 500 _save_state(state) print("\n--- Summary ---") print(" Replies found: %d" % stats["replies_found"]) if not args.dry_run: print(" Comments posted: %d" % stats["comments_posted"]) print(" Statuses updated: %d" % stats["statuses_updated"]) print(" Errors: %d" % stats["errors"]) else: print(" (dry-run -- no changes made)") if cu_client: cu_client.close() if __name__ == "__main__": main()