Big-Link-Man/src/database/repositories.py

654 lines
21 KiB
Python

"""
Concrete repository implementations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from src.core.config import get_config
from src.database.interfaces import IUserRepository, ISiteDeploymentRepository, IProjectRepository, IArticleLinkRepository, ISitePageRepository
from src.database.models import User, SiteDeployment, Project, GeneratedContent, ArticleLink, SitePage
class UserRepository(IUserRepository):
    """Repository implementation for User data access.

    Every write operation commits immediately on the injected session. If a
    commit fails, the session is rolled back before the error propagates so
    that the same session remains usable for later calls.
    """

    # Roles accepted by create(); anything else is rejected before the
    # database is touched.
    _VALID_ROLES = ("Admin", "User")

    def __init__(self, session: Session):
        """Bind the repository to the session used for all of its queries."""
        self.session = session

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        SQLAlchemy requires rollback() after a failed flush/commit before the
        session can be used again, so the rollback is done here and the
        original exception is re-raised unchanged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def create(self, username: str, hashed_password: str, role: str) -> User:
        """
        Create a new user.

        Args:
            username: The username for the new user
            hashed_password: The hashed password
            role: The role ("Admin" or "User")

        Returns:
            The created User object, refreshed from the database

        Raises:
            ValueError: If role is not "Admin" or "User", or if the commit
                raises IntegrityError (reported as a duplicate username)
        """
        if role not in self._VALID_ROLES:
            raise ValueError(f"Invalid role: {role}. Must be 'Admin' or 'User'")

        user = User(
            username=username,
            hashed_password=hashed_password,
            role=role,
        )
        self.session.add(user)
        try:
            self._commit()
        except IntegrityError as exc:
            # _commit() has already rolled the session back.
            raise ValueError(f"User with username '{username}' already exists") from exc
        self.session.refresh(user)
        return user

    def get_by_id(self, user_id: int) -> Optional[User]:
        """
        Get a user by ID.

        Args:
            user_id: The user ID to search for

        Returns:
            User object if found, None otherwise
        """
        return self.session.query(User).filter(User.id == user_id).first()

    def get_by_username(self, username: str) -> Optional[User]:
        """
        Get a user by username.

        Args:
            username: The username to search for

        Returns:
            User object if found, None otherwise
        """
        return self.session.query(User).filter(User.username == username).first()

    def get_all(self) -> List[User]:
        """
        Get all users.

        Returns:
            List of all User objects
        """
        return self.session.query(User).all()

    def update(self, user: User) -> User:
        """
        Persist changes made to an existing user.

        Args:
            user: The User object with updated data

        Returns:
            The same User object, refreshed from the database

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        self.session.add(user)
        self._commit()
        self.session.refresh(user)
        return user

    def delete(self, user_id: int) -> bool:
        """
        Delete a user by ID.

        Args:
            user_id: The ID of the user to delete

        Returns:
            True if deleted, False if user not found

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        user = self.get_by_id(user_id)
        if not user:
            return False
        self.session.delete(user)
        self._commit()
        return True

    def exists(self, username: str) -> bool:
        """
        Check if a user exists by username.

        Args:
            username: The username to check

        Returns:
            True if user exists, False otherwise
        """
        return self.get_by_username(username) is not None
class SiteDeploymentRepository(ISiteDeploymentRepository):
    """Repository implementation for SiteDeployment data access.

    Every write operation commits immediately on the injected session. If a
    commit fails, the session is rolled back before the error propagates so
    that the same session remains usable for later calls.
    """

    def __init__(self, session: Session):
        """Bind the repository to the session used for all of its queries."""
        self.session = session

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        SQLAlchemy requires rollback() after a failed flush/commit before the
        session can be used again, so the rollback is done here and the
        original exception is re-raised unchanged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def create(
        self,
        site_name: str,
        storage_zone_id: int,
        storage_zone_name: str,
        storage_zone_password: str,
        storage_zone_region: str,
        pull_zone_id: int,
        pull_zone_bcdn_hostname: str,
        custom_hostname: Optional[str] = None
    ) -> SiteDeployment:
        """
        Create a new site deployment.

        Args:
            site_name: User-friendly name for the site
            storage_zone_id: bunny.net Storage Zone ID
            storage_zone_name: Storage Zone name
            storage_zone_password: Storage Zone API password
            storage_zone_region: Storage region code (e.g., "DE", "NY", "LA")
            pull_zone_id: bunny.net Pull Zone ID
            pull_zone_bcdn_hostname: Default b-cdn.net hostname
            custom_hostname: Optional custom FQDN (e.g., www.yourdomain.com)

        Returns:
            The created SiteDeployment object, refreshed from the database

        Raises:
            ValueError: If the commit raises IntegrityError (reported as a
                duplicate hostname)
        """
        deployment = SiteDeployment(
            site_name=site_name,
            custom_hostname=custom_hostname,
            storage_zone_id=storage_zone_id,
            storage_zone_name=storage_zone_name,
            storage_zone_password=storage_zone_password,
            storage_zone_region=storage_zone_region,
            pull_zone_id=pull_zone_id,
            pull_zone_bcdn_hostname=pull_zone_bcdn_hostname,
        )
        self.session.add(deployment)
        try:
            self._commit()
        except IntegrityError as exc:
            # _commit() has already rolled the session back. The message
            # names the custom hostname when one was given, else the b-cdn one.
            hostname = custom_hostname or pull_zone_bcdn_hostname
            raise ValueError(
                f"Site deployment with hostname '{hostname}' already exists"
            ) from exc
        self.session.refresh(deployment)
        return deployment

    def get_by_id(self, deployment_id: int) -> Optional[SiteDeployment]:
        """
        Get a site deployment by ID.

        Args:
            deployment_id: The deployment ID to search for

        Returns:
            SiteDeployment object if found, None otherwise
        """
        return self.session.query(SiteDeployment).filter_by(id=deployment_id).first()

    def get_by_hostname(self, custom_hostname: str) -> Optional[SiteDeployment]:
        """
        Get a site deployment by custom hostname.

        Args:
            custom_hostname: The hostname to search for

        Returns:
            SiteDeployment object if found, None otherwise
        """
        return self.session.query(SiteDeployment).filter(
            SiteDeployment.custom_hostname == custom_hostname
        ).first()

    def get_by_bcdn_hostname(self, bcdn_hostname: str) -> Optional[SiteDeployment]:
        """
        Get a site deployment by bunny.net CDN hostname.

        Args:
            bcdn_hostname: The b-cdn.net hostname to search for

        Returns:
            SiteDeployment object if found, None otherwise
        """
        return self.session.query(SiteDeployment).filter(
            SiteDeployment.pull_zone_bcdn_hostname == bcdn_hostname
        ).first()

    def get_all(self) -> List[SiteDeployment]:
        """
        Get all site deployments.

        Returns:
            List of all SiteDeployment objects
        """
        return self.session.query(SiteDeployment).all()

    def delete(self, deployment_id: int) -> bool:
        """
        Delete a site deployment by ID.

        Args:
            deployment_id: The ID of the deployment to delete

        Returns:
            True if deleted, False if deployment not found

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        deployment = self.get_by_id(deployment_id)
        if not deployment:
            return False
        self.session.delete(deployment)
        self._commit()
        return True

    def exists(self, hostname: str) -> bool:
        """
        Check if a site deployment exists by either custom or bcdn hostname.

        Args:
            hostname: The hostname to check (custom or bcdn)

        Returns:
            True if deployment exists, False otherwise
        """
        # A single query matches the value against both hostname columns.
        match = self.session.query(SiteDeployment).filter(
            (SiteDeployment.custom_hostname == hostname) |
            (SiteDeployment.pull_zone_bcdn_hostname == hostname)
        ).first()
        return match is not None
