502 lines
16 KiB
Python
502 lines
16 KiB
Python
from fastapi import FastAPI, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import PlainTextResponse
|
|
from pydantic import BaseModel, HttpUrl
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
import sqlite3
|
|
import json
|
|
from pathlib import Path
|
|
|
|
app = FastAPI(title="Link Tracker API", version="1.0.0")

# CORS is opened up so the Chrome extension can call this API.
# NOTE(review): a wildcard origin together with allow_credentials=True is
# very permissive; in production, restrict allow_origins to specific origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# SQLite database file used by every endpoint (relative paths resolve
# against the process's working directory).
DB_PATH = "link_tracker.db"
|
|
|
|
def init_db(db_path: Optional[str] = None) -> None:
    """Create the ``pages`` and ``links`` tables and their indexes if missing.

    The function is safe to call repeatedly: tables and indexes are created
    with ``IF NOT EXISTS``, and the columns that were added to ``pages`` after
    its first version are added to older databases only when they are absent.

    Args:
        db_path: SQLite database file to initialise. When omitted, the
            module-level ``DB_PATH`` is used.

    Raises:
        sqlite3.Error: If a schema statement fails for any reason other than
            a migration column already being present.
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        cursor = conn.cursor()

        # Pages table - stores captured page information
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS pages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                title TEXT NOT NULL,
                keywords TEXT,  -- JSON array of keywords
                timestamp DATETIME NOT NULL,
                detected_clients TEXT,  -- JSON array of detected clients
                total_links INTEGER NOT NULL,
                linked_to TEXT,  -- JSON array of client URLs this page links to
                colinkiri BOOLEAN DEFAULT FALSE,
                indexer BOOLEAN DEFAULT FALSE,
                t2 BOOLEAN DEFAULT FALSE,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Databases created before these columns existed need them added.
        migrations = (
            ("colinkiri", "BOOLEAN DEFAULT FALSE"),
            ("indexer", "BOOLEAN DEFAULT FALSE"),
            ("t2", "BOOLEAN DEFAULT FALSE"),
            ("linked_to", "TEXT"),
        )
        for column, definition in migrations:
            try:
                cursor.execute(f"ALTER TABLE pages ADD COLUMN {column} {definition}")
            except sqlite3.OperationalError as exc:
                # Only "column already exists" is expected here; anything else
                # (locked database, I/O failure, ...) must not be hidden.
                if "duplicate column name" not in str(exc).lower():
                    raise

        # Links table - stores all external links found on pages
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS links (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                page_id INTEGER,
                href TEXT NOT NULL,
                anchor_text TEXT,
                title_attr TEXT,
                domain TEXT NOT NULL,
                is_client_link BOOLEAN DEFAULT FALSE,
                client_domain TEXT,
                client_name TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (page_id) REFERENCES pages (id)
            )
        """)

        # Indexes for the columns the API filters and joins on
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_pages_url ON pages (url)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_links_domain ON links (domain)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_links_client_domain ON links (client_domain)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_links_page_id ON links (page_id)")

        conn.commit()
    finally:
        # Release the file handle even when a statement above raised.
        conn.close()
|
|
|
|
# Pydantic models for API requests
|
|
class LinkData(BaseModel):
    # One link found on a captured page, as sent by the extension.
    # `text` is stored as the link's anchor text and `title` as its title
    # attribute; both default to an empty string.
    href: str
    text: Optional[str] = ""
    title: Optional[str] = ""
|
|
|
|
class DetectedClient(BaseModel):
    # A client reported for a captured page: the domain that link domains are
    # compared against, and the name recorded for matching links.
    domain: str
    name: str
|
|
|
|
class PageCaptureRequest(BaseModel):
    # Request body of POST /capture-page.
    # `timestamp` is stored verbatim in pages.timestamp; `keywords` and
    # `detectedClients` are stored as JSON text; `totalLinks` is stored as
    # reported, independently of how many entries `links` contains.
    url: str
    title: str
    timestamp: str
    keywords: List[str]
    detectedClients: List[DetectedClient]
    totalLinks: int
    links: List[LinkData]
|
|
|
|
# API Response models
|
|
class PageSummary(BaseModel):
    # One entry of the GET /pages response.
    # `detected_clients` carries client names only; `client_links_count` is
    # the number of the page's links flagged as client links.
    id: int
    url: str
    title: str
    timestamp: str
    detected_clients: List[str]
    total_links: int
    client_links_count: int
|
|
|
|
class LinkSummary(BaseModel):
    # One entry of the GET /pages/{page_id}/links response.
    # `client_name` is None for links that are not client links.
    href: str
    anchor_text: str
    domain: str
    is_client_link: bool
    client_name: Optional[str] = None
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Run the database schema initialisation when the application starts."""
    init_db()
|
|
|
|
@app.get("/")
async def root():
    """Health check: return a fixed message showing the API is reachable."""
    payload = {"message": "Link Tracker API is running"}
    return payload
|
|
|
|
@app.post("/capture-page")
async def capture_page(data: PageCaptureRequest):
    """Store a captured page and its links, replacing any earlier capture.

    The page row is keyed by ``data.url``: an existing row is updated and its
    links are deleted and re-inserted, otherwise a new row is created. A link
    whose domain (network location without a leading ``www.``) equals the
    domain of one of ``data.detectedClients`` is flagged as a client link, and
    the hrefs of those links are saved in the page's ``linked_to`` JSON array.

    A link that cannot be parsed or inserted is logged and skipped; it does
    not abort the capture.

    Args:
        data: Payload posted by the Chrome extension.

    Returns:
        A dict with ``success``, ``message``, ``page_id`` and
        ``detected_clients`` (the number of detected clients).

    Raises:
        HTTPException: Status 500 if the capture fails; in that case nothing
            is committed to the database.
    """
    # Imported once here (not per link) because urlparse is not among the
    # file-level imports.
    from urllib.parse import urlparse

    try:
        print(f"Received data: {data}")  # Debug logging

        # Domain -> client name, for constant-time matching of link domains
        client_domains = {c.domain: c.name for c in data.detectedClients}
        keywords_json = json.dumps(data.keywords)
        detected_clients_json = json.dumps(
            [{"domain": c.domain, "name": c.name} for c in data.detectedClients]
        )

        # hrefs of client links, written to pages.linked_to at the end
        client_urls = []

        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()

            # Check if page already exists
            cursor.execute("SELECT id FROM pages WHERE url = ?", (data.url,))
            existing_page = cursor.fetchone()

            if existing_page:
                # Update existing page and drop its previous links
                page_id = existing_page[0]
                cursor.execute(
                    """
                    UPDATE pages
                    SET title = ?, keywords = ?, timestamp = ?,
                        detected_clients = ?, total_links = ?, linked_to = ?
                    WHERE id = ?
                    """,
                    (
                        data.title,
                        keywords_json,
                        data.timestamp,
                        detected_clients_json,
                        data.totalLinks,
                        json.dumps([]),  # Will be populated below
                        page_id,
                    ),
                )
                cursor.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
            else:
                # Insert new page
                cursor.execute(
                    """
                    INSERT INTO pages (url, title, keywords, timestamp, detected_clients, total_links, linked_to)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        data.url,
                        data.title,
                        keywords_json,
                        data.timestamp,
                        detected_clients_json,
                        data.totalLinks,
                        json.dumps([]),  # Will be populated below
                    ),
                )
                page_id = cursor.lastrowid

            # Insert links
            for link in data.links:
                try:
                    netloc = urlparse(link.href).netloc
                    # Strip only a leading "www."; replacing it anywhere in
                    # the host would corrupt names such as "awww.example.com".
                    domain = netloc[4:] if netloc.startswith("www.") else netloc

                    is_client_link = domain in client_domains
                    client_name = client_domains.get(domain)

                    cursor.execute(
                        """
                        INSERT INTO links (page_id, href, anchor_text, title_attr, domain,
                                           is_client_link, client_domain, client_name)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            page_id,
                            link.href,
                            link.text or "",
                            link.title or "",
                            domain,
                            is_client_link,
                            domain if is_client_link else None,
                            client_name,
                        ),
                    )

                    # Recorded only after the row is stored, so linked_to
                    # never lists a link that is missing from the links table.
                    if is_client_link:
                        client_urls.append(link.href)
                except Exception as e:
                    print(f"Error processing link {link.href}: {e}")
                    continue

            # Update the linked_to field with collected client URLs
            cursor.execute(
                "UPDATE pages SET linked_to = ? WHERE id = ?",
                (json.dumps(client_urls), page_id),
            )

            conn.commit()
        finally:
            # Closing without a commit discards the pending transaction, so a
            # failed capture leaves no partial data and no open handle.
            conn.close()

        return {
            "success": True,
            "message": f"Captured {data.totalLinks} links from {data.url}",
            "page_id": page_id,
            "detected_clients": len(data.detectedClients),
        }

    except Exception as e:
        print(f"Error details: {e}")  # Debug logging
        import traceback

        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error capturing page data: {str(e)}") from e
|
|
|
|
@app.get("/pages", response_model=List[PageSummary])
async def get_pages(limit: int = 50, offset: int = 0):
    """Return captured pages, most recently created first.

    Args:
        limit: Maximum number of pages to return.
        offset: Number of pages to skip before the first returned one.

    Returns:
        A list of ``PageSummary`` objects; ``detected_clients`` holds the
        client names and ``client_links_count`` the number of the page's
        links flagged as client links.

    Raises:
        HTTPException: Status 500 if the query or result conversion fails.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT p.id, p.url, p.title, p.timestamp, p.detected_clients, p.total_links,
                       COUNT(l.id) as client_links_count
                FROM pages p
                LEFT JOIN links l ON p.id = l.page_id AND l.is_client_link = 1
                GROUP BY p.id
                ORDER BY p.created_at DESC
                LIMIT ? OFFSET ?
                """,
                (limit, offset),
            )
            rows = cursor.fetchall()
        finally:
            # Close the connection whether or not the query succeeded.
            conn.close()

        pages = []
        for row in rows:
            # detected_clients is stored as a JSON array of {domain, name}
            detected_clients_data = json.loads(row[4]) if row[4] else []
            pages.append(
                PageSummary(
                    id=row[0],
                    url=row[1],
                    title=row[2],
                    timestamp=row[3],
                    detected_clients=[c["name"] for c in detected_clients_data],
                    total_links=row[5],
                    client_links_count=row[6],
                )
            )
        return pages

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching pages: {str(e)}") from e
|
|
|
|
@app.get("/pages/{page_id}/links", response_model=List[LinkSummary])
async def get_page_links(page_id: int):
    """Return every stored link of one page, client links first.

    Args:
        page_id: ``pages.id`` of the page whose links are requested.

    Returns:
        A list of ``LinkSummary`` objects ordered by client-link flag
        (client links first) and then by domain. The list is empty when no
        links are stored for ``page_id``.

    Raises:
        HTTPException: Status 500 if the query or result conversion fails.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT href, anchor_text, domain, is_client_link, client_name
                FROM links
                WHERE page_id = ?
                ORDER BY is_client_link DESC, domain ASC
                """,
                (page_id,),
            )
            rows = cursor.fetchall()
        finally:
            # Close the connection whether or not the query succeeded.
            conn.close()

        return [
            LinkSummary(
                href=row[0],
                # The column is nullable while the response field is a
                # required string, so a NULL is reported as "".
                anchor_text=row[1] or "",
                domain=row[2],
                is_client_link=bool(row[3]),
                client_name=row[4],
            )
            for row in rows
        ]

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching links: {str(e)}") from e
|
|
|
|
@app.get("/clients/{client_domain}/links")
async def get_client_links(client_domain: str, limit: int = 100):
    """Return links that point to a client domain, with their source pages.

    Args:
        client_domain: Value matched exactly against ``links.client_domain``.
        limit: Maximum number of links to return.

    Returns:
        A dict with the requested ``client_domain`` and ``links``, a list of
        dicts (``target_url``, ``anchor_text``, ``source_page``,
        ``source_title``, ``client_name``, ``timestamp``) ordered by the
        source page's timestamp, newest first.

    Raises:
        HTTPException: Status 500 if the query fails.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT l.href, l.anchor_text, p.url as source_page, p.title as source_title,
                       l.client_name, p.timestamp
                FROM links l
                JOIN pages p ON l.page_id = p.id
                WHERE l.client_domain = ?
                ORDER BY p.timestamp DESC
                LIMIT ?
                """,
                (client_domain, limit),
            )
            rows = cursor.fetchall()
        finally:
            # Close the connection whether or not the query succeeded.
            conn.close()

        links = [
            {
                "target_url": row[0],
                "anchor_text": row[1],
                "source_page": row[2],
                "source_title": row[3],
                "client_name": row[4],
                "timestamp": row[5],
            }
            for row in rows
        ]
        return {"client_domain": client_domain, "links": links}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching client links: {str(e)}") from e
|
|
|
|
@app.get("/urls/for-colinkiri", response_class=PlainTextResponse)
async def get_urls_for_colinkiri():
    """Return page URLs not yet flagged ``colinkiri`` and flag them.

    Note that this GET endpoint has a side effect: every URL it returns is
    marked ``colinkiri = TRUE`` in the same call, so a later call does not
    return it again.

    Returns:
        The URLs as plain text, one per line, oldest page first; an empty
        string when there is nothing to process.

    Raises:
        HTTPException: Status 500 if reading or updating the pages fails.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()

            # Get unprocessed URLs
            cursor.execute(
                """
                SELECT id, url
                FROM pages
                WHERE colinkiri = FALSE
                ORDER BY created_at ASC
                """
            )
            rows = cursor.fetchall()

            # Mark exactly the rows being returned. One statement per id
            # keeps the number of bound parameters constant; a single
            # "IN (?, ?, ...)" fails once the backlog exceeds SQLite's
            # bound-variable limit.
            if rows:
                cursor.executemany(
                    "UPDATE pages SET colinkiri = TRUE WHERE id = ?",
                    [(row[0],) for row in rows],
                )
                conn.commit()
        finally:
            # Close the connection whether or not the statements succeeded.
            conn.close()

        # Return URLs as plain text, one per line
        return '\n'.join(row[1] for row in rows)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting URLs for colinkiri: {str(e)}") from e
|
|
|
|
@app.get("/urls/for-indexer", response_class=PlainTextResponse)
async def get_urls_for_indexer():
    """Return page URLs not yet flagged ``indexer`` and flag them.

    Note that this GET endpoint has a side effect: every URL it returns is
    marked ``indexer = TRUE`` in the same call, so a later call does not
    return it again.

    Returns:
        The URLs as plain text, one per line, oldest page first; an empty
        string when there is nothing to process.

    Raises:
        HTTPException: Status 500 if reading or updating the pages fails.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()

            # Get unprocessed URLs
            cursor.execute(
                """
                SELECT id, url
                FROM pages
                WHERE indexer = FALSE
                ORDER BY created_at ASC
                """
            )
            rows = cursor.fetchall()

            # Mark exactly the rows being returned. One statement per id
            # keeps the number of bound parameters constant; a single
            # "IN (?, ?, ...)" fails once the backlog exceeds SQLite's
            # bound-variable limit.
            if rows:
                cursor.executemany(
                    "UPDATE pages SET indexer = TRUE WHERE id = ?",
                    [(row[0],) for row in rows],
                )
                conn.commit()
        finally:
            # Close the connection whether or not the statements succeeded.
            conn.close()

        # Return URLs as plain text, one per line
        return '\n'.join(row[1] for row in rows)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting URLs for indexer: {str(e)}") from e
|
|
|
|
@app.get("/search/linking-to")
async def search_pages_linking_to(target_url: str):
    """Find the pages whose ``linked_to`` list contains ``target_url``.

    Args:
        target_url: URL to look for; it must equal a stored client link href
            exactly.

    Returns:
        A dict with ``target_url``, ``pages`` (dicts with ``id``, ``url``,
        ``title``, ``timestamp``, ``linked_to``; most recently created first)
        and ``count``.

    Raises:
        HTTPException: Status 500 if the query or JSON decoding fails.
    """
    try:
        # pages.linked_to holds JSON text, so the prefilter has to search for
        # the URL as JSON encodes it (quoted, with non-ASCII characters,
        # quotes and backslashes escaped); the raw URL would miss such rows.
        needle = json.dumps(target_url)
        # Neutralise LIKE wildcards that occur in the URL itself. "!" is the
        # escape character because JSON text can already contain backslashes.
        for special in ("!", "%", "_"):
            needle = needle.replace(special, "!" + special)

        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, url, title, timestamp, linked_to
                FROM pages
                WHERE linked_to LIKE ? ESCAPE '!'
                ORDER BY created_at DESC
                """,
                (f'%{needle}%',),
            )
            rows = cursor.fetchall()
        finally:
            # Close the connection whether or not the query succeeded.
            conn.close()

        pages = []
        for row in rows:
            linked_to = json.loads(row[4]) if row[4] else []
            # LIKE ignores ASCII case, so confirm the exact URL is present.
            if target_url in linked_to:
                pages.append({
                    "id": row[0],
                    "url": row[1],
                    "title": row[2],
                    "timestamp": row[3],
                    "linked_to": linked_to,
                })

        return {
            "target_url": target_url,
            "pages": pages,
            "count": len(pages),
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error searching for pages linking to URL: {str(e)}") from e
|
|
|
|
@app.get("/stats")
async def get_stats():
    """Return overall counts of captured pages and links.

    Returns:
        A dict with ``total_pages``, ``total_links``, ``client_links``,
        ``other_links`` (total minus client links) and ``client_breakdown``,
        a list of ``{"client", "links"}`` dicts sorted by link count,
        highest first.

    Raises:
        HTTPException: Status 500 if any of the queries fails.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()

            # Total pages captured
            cursor.execute("SELECT COUNT(*) FROM pages")
            total_pages = cursor.fetchone()[0]

            # Total links captured
            cursor.execute("SELECT COUNT(*) FROM links")
            total_links = cursor.fetchone()[0]

            # Total client links
            cursor.execute("SELECT COUNT(*) FROM links WHERE is_client_link = 1")
            client_links = cursor.fetchone()[0]

            # Links by client
            cursor.execute(
                """
                SELECT client_name, COUNT(*) as link_count
                FROM links
                WHERE is_client_link = 1
                GROUP BY client_name
                ORDER BY link_count DESC
                """
            )
            client_stats = [{"client": row[0], "links": row[1]} for row in cursor.fetchall()]
        finally:
            # Close the connection whether or not the queries succeeded.
            conn.close()

        return {
            "total_pages": total_pages,
            "total_links": total_links,
            "client_links": client_links,
            "other_links": total_links - client_links,
            "client_breakdown": client_stats,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}") from e
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so that importing this module
    # does not require it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)