134 lines
4.1 KiB
Python
134 lines
4.1 KiB
Python
"""Client contact directory parser for skills/companies.md."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
|
|
|
|
# Default directory file: "skills/companies.md" under the parent of the
# directory that contains this module (resolved to an absolute path).
_COMPANIES_FILE = Path(__file__).resolve().parent.parent / "skills" / "companies.md"
|
|
|
|
|
|
@dataclass
class CompanyContact:
    """Structured contact record for a client company.

    Only ``name`` is required; every other field defaults to an empty
    value (``""``, ``0`` or an empty list).
    """

    # Canonical company name.
    name: str
    # Alternative names the company may be referred to by.
    aliases: list[str] = field(default_factory=list)
    # Executive name, optionally followed by a title, e.g. "Gary Hermsen, CEO".
    executive: str = ""
    email: str = ""
    opening: str = ""
    # Comma-separated e-mail addresses; see ``cc_list`` for the parsed form.
    cc: str = ""
    pa_org_id: int = 0
    website: str = ""
    gbp: str = ""

    @property
    def executive_first_name(self) -> str:
        """Return the first name from ``executive``.

        The text before the first comma is treated as the full name and its
        first whitespace-separated token is returned, e.g.
        ``'Gary Hermsen, CEO' -> 'Gary'``.

        Returns:
            The first name, or ``""`` when ``executive`` is empty, blank, or
            has nothing before the first comma (e.g. ``", CEO"``).
        """
        full_name = self.executive.split(",", 1)[0]
        tokens = full_name.split()
        # Guard against an empty token list: indexing it would raise
        # IndexError for values such as ", CEO" or "   ".
        return tokens[0] if tokens else ""

    @property
    def cc_list(self) -> list[str]:
        """Return the ``cc`` field as a list of trimmed, non-empty addresses."""
        if not self.cc:
            return []
        return [addr.strip() for addr in self.cc.split(",") if addr.strip()]
|
|
|
|
|
|
def parse_company_directory(path: Path | str = _COMPANIES_FILE) -> dict[str, CompanyContact]:
    """Load the markdown company directory into ``CompanyContact`` records.

    Every line is stripped. A line starting with ``"## "`` opens a new
    company whose name is the remainder of the heading; any other line seen
    after a heading is handed to ``_parse_field`` for that company. Lines
    before the first heading are ignored.

    Follows the same parsing pattern as press_release.py:_parse_company_data().

    Args:
        path: Location of the markdown file; defaults to ``_COMPANIES_FILE``.

    Returns:
        Contacts keyed by lowercase company name. Empty (after logging a
        warning) when the file does not exist. When two headings share a
        lowercase name, the later record replaces the earlier one.
    """
    source = Path(path)
    if not source.exists():
        log.warning("Company directory not found: %s", source)
        return {}

    directory: dict[str, CompanyContact] = {}
    active: CompanyContact | None = None

    for raw_line in source.read_text(encoding="utf-8").splitlines():
        stripped = raw_line.strip()
        if stripped.startswith("## "):
            # Register the record as soon as its heading is read; the field
            # lines that follow mutate this same object, and none of them
            # changes ``name``, so the key stays valid.
            active = CompanyContact(name=stripped[3:].strip())
            directory[active.name.lower()] = active
            continue
        if active is not None:
            _parse_field(active, stripped)

    log.info("Loaded %d company contacts from %s", len(directory), source.name)
    return directory
|
|
|
|
|
|
def _parse_field(contact: CompanyContact, line: str) -> None:
    """Apply one ``- **Label:** value`` line to *contact* in place.

    Lines that match no known label are ignored. ``Aliases`` is split on
    commas into a list of trimmed, non-empty names. ``PA Org ID`` is
    converted to ``int`` and left unchanged when the text is not a valid
    integer. Every other recognised label stores the trimmed text as-is.

    Args:
        contact: Record to update.
        line: A single, already stripped line from the directory file.
    """
    aliases_marker = "- **Aliases:**"
    if line.startswith(aliases_marker):
        pieces = line[len(aliases_marker):].split(",")
        contact.aliases = [piece.strip() for piece in pieces if piece.strip()]
        return

    labelled_attrs = (
        ("- **Executive:**", "executive"),
        ("- **Email:**", "email"),
        ("- **Opening:**", "opening"),
        ("- **CC:**", "cc"),
        ("- **PA Org ID:**", "pa_org_id"),
        ("- **Website:**", "website"),
        ("- **GBP:**", "gbp"),
    )
    # First (and only possible) label whose marker opens the line.
    hit = next((pair for pair in labelled_attrs if line.startswith(pair[0])), None)
    if hit is None:
        return

    marker, attr_name = hit
    raw = line[len(marker):].strip()

    if attr_name != "pa_org_id":
        setattr(contact, attr_name, raw)
        return

    try:
        contact.pa_org_id = int(raw)
    except (ValueError, IndexError):
        # Best effort: keep the existing ID when the text is not an integer.
        pass
|
|
|
|
|
|
def lookup_contact(company_name: str, contacts: dict[str, CompanyContact] | None = None) -> CompanyContact | None:
    """Find a company contact by name with fuzzy matching.

    Matching is case-insensitive and proceeds in order of decreasing
    strictness; the first hit wins:

    1. exact match on the canonical (lowercase) name key,
    2. exact match on any alias,
    3. substring containment in either direction, checked against each
       contact's name key and then its aliases, in directory order.

    Args:
        company_name: Name to look up; surrounding whitespace is ignored.
        contacts: Directory keyed by lowercase company name. When ``None``,
            it is loaded with ``parse_company_directory()``.

    Returns:
        The matching contact, or ``None`` when nothing matches or when
        ``company_name`` is empty or blank.
    """
    query = (company_name or "").lower().strip()
    if not query:
        # An empty string is a substring of every key, so without this guard
        # a blank query would "match" the first company in the directory.
        return None

    if contacts is None:
        contacts = parse_company_directory()

    # Exact match on canonical name
    if query in contacts:
        return contacts[query]

    # Exact match on aliases
    for contact in contacts.values():
        if any(alias.lower() == query for alias in contact.aliases):
            return contact

    # Substring match on name and aliases
    for key, contact in contacts.items():
        if key in query or query in key:
            return contact
        for alias in contact.aliases:
            alias_lower = alias.lower()
            if alias_lower in query or query in alias_lower:
                return contact

    return None
|