CheddahBot/cheddahbot/google_auth.py

66 lines
2.3 KiB
Python

"""Google OAuth2 credential management for Gmail API and Google Drive API."""
from __future__ import annotations
import logging
from pathlib import Path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
log = logging.getLogger(__name__)
# Default OAuth scopes, used by get_credentials() when the caller passes no
# (or an empty) ``scopes`` list. All scopes are requested upfront to avoid
# re-consent.
SCOPES = [
"https://www.googleapis.com/auth/gmail.compose", # Create drafts
"https://www.googleapis.com/auth/gmail.readonly", # Read replies
"https://www.googleapis.com/auth/drive.file", # Manage app-created files
]
# Default location of the OAuth client-secrets JSON handed to InstalledAppFlow.
# The path is relative, so it resolves against the process working directory.
_DEFAULT_CREDENTIALS_FILE = Path("credentials/gmail_credentials.json")
# Default location where the authorized-user token is loaded from and saved to
# (also relative to the process working directory).
_DEFAULT_TOKEN_FILE = Path("data/google_token.json")
def _write_token_file(token_file: Path, payload: str) -> None:
    """Write *payload* to *token_file*, readable and writable by the owner only."""
    token_file.parent.mkdir(parents=True, exist_ok=True)
    # The payload contains a long-lived refresh token. Create the file with
    # 0600 *before* writing any secret into it, and chmod as well so that a
    # pre-existing file with looser permissions is tightened too.
    token_file.touch(mode=0o600, exist_ok=True)
    token_file.chmod(0o600)
    token_file.write_text(payload, encoding="utf-8")


def get_credentials(
    credentials_file: Path | str = _DEFAULT_CREDENTIALS_FILE,
    token_file: Path | str = _DEFAULT_TOKEN_FILE,
    scopes: list[str] | None = None,
) -> Credentials:
    """Return valid Google OAuth2 credentials, refreshing or re-authenticating as needed.

    On first run, opens a browser for OAuth consent and stores the resulting
    token. Subsequent runs load the stored token and refresh it when it is no
    longer valid. If the stored token cannot be parsed, or Google rejects the
    refresh, the function falls back to the browser consent flow.

    Args:
        credentials_file: Path to the OAuth client-secrets JSON. Only read when
            the consent flow has to run.
        token_file: Path the authorized-user token is loaded from and saved to.
            Parent directories are created on save; the file is written with
            owner-only (0600) permissions.
        scopes: OAuth scopes to request. ``None`` or an empty list selects the
            module-level ``SCOPES``.

    Returns:
        The loaded, refreshed, or newly obtained credentials.

    Raises:
        FileNotFoundError: The consent flow is required but *credentials_file*
            does not exist.

    Errors other than ``RefreshError`` raised while refreshing (for example
    network/transport failures) are not handled here and propagate.
    """
    # Function-scope import: the module's top-level imports do not provide
    # RefreshError, and it is only needed on the refresh path.
    from google.auth.exceptions import RefreshError

    credentials_file = Path(credentials_file)
    token_file = Path(token_file)
    scopes = scopes or SCOPES

    creds: Credentials | None = None

    # Load the existing token; a corrupt or incomplete file is treated the
    # same as a missing one (JSON decode errors are ValueError subclasses).
    if token_file.exists():
        try:
            creds = Credentials.from_authorized_user_file(str(token_file), scopes)
        except ValueError:
            log.warning(
                "Ignoring unreadable Google token file %s; re-authenticating",
                token_file,
            )

    # Nothing changed, so there is nothing to persist.
    if creds and creds.valid:
        return creds

    # Refresh whenever a refresh token is available. This covers both an
    # expired access token and a stored token that has no access token at all.
    if creds and creds.refresh_token:
        log.info("Refreshing expired Google token")
        try:
            creds.refresh(Request())
        except RefreshError:
            # Typically a revoked or expired grant: only new consent can fix it.
            log.warning("Google token refresh was rejected; re-authenticating")
            creds = None

    if not creds or not creds.valid:
        if not credentials_file.exists():
            raise FileNotFoundError(
                f"OAuth credentials file not found at {credentials_file}. "
                "Download it from Google Cloud Console -> APIs & Services -> Credentials."
            )
        log.info("Starting Google OAuth consent flow (browser will open)")
        flow = InstalledAppFlow.from_client_secrets_file(str(credentials_file), scopes)
        creds = flow.run_local_server(port=0)

    # Persist the refreshed / newly issued token for the next run.
    _write_token_file(token_file, creds.to_json())
    return creds