from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
from datetime import datetime
from urllib.parse import urlparse
import sqlite3
import json
from pathlib import Path

app = FastAPI(title="Link Tracker API", version="1.0.0")

# Enable CORS for your Chrome extension
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict this to specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Database setup
DB_PATH = "link_tracker.db"


def init_db():
    """Initialize the database with required tables"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    # Pages table - stores captured page information
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS pages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE NOT NULL,
            title TEXT NOT NULL,
            keywords TEXT,            -- JSON array of keywords
            timestamp DATETIME NOT NULL,
            detected_clients TEXT,    -- JSON array of detected clients
            total_links INTEGER NOT NULL,
            linked_to TEXT,           -- JSON array of client URLs this page links to
            colinkiri BOOLEAN DEFAULT FALSE,
            indexer BOOLEAN DEFAULT FALSE,
            t2 BOOLEAN DEFAULT FALSE,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # For existing databases, add the new columns if they don't exist
    try:
        cursor.execute("ALTER TABLE pages ADD COLUMN colinkiri BOOLEAN DEFAULT FALSE")
    except sqlite3.OperationalError:
        pass  # Column already exists

    try:
        cursor.execute("ALTER TABLE pages ADD COLUMN indexer BOOLEAN DEFAULT FALSE")
    except sqlite3.OperationalError:
        pass  # Column already exists

    try:
        cursor.execute("ALTER TABLE pages ADD COLUMN t2 BOOLEAN DEFAULT FALSE")
    except sqlite3.OperationalError:
        pass  # Column already exists

    try:
        cursor.execute("ALTER TABLE pages ADD COLUMN linked_to TEXT")
    except sqlite3.OperationalError:
        pass  # Column already exists

    # Links table - stores all external links found on pages
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            page_id INTEGER,
            href TEXT NOT NULL,
            anchor_text TEXT,
            title_attr TEXT,
            domain TEXT NOT NULL,
            is_client_link BOOLEAN DEFAULT FALSE,
            client_domain TEXT,
            client_name TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (page_id) REFERENCES pages (id)
        )
    """)

    # Create indexes for better query performance
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_pages_url ON pages (url)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_links_domain ON links (domain)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_links_client_domain ON links (client_domain)")
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_links_page_id ON links (page_id)")

    conn.commit()
    conn.close()


# Pydantic models for API requests
class LinkData(BaseModel):
    href: str
    text: Optional[str] = ""
    title: Optional[str] = ""


class DetectedClient(BaseModel):
    domain: str
    name: str


class PageCaptureRequest(BaseModel):
    url: str
    title: str
    timestamp: str
    keywords: List[str]
    detectedClients: List[DetectedClient]
    totalLinks: int
    links: List[LinkData]


# API Response models
class PageSummary(BaseModel):
    id: int
    url: str
    title: str
    timestamp: str
    detected_clients: List[str]
    total_links: int
    client_links_count: int


class LinkSummary(BaseModel):
    href: str
    anchor_text: str
    domain: str
    is_client_link: bool
    client_name: Optional[str] = None


@app.on_event("startup")
async def startup_event():
    """Initialize database on startup"""
    init_db()


@app.get("/")
async def root():
    """Health check endpoint"""
    return {"message": "Link Tracker API is running"}
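
# --- Illustrative usage sketch (not used by the app itself) ---
# The helper below shows the JSON shape the Chrome extension is expected to POST to
# /capture-page, validated against PageCaptureRequest. All URLs, domains, and client
# names are made-up placeholders, and nothing in this module calls this function.
def _example_capture_payload() -> PageCaptureRequest:
    sample = {
        "url": "https://blog.example.com/some-article",  # hypothetical source page
        "title": "Some Article",
        "timestamp": "2024-01-01T12:00:00Z",
        "keywords": ["example", "demo"],
        "detectedClients": [{"domain": "client-site.com", "name": "Client Site"}],
        "totalLinks": 2,
        "links": [
            {"href": "https://client-site.com/landing", "text": "Client landing page", "title": ""},
            {"href": "https://unrelated.org/post", "text": "Unrelated post", "title": ""},
        ],
    }
    # Raises pydantic.ValidationError if the extension payload drifts from this shape
    return PageCaptureRequest(**sample)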
@app.post("/capture-page")
async def capture_page(data: PageCaptureRequest):
    """Capture page data and links from Chrome extension"""
    try:
        print(f"Received data: {data}")  # Debug logging

        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Check if page already exists
        cursor.execute("SELECT id FROM pages WHERE url = ?", (data.url,))
        existing_page = cursor.fetchone()

        # Get client domains for faster lookup
        client_domains = {c.domain: c.name for c in data.detectedClients}

        # Collect client URLs for the linked_to field
        client_urls = []

        if existing_page:
            # Update existing page
            page_id = existing_page[0]
            cursor.execute("""
                UPDATE pages
                SET title = ?, keywords = ?, timestamp = ?, detected_clients = ?,
                    total_links = ?, linked_to = ?
                WHERE id = ?
            """, (
                data.title,
                json.dumps(data.keywords),
                data.timestamp,
                json.dumps([{"domain": c.domain, "name": c.name} for c in data.detectedClients]),
                data.totalLinks,
                json.dumps([]),  # Will be populated below
                page_id
            ))

            # Delete existing links for this page
            cursor.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
        else:
            # Insert new page
            cursor.execute("""
                INSERT INTO pages (url, title, keywords, timestamp, detected_clients, total_links, linked_to)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                data.url,
                data.title,
                json.dumps(data.keywords),
                data.timestamp,
                json.dumps([{"domain": c.domain, "name": c.name} for c in data.detectedClients]),
                data.totalLinks,
                json.dumps([])  # Will be populated below
            ))
            page_id = cursor.lastrowid

        # Insert links
        for link in data.links:
            try:
                parsed_url = urlparse(link.href)
                domain = parsed_url.netloc.replace('www.', '')

                # Check if this is a client link
                is_client_link = domain in client_domains
                client_name = client_domains.get(domain) if is_client_link else None

                # If it's a client link, add to linked_to array
                if is_client_link:
                    client_urls.append(link.href)

                cursor.execute("""
                    INSERT INTO links (page_id, href, anchor_text, title_attr, domain,
                                       is_client_link, client_domain, client_name)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    page_id,
                    link.href,
                    link.text or "",
                    link.title or "",
                    domain,
                    is_client_link,
                    domain if is_client_link else None,
                    client_name
                ))
            except Exception as e:
                print(f"Error processing link {link.href}: {e}")
                continue

        # Update the linked_to field with collected client URLs
        cursor.execute("UPDATE pages SET linked_to = ? WHERE id = ?", (json.dumps(client_urls), page_id))

        conn.commit()
        conn.close()

        return {
            "success": True,
            "message": f"Captured {data.totalLinks} links from {data.url}",
            "page_id": page_id,
            "detected_clients": len(data.detectedClients)
        }

    except Exception as e:
        print(f"Error details: {e}")  # Debug logging
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error capturing page data: {str(e)}")


@app.get("/pages", response_model=List[PageSummary])
async def get_pages(limit: int = 50, offset: int = 0):
    """Get list of captured pages"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT p.id, p.url, p.title, p.timestamp, p.detected_clients, p.total_links,
                   COUNT(l.id) as client_links_count
            FROM pages p
            LEFT JOIN links l ON p.id = l.page_id AND l.is_client_link = 1
            GROUP BY p.id
            ORDER BY p.created_at DESC
            LIMIT ? OFFSET ?
        """, (limit, offset))

        pages = []
        for row in cursor.fetchall():
            detected_clients_data = json.loads(row[4]) if row[4] else []
            client_names = [c["name"] for c in detected_clients_data]

            pages.append(PageSummary(
                id=row[0],
                url=row[1],
                title=row[2],
                timestamp=row[3],
                detected_clients=client_names,
                total_links=row[5],
                client_links_count=row[6]
            ))

        conn.close()
        return pages

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching pages: {str(e)}")


@app.get("/pages/{page_id}/links", response_model=List[LinkSummary])
async def get_page_links(page_id: int):
    """Get all links for a specific page"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT href, anchor_text, domain, is_client_link, client_name
            FROM links
            WHERE page_id = ?
            ORDER BY is_client_link DESC, domain ASC
        """, (page_id,))

        links = []
        for row in cursor.fetchall():
            links.append(LinkSummary(
                href=row[0],
                anchor_text=row[1],
                domain=row[2],
                is_client_link=bool(row[3]),
                client_name=row[4]
            ))

        conn.close()
        return links

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching links: {str(e)}")


@app.get("/clients/{client_domain}/links")
async def get_client_links(client_domain: str, limit: int = 100):
    """Get all links pointing to a specific client domain"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT l.href, l.anchor_text, p.url as source_page, p.title as source_title,
                   l.client_name, p.timestamp
            FROM links l
            JOIN pages p ON l.page_id = p.id
            WHERE l.client_domain = ?
            ORDER BY p.timestamp DESC
            LIMIT ?
        """, (client_domain, limit))

        links = []
        for row in cursor.fetchall():
            links.append({
                "target_url": row[0],
                "anchor_text": row[1],
                "source_page": row[2],
                "source_title": row[3],
                "client_name": row[4],
                "timestamp": row[5]
            })

        conn.close()
        return {"client_domain": client_domain, "links": links}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching client links: {str(e)}")


@app.get("/urls/for-colinkiri", response_class=PlainTextResponse)
async def get_urls_for_colinkiri():
    """Get all URLs where colinkiri=false and mark them as processed"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Get unprocessed URLs
        cursor.execute("""
            SELECT id, url FROM pages
            WHERE colinkiri = FALSE
            ORDER BY created_at ASC
        """)

        page_ids = []
        url_list = []
        for row in cursor.fetchall():
            page_ids.append(row[0])
            url_list.append(row[1])

        # Mark them as processed
        if page_ids:
            placeholders = ','.join(['?'] * len(page_ids))
            cursor.execute(f"UPDATE pages SET colinkiri = TRUE WHERE id IN ({placeholders})", page_ids)
            conn.commit()

        conn.close()

        # Return URLs as plain text, one per line
        return '\n'.join(url_list)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting URLs for colinkiri: {str(e)}")


@app.get("/urls/for-indexer", response_class=PlainTextResponse)
async def get_urls_for_indexer():
    """Get all URLs where indexer=false and mark them as processed"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Get unprocessed URLs
        cursor.execute("""
            SELECT id, url FROM pages
            WHERE indexer = FALSE
            ORDER BY created_at ASC
        """)

        page_ids = []
        url_list = []
        for row in cursor.fetchall():
            page_ids.append(row[0])
            url_list.append(row[1])

        # Mark them as processed
        if page_ids:
            placeholders = ','.join(['?'] * len(page_ids))
            cursor.execute(f"UPDATE pages SET indexer = TRUE WHERE id IN ({placeholders})", page_ids)
            conn.commit()

        conn.close()

        # Return URLs as plain text, one per line
        return '\n'.join(url_list)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting URLs for indexer: {str(e)}")
@app.get("/search/linking-to")
async def search_pages_linking_to(target_url: str):
    """Find all pages that link to a specific URL"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT id, url, title, timestamp, linked_to
            FROM pages
            WHERE linked_to LIKE ?
            ORDER BY created_at DESC
        """, (f'%{target_url}%',))

        pages = []
        for row in cursor.fetchall():
            linked_to = json.loads(row[4]) if row[4] else []
            # Verify the exact URL is in the linked_to array
            if target_url in linked_to:
                pages.append({
                    "id": row[0],
                    "url": row[1],
                    "title": row[2],
                    "timestamp": row[3],
                    "linked_to": linked_to
                })

        conn.close()
        return {
            "target_url": target_url,
            "pages": pages,
            "count": len(pages)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error searching for pages linking to URL: {str(e)}")


@app.get("/stats")
async def get_stats():
    """Get overall statistics"""
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Total pages captured
        cursor.execute("SELECT COUNT(*) FROM pages")
        total_pages = cursor.fetchone()[0]

        # Total links captured
        cursor.execute("SELECT COUNT(*) FROM links")
        total_links = cursor.fetchone()[0]

        # Total client links
        cursor.execute("SELECT COUNT(*) FROM links WHERE is_client_link = 1")
        client_links = cursor.fetchone()[0]

        # Links by client
        cursor.execute("""
            SELECT client_name, COUNT(*) as link_count
            FROM links
            WHERE is_client_link = 1
            GROUP BY client_name
            ORDER BY link_count DESC
        """)
        client_stats = [{"client": row[0], "links": row[1]} for row in cursor.fetchall()]

        conn.close()

        return {
            "total_pages": total_pages,
            "total_links": total_links,
            "client_links": client_links,
            "other_links": total_links - client_links,
            "client_breakdown": client_stats
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
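
# --- Illustrative client sketch (not imported or called by the app) ---
# A minimal end-to-end walk-through of the API from a separate script, assuming the
# server is running locally on port 8000 and the `requests` package is installed.
# The endpoint paths come from this module; the localhost URL and sample target URL
# are assumptions.
def _example_client_session():
    import requests  # assumed to be available in the calling environment

    base = "http://localhost:8000"

    # 1. Submit a captured page (re-uses the sample payload defined near the top of this file).
    payload = _example_capture_payload().dict()  # use .model_dump() on Pydantic v2
    capture = requests.post(f"{base}/capture-page", json=payload, timeout=10)
    capture.raise_for_status()

    # 2. Drain the plain-text batch endpoint. Note the GET has a side effect:
    #    returned rows are flagged colinkiri=TRUE, so each URL is handed out only once.
    pending = requests.get(f"{base}/urls/for-colinkiri", timeout=10).text.splitlines()

    # 3. Read aggregate stats and the reverse-link lookup for one client URL.
    stats = requests.get(f"{base}/stats", timeout=10).json()
    backlinks = requests.get(
        f"{base}/search/linking-to",
        params={"target_url": "https://client-site.com/landing"},  # must match linked_to exactly
        timeout=10,
    ).json()

    return pending, stats, backlinks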