class ProjectRepository(IProjectRepository):
    """Repository implementation for Project data access.

    Every write operation commits immediately on the injected session. If a
    commit fails, the session is rolled back before the error propagates so
    that the same session remains usable for later calls.
    """

    # Keys copied from the `data` dict to the Project column of the same name
    # with no default: a missing key is stored as None.
    _PASSTHROUGH_FIELDS = (
        "main_keyword",
        "related_search_density",
        "entity_density",
        "lsi_density",
        "spintax_related_search_terms",
        "title_exact_match",
        "title_related_search",
        "meta_exact_match",
        "meta_related_search",
        "meta_entities",
        "h1_exact",
        "h1_related_search",
        "h1_entities",
        "h1_lsi",
        "h2_total",
        "h2_exact",
        "h2_related_search",
        "h2_entities",
        "h2_lsi",
        "h3_total",
        "h3_exact",
        "h3_related_search",
        "h3_entities",
        "h3_lsi",
        "money_site_url",
    )

    def __init__(self, session: Session):
        """Bind the repository to the session used for all of its queries."""
        self.session = session

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        SQLAlchemy requires rollback() after a failed flush/commit before the
        session can be used again, so the rollback is done here and the
        original exception is re-raised unchanged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def create(self, user_id: int, name: str, data: Dict[str, Any]) -> Project:
        """
        Create a new project.

        Args:
            user_id: The ID of the user who owns this project
            name: User-friendly project name
            data: Dictionary with all CORA data fields. Missing keys are
                stored as None, except word_count (defaults to 1250),
                term_frequency (defaults to the configured universal
                default) and the list fields entities, related_searches and
                custom_anchor_text (default to an empty list)

        Returns:
            The created Project object, refreshed from the database

        Raises:
            ValueError: If the commit raises IntegrityError (presumably an
                unknown user_id; depends on the schema's constraints)
        """
        config = get_config()
        passthrough = {field: data.get(field) for field in self._PASSTHROUGH_FIELDS}
        project = Project(
            user_id=user_id,
            name=name,
            word_count=data.get("word_count", 1250),
            # `or` means any falsy value (missing, None, 0) falls back to
            # the configured default.
            term_frequency=data.get("term_frequency")
            or config.content_rules.universal.default_term_frequency,
            entities=data.get("entities", []),
            related_searches=data.get("related_searches", []),
            custom_anchor_text=data.get("custom_anchor_text", []),
            **passthrough,
        )
        self.session.add(project)
        try:
            self._commit()
        except IntegrityError as exc:
            # _commit() has already rolled the session back.
            raise ValueError(f"Failed to create project: {exc}") from exc
        self.session.refresh(project)
        return project

    def get_by_id(self, project_id: int) -> Optional[Project]:
        """
        Get a project by ID.

        Args:
            project_id: The project ID to search for

        Returns:
            Project object if found, None otherwise
        """
        return self.session.query(Project).filter(Project.id == project_id).first()

    def get_by_user_id(self, user_id: int) -> List[Project]:
        """
        Get all projects for a user.

        Args:
            user_id: The user ID to search for

        Returns:
            List of Project objects for the user
        """
        return self.session.query(Project).filter(Project.user_id == user_id).all()

    def get_all(self) -> List[Project]:
        """
        Get all projects.

        Returns:
            List of all Project objects
        """
        return self.session.query(Project).all()

    def update(self, project: Project) -> Project:
        """
        Persist changes made to an existing project.

        Args:
            project: The Project object with updated data

        Returns:
            The same Project object, refreshed from the database

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        self.session.add(project)
        self._commit()
        self.session.refresh(project)
        return project

    def delete(self, project_id: int) -> bool:
        """
        Delete a project by ID.

        Args:
            project_id: The ID of the project to delete

        Returns:
            True if deleted, False if project not found

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        project = self.get_by_id(project_id)
        if not project:
            return False
        self.session.delete(project)
        self._commit()
        return True
class GeneratedContentRepository:
    """Repository for GeneratedContent data access.

    Every write operation commits immediately on the injected session. If a
    commit fails, the session is rolled back before the error propagates so
    that the same session remains usable for later calls.
    """

    def __init__(self, session: Session):
        """Bind the repository to the session used for all of its queries."""
        self.session = session

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        SQLAlchemy requires rollback() after a failed flush/commit before the
        session can be used again, so the rollback is done here and the
        original exception is re-raised unchanged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def create(
        self,
        project_id: int,
        tier: str,
        keyword: str,
        title: str,
        outline: dict,
        content: str,
        word_count: int,
        status: str,
        site_deployment_id: Optional[int] = None
    ) -> GeneratedContent:
        """
        Create a new generated content record.

        Args:
            project_id: The project ID this content belongs to
            tier: Content tier (tier1, tier2, tier3)
            keyword: The keyword used for generation
            title: Generated title
            outline: Generated outline (JSON)
            content: Generated HTML content
            word_count: Final word count
            status: Status (generated, augmented, failed)
            site_deployment_id: Optional site deployment ID for template assignment

        Returns:
            The created GeneratedContent object, refreshed from the database

        Raises:
            Exception: Whatever the commit raises (not translated to
                ValueError here); the session is rolled back first
        """
        content_record = GeneratedContent(
            project_id=project_id,
            tier=tier,
            keyword=keyword,
            title=title,
            outline=outline,
            content=content,
            word_count=word_count,
            status=status,
            site_deployment_id=site_deployment_id,
        )
        self.session.add(content_record)
        self._commit()
        self.session.refresh(content_record)
        return content_record

    def get_by_id(self, content_id: int) -> Optional[GeneratedContent]:
        """Get content by ID, or None if no record has that ID."""
        return self.session.query(GeneratedContent).filter(
            GeneratedContent.id == content_id
        ).first()

    def get_by_project_id(self, project_id: int) -> List[GeneratedContent]:
        """Get all content for a project."""
        return self.session.query(GeneratedContent).filter(
            GeneratedContent.project_id == project_id
        ).all()

    def get_by_project_and_tier(self, project_id: int, tier: str, require_site: bool = True) -> List[GeneratedContent]:
        """
        Get content for a project and tier.

        Args:
            project_id: Project ID to filter by
            tier: Tier name to filter by
            require_site: If True, only return articles with site_deployment_id set

        Returns:
            List of GeneratedContent records matching criteria
        """
        query = self.session.query(GeneratedContent).filter(
            GeneratedContent.project_id == project_id,
            GeneratedContent.tier == tier,
        )
        if require_site:
            # Exclude records that have no site deployment assigned.
            query = query.filter(GeneratedContent.site_deployment_id.isnot(None))
        return query.all()

    def get_by_keyword(self, keyword: str) -> List[GeneratedContent]:
        """Get content by exact keyword match."""
        return self.session.query(GeneratedContent).filter(
            GeneratedContent.keyword == keyword
        ).all()

    def get_by_status(self, status: str) -> List[GeneratedContent]:
        """Get content by exact status match."""
        return self.session.query(GeneratedContent).filter(
            GeneratedContent.status == status
        ).all()

    def update(self, content: GeneratedContent) -> GeneratedContent:
        """
        Persist changes made to an existing content record.

        Args:
            content: The GeneratedContent object with updated data

        Returns:
            The same object, refreshed from the database

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        self.session.add(content)
        self._commit()
        self.session.refresh(content)
        return content

    def delete(self, content_id: int) -> bool:
        """
        Delete content by ID.

        Args:
            content_id: The ID of the content record to delete

        Returns:
            True if deleted, False if no record has that ID

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        content = self.get_by_id(content_id)
        if not content:
            return False
        self.session.delete(content)
        self._commit()
        return True
class ArticleLinkRepository(IArticleLinkRepository):
    """Repository for ArticleLink data access.

    Every write operation commits immediately on the injected session. If a
    commit fails, the session is rolled back before the error propagates so
    that the same session remains usable for later calls.
    """

    def __init__(self, session: Session):
        """Bind the repository to the session used for all of its queries."""
        self.session = session

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        SQLAlchemy requires rollback() after a failed flush/commit before the
        session can be used again, so the rollback is done here and the
        original exception is re-raised unchanged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def create(
        self,
        from_content_id: int,
        to_content_id: Optional[int] = None,
        to_url: Optional[str] = None,
        link_type: str = "tiered"
    ) -> ArticleLink:
        """
        Create a new article link.

        Args:
            from_content_id: Source article ID
            to_content_id: Target article ID (for internal links)
            to_url: Target URL (for external links like money site)
            link_type: Type of link (tiered, wheel_next, wheel_prev, homepage)

        Returns:
            The created ArticleLink object, refreshed from the database

        Raises:
            ValueError: If neither to_content_id nor to_url is provided, or
                if the commit raises IntegrityError
        """
        # A link needs at least one target; checked before touching the DB.
        if to_content_id is None and to_url is None:
            raise ValueError("Either to_content_id or to_url must be provided")

        link = ArticleLink(
            from_content_id=from_content_id,
            to_content_id=to_content_id,
            to_url=to_url,
            link_type=link_type,
        )
        self.session.add(link)
        try:
            self._commit()
        except IntegrityError as exc:
            # _commit() has already rolled the session back.
            raise ValueError(f"Failed to create article link: {exc}") from exc
        self.session.refresh(link)
        return link

    def get_by_source_article(self, from_content_id: int) -> List[ArticleLink]:
        """Get all outbound links from an article."""
        return self.session.query(ArticleLink).filter(
            ArticleLink.from_content_id == from_content_id
        ).all()

    def get_by_target_article(self, to_content_id: int) -> List[ArticleLink]:
        """Get all inbound links to an article."""
        return self.session.query(ArticleLink).filter(
            ArticleLink.to_content_id == to_content_id
        ).all()

    def get_by_link_type(self, link_type: str) -> List[ArticleLink]:
        """Get all links of a specific type."""
        return self.session.query(ArticleLink).filter(
            ArticleLink.link_type == link_type
        ).all()

    def delete(self, link_id: int) -> bool:
        """
        Delete an article link by ID.

        Args:
            link_id: The ID of the link to delete

        Returns:
            True if deleted, False if no link has that ID

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        link = self.session.query(ArticleLink).filter(ArticleLink.id == link_id).first()
        if not link:
            return False
        self.session.delete(link)
        self._commit()
        return True
class SitePageRepository(ISitePageRepository):
    """Repository for SitePage data access.

    Every write operation commits immediately on the injected session. If a
    commit fails, the session is rolled back before the error propagates so
    that the same session remains usable for later calls.
    """

    def __init__(self, session: Session):
        """Bind the repository to the session used for all of its queries."""
        self.session = session

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        SQLAlchemy requires rollback() after a failed flush/commit before the
        session can be used again, so the rollback is done here and the
        original exception is re-raised unchanged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def _find_by_id(self, page_id: int) -> Optional[SitePage]:
        """Return the page with the given primary key, or None if absent."""
        return self.session.query(SitePage).filter(SitePage.id == page_id).first()

    def create(self, site_deployment_id: int, page_type: str, content: str) -> SitePage:
        """
        Create a new site page.

        Args:
            site_deployment_id: Site deployment ID
            page_type: Type of page (about, contact, privacy)
            content: Full HTML content for the page

        Returns:
            The created SitePage object, refreshed from the database

        Raises:
            ValueError: If the commit raises IntegrityError (reported as the
                page already existing for this site and type)
        """
        page = SitePage(
            site_deployment_id=site_deployment_id,
            page_type=page_type,
            content=content,
        )
        self.session.add(page)
        try:
            self._commit()
        except IntegrityError as exc:
            # _commit() has already rolled the session back.
            raise ValueError(
                f"Page '{page_type}' already exists for site {site_deployment_id}"
            ) from exc
        self.session.refresh(page)
        return page

    def get_by_site(self, site_deployment_id: int) -> List[SitePage]:
        """Get all pages for a site."""
        return self.session.query(SitePage).filter(
            SitePage.site_deployment_id == site_deployment_id
        ).all()

    def get_by_site_and_type(self, site_deployment_id: int, page_type: str) -> Optional[SitePage]:
        """Get a specific page for a site, or None if it does not exist."""
        return self.session.query(SitePage).filter(
            SitePage.site_deployment_id == site_deployment_id,
            SitePage.page_type == page_type,
        ).first()

    def update_content(self, page_id: int, content: str) -> SitePage:
        """
        Replace the content of an existing page.

        Args:
            page_id: The ID of the page to update
            content: The new content for the page

        Returns:
            The updated SitePage object, refreshed from the database

        Raises:
            ValueError: If no page has the given ID
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        page = self._find_by_id(page_id)
        if not page:
            raise ValueError(f"Page with ID {page_id} not found")
        page.content = content
        self._commit()
        self.session.refresh(page)
        return page

    def exists(self, site_deployment_id: int, page_type: str) -> bool:
        """Check if a page of the given type exists for a site."""
        return self.get_by_site_and_type(site_deployment_id, page_type) is not None

    def delete(self, page_id: int) -> bool:
        """
        Delete a site page by ID.

        Args:
            page_id: The ID of the page to delete

        Returns:
            True if deleted, False if no page has that ID

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back first
        """
        page = self._find_by_id(page_id)
        if not page:
            return False
        self.session.delete(page)
        self._commit()
        return True