Migrate ClickUp Customer field to space-level Client field
Replaces all "Customer" field lookups with "Client" to match the new space-wide dropdown, eliminating the 20+ duplicate list-level fields. Includes migration script that populated 400 active tasks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>fix/customer-field-migration
parent
85d9fc7a1f
commit
6250918e5e
|
|
@ -125,7 +125,7 @@ async def get_tasks_by_company():
|
||||||
data = await get_tasks()
|
data = await get_tasks()
|
||||||
by_company: dict[str, list] = {}
|
by_company: dict[str, list] = {}
|
||||||
for task in data.get("tasks", []):
|
for task in data.get("tasks", []):
|
||||||
company = task["custom_fields"].get("Customer") or "Unassigned"
|
company = task["custom_fields"].get("Client") or "Unassigned"
|
||||||
by_company.setdefault(company, []).append(task)
|
by_company.setdefault(company, []).append(task)
|
||||||
|
|
||||||
# Sort companies by task count descending
|
# Sort companies by task count descending
|
||||||
|
|
@ -238,7 +238,7 @@ async def get_link_building_tasks():
|
||||||
in_progress_not_started.append(t)
|
in_progress_not_started.append(t)
|
||||||
by_company: dict[str, list] = {}
|
by_company: dict[str, list] = {}
|
||||||
for task in active_lb:
|
for task in active_lb:
|
||||||
company = task["custom_fields"].get("Customer") or "Unassigned"
|
company = task["custom_fields"].get("Client") or "Unassigned"
|
||||||
by_company.setdefault(company, []).append(task)
|
by_company.setdefault(company, []).append(task)
|
||||||
|
|
||||||
result = {
|
result = {
|
||||||
|
|
@ -320,7 +320,7 @@ async def get_need_cora_tasks():
|
||||||
if kw_lower not in by_keyword:
|
if kw_lower not in by_keyword:
|
||||||
by_keyword[kw_lower] = {
|
by_keyword[kw_lower] = {
|
||||||
"keyword": kw,
|
"keyword": kw,
|
||||||
"company": t["custom_fields"].get("Customer") or "Unassigned",
|
"company": t["custom_fields"].get("Client") or "Unassigned",
|
||||||
"due_date": t.get("due_date"),
|
"due_date": t.get("due_date"),
|
||||||
"tasks": [],
|
"tasks": [],
|
||||||
}
|
}
|
||||||
|
|
@ -367,7 +367,7 @@ async def get_press_release_tasks():
|
||||||
|
|
||||||
by_company: dict[str, list] = {}
|
by_company: dict[str, list] = {}
|
||||||
for task in pr_tasks:
|
for task in pr_tasks:
|
||||||
company = task["custom_fields"].get("Customer") or "Unassigned"
|
company = task["custom_fields"].get("Client") or "Unassigned"
|
||||||
by_company.setdefault(company, []).append(task)
|
by_company.setdefault(company, []).append(task)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|
|
||||||
|
|
@ -1282,7 +1282,7 @@ class Scheduler:
|
||||||
"""Group tasks by their Customer custom field."""
|
"""Group tasks by their Customer custom field."""
|
||||||
groups: dict[str, list] = {}
|
groups: dict[str, list] = {}
|
||||||
for t in tasks:
|
for t in tasks:
|
||||||
customer = t.custom_fields.get("Customer", "") or "Unknown"
|
customer = t.custom_fields.get("Client", "") or "Unknown"
|
||||||
groups.setdefault(str(customer), []).append(t)
|
groups.setdefault(str(customer), []).append(t)
|
||||||
return groups
|
return groups
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ def _find_clickup_task(ctx: dict, company_name: str) -> str:
|
||||||
if task.task_type != "Press Release":
|
if task.task_type != "Press Release":
|
||||||
continue
|
continue
|
||||||
|
|
||||||
client_field = task.custom_fields.get("Customer", "")
|
client_field = task.custom_fields.get("Client", "")
|
||||||
if not (
|
if not (
|
||||||
_fuzzy_company_match(company_name, task.name)
|
_fuzzy_company_match(company_name, task.name)
|
||||||
or _fuzzy_company_match(company_name, client_field)
|
or _fuzzy_company_match(company_name, client_field)
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ clickup:
|
||||||
required_fields: [topic, company_name, target_url]
|
required_fields: [topic, company_name, target_url]
|
||||||
field_mapping:
|
field_mapping:
|
||||||
topic: "PR Topic"
|
topic: "PR Topic"
|
||||||
company_name: "Customer"
|
company_name: "Client"
|
||||||
target_url: "IMSURL"
|
target_url: "IMSURL"
|
||||||
branded_url: "SocialURL"
|
branded_url: "SocialURL"
|
||||||
"On Page Optimization":
|
"On Page Optimization":
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,161 @@
|
||||||
|
"""Migrate ClickUp 'Customer' (list-level) → 'Client' (space-level) field.
|
||||||
|
|
||||||
|
This script does NOT create the field — you must create the space-level "Client"
|
||||||
|
dropdown manually in ClickUp UI first, using the company names this script prints.
|
||||||
|
|
||||||
|
Steps:
|
||||||
|
1. Fetch all folders, filter to those with an 'Overall' list
|
||||||
|
2. Print sorted company names for dropdown creation
|
||||||
|
3. Pause for you to create the field in ClickUp UI
|
||||||
|
4. Discover the new 'Client' field's UUID + option IDs
|
||||||
|
5. Set 'Client' on every active task (folder name as value)
|
||||||
|
6. Report results
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
DRY_RUN=1 uv run python scripts/migrate_client_field.py # preview only
|
||||||
|
uv run python scripts/migrate_client_field.py # live run
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Allow running from repo root
|
||||||
|
_root = Path(__file__).resolve().parent.parent
|
||||||
|
sys.path.insert(0, str(_root))
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
load_dotenv(_root / ".env")
|
||||||
|
|
||||||
|
from cheddahbot.clickup import ClickUpClient
|
||||||
|
|
||||||
|
# ── Config ──────────────────────────────────────────────────────────────────
|
||||||
|
DRY_RUN = os.environ.get("DRY_RUN", "0") not in ("0", "false", "")
|
||||||
|
NEW_FIELD_NAME = "Client"
|
||||||
|
|
||||||
|
API_TOKEN = os.environ.get("CLICKUP_API_TOKEN", "")
|
||||||
|
SPACE_ID = os.environ.get("CLICKUP_SPACE_ID", "")
|
||||||
|
|
||||||
|
if not API_TOKEN:
|
||||||
|
sys.exit("ERROR: CLICKUP_API_TOKEN env var is required")
|
||||||
|
if not SPACE_ID:
|
||||||
|
sys.exit("ERROR: CLICKUP_SPACE_ID env var is required")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
client = ClickUpClient(api_token=API_TOKEN)
|
||||||
|
|
||||||
|
# 1. Get folders, filter to those with an Overall list
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print(f" Migrate to '{NEW_FIELD_NAME}' field -- Space {SPACE_ID}")
|
||||||
|
print(f" Mode: {'DRY RUN' if DRY_RUN else 'LIVE'}")
|
||||||
|
print(f"{'=' * 60}\n")
|
||||||
|
|
||||||
|
folders = client.get_folders(SPACE_ID)
|
||||||
|
print(f"Found {len(folders)} folders:\n")
|
||||||
|
|
||||||
|
client_folders = []
|
||||||
|
for f in folders:
|
||||||
|
overall = next(
|
||||||
|
(lst for lst in f["lists"] if lst["name"].lower() == "overall"), None
|
||||||
|
)
|
||||||
|
if overall:
|
||||||
|
print(f" {f['name']:35s} Overall list: {overall['id']}")
|
||||||
|
client_folders.append({"name": f["name"], "overall_id": overall["id"]})
|
||||||
|
else:
|
||||||
|
print(f" {f['name']:35s} [SKIP - no Overall list]")
|
||||||
|
|
||||||
|
if not client_folders:
|
||||||
|
sys.exit("\nNo client folders with Overall lists found.")
|
||||||
|
|
||||||
|
# 2. Print company names for dropdown creation
|
||||||
|
option_names = sorted(cf["name"] for cf in client_folders)
|
||||||
|
print(f"\n--- Dropdown options for '{NEW_FIELD_NAME}' ({len(option_names)}) ---")
|
||||||
|
for name in option_names:
|
||||||
|
print(f" {name}")
|
||||||
|
|
||||||
|
# 3. Build plan: fetch active tasks per folder
|
||||||
|
print("\nFetching active tasks from Overall lists ...")
|
||||||
|
plan: list[dict] = []
|
||||||
|
for cf in client_folders:
|
||||||
|
tasks = client.get_tasks(cf["overall_id"], include_closed=False)
|
||||||
|
plan.append({
|
||||||
|
"folder_name": cf["name"],
|
||||||
|
"list_id": cf["overall_id"],
|
||||||
|
"tasks": tasks,
|
||||||
|
})
|
||||||
|
|
||||||
|
total_tasks = sum(len(p["tasks"]) for p in plan)
|
||||||
|
print(f"\n--- Update Plan (active tasks only) ---")
|
||||||
|
for p in plan:
|
||||||
|
print(f" {p['folder_name']:35s} {len(p['tasks']):3d} tasks")
|
||||||
|
print(f" {'TOTAL':35s} {total_tasks:3d} tasks")
|
||||||
|
|
||||||
|
if DRY_RUN:
|
||||||
|
print("\n** DRY RUN -- no changes made. Unset DRY_RUN to execute. **\n")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 4. Field should already be created in ClickUp UI
|
||||||
|
print(f"\nLooking for space-level '{NEW_FIELD_NAME}' field ...")
|
||||||
|
|
||||||
|
# 5. Discover the field UUID + option IDs
|
||||||
|
first_list_id = client_folders[0]["overall_id"]
|
||||||
|
print(f"\nDiscovering '{NEW_FIELD_NAME}' field UUID and option IDs ...")
|
||||||
|
field_info = client.discover_field_filter(first_list_id, NEW_FIELD_NAME)
|
||||||
|
if field_info is None:
|
||||||
|
sys.exit(
|
||||||
|
f"\nERROR: Could not find '{NEW_FIELD_NAME}' field on list {first_list_id}.\n"
|
||||||
|
f"Make sure you created it as a SPACE-level field (visible to all lists)."
|
||||||
|
)
|
||||||
|
|
||||||
|
field_id = field_info["field_id"]
|
||||||
|
option_map = field_info["options"] # {name: uuid}
|
||||||
|
print(f" Field ID: {field_id}")
|
||||||
|
print(f" Options found: {len(option_map)}")
|
||||||
|
|
||||||
|
# Verify all folder names have matching options
|
||||||
|
missing = [cf["name"] for cf in client_folders if cf["name"] not in option_map]
|
||||||
|
if missing:
|
||||||
|
print(f"\n WARNING: These folder names have no matching dropdown option:")
|
||||||
|
for name in missing:
|
||||||
|
print(f" - {name}")
|
||||||
|
print(" Tasks in these folders will be SKIPPED.")
|
||||||
|
|
||||||
|
# 6. Set Client field on each task
|
||||||
|
updated = 0
|
||||||
|
skipped = 0
|
||||||
|
failed = 0
|
||||||
|
for p in plan:
|
||||||
|
folder_name = p["folder_name"]
|
||||||
|
opt_id = option_map.get(folder_name)
|
||||||
|
if not opt_id:
|
||||||
|
skipped += len(p["tasks"])
|
||||||
|
print(f"\n SKIP: '{folder_name}' -- no matching option")
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"\nUpdating {len(p['tasks'])} tasks in '{folder_name}' ...")
|
||||||
|
for task in p["tasks"]:
|
||||||
|
ok = client.set_custom_field_value(task.id, field_id, opt_id)
|
||||||
|
if ok:
|
||||||
|
updated += 1
|
||||||
|
else:
|
||||||
|
failed += 1
|
||||||
|
print(f" FAILED: task {task.id} ({task.name})")
|
||||||
|
time.sleep(0.15)
|
||||||
|
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print(f" Done! Updated: {updated} | Skipped: {skipped} | Failed: {failed}")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
print(f"\n Next steps:")
|
||||||
|
print(f" 1. Verify tasks in ClickUp have the '{NEW_FIELD_NAME}' field set correctly")
|
||||||
|
print(f" 2. Update config.yaml: change 'Customer' → '{NEW_FIELD_NAME}' in field_mapping")
|
||||||
|
print(f" 3. Test CheddahBot with the new field")
|
||||||
|
print(f" 4. Delete the old list-level 'Customer' fields from ClickUp\n")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -17,7 +17,7 @@ _PR_MAPPING = {
|
||||||
"auto_execute": True,
|
"auto_execute": True,
|
||||||
"field_mapping": {
|
"field_mapping": {
|
||||||
"topic": "task_name",
|
"topic": "task_name",
|
||||||
"company_name": "Customer",
|
"company_name": "Client",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -69,7 +69,7 @@ def _now_ms():
|
||||||
return int(datetime.now(UTC).timestamp() * 1000)
|
return int(datetime.now(UTC).timestamp() * 1000)
|
||||||
|
|
||||||
|
|
||||||
_FIELDS = {"Customer": "Acme"}
|
_FIELDS = {"Client": "Acme"}
|
||||||
|
|
||||||
|
|
||||||
# ── Tests ──
|
# ── Tests ──
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue