"""Audio/video processing: STT, TTS, video frame extraction."""

from __future__ import annotations

import asyncio
import logging
import subprocess
import tempfile
from pathlib import Path

log = logging.getLogger(__name__)


# ── Speech-to-Text ──
def transcribe_audio(audio_path: str | Path) -> str:
|
|
"""Transcribe audio to text. Tries OpenAI Whisper API, falls back to local whisper."""
|
|
audio_path = Path(audio_path)
|
|
if not audio_path.exists():
|
|
return ""
|
|
|
|
# Try local whisper first (no API key needed)
|
|
try:
|
|
return _transcribe_local(audio_path)
|
|
except ImportError:
|
|
pass
|
|
except Exception as e:
|
|
log.warning("Local whisper failed: %s", e)
|
|
|
|
# Fallback: try OpenAI API
|
|
try:
|
|
return _transcribe_openai_api(audio_path)
|
|
except Exception as e:
|
|
log.warning("OpenAI whisper API failed: %s", e)
|
|
|
|
return f"(Could not transcribe audio from {audio_path.name})"
def _transcribe_local(audio_path: Path) -> str:
    """Transcribe *audio_path* with the locally installed ``whisper`` package.

    Raises ImportError when whisper is not installed, letting the caller
    fall back to the hosted API.
    """
    import whisper

    transcription = whisper.load_model("base").transcribe(str(audio_path))
    return transcription.get("text", "").strip()
def _transcribe_openai_api(audio_path: Path) -> str:
    """Send *audio_path* to the hosted ``whisper-1`` transcription endpoint.

    Raises ValueError when neither OPENAI_API_KEY nor OPENROUTER_API_KEY is
    set in the environment.
    """
    import os

    import openai

    # NOTE(review): an OPENROUTER_API_KEY is used against the default OpenAI
    # base URL here -- confirm OpenRouter accepts that, or set base_url.
    api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        raise ValueError("No API key for Whisper")

    client = openai.OpenAI(api_key=api_key)
    with audio_path.open("rb") as audio_file:
        response = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
    return response.text.strip()
# ── Text-to-Speech ──
def text_to_speech(
    text: str, output_path: str | Path | None = None, voice: str = "en-US-AriaNeural"
) -> Path:
    """Convert text to speech using edge-tts (free, no API key).

    Args:
        text: The text to synthesize.
        output_path: Destination file; a fresh temporary ``.mp3`` is created
            when None.
        voice: edge-tts voice name.

    Returns:
        Path to the written file. When edge-tts is not installed, the file
        contains a plain-text placeholder instead of audio.
    """
    if output_path is None:
        # tempfile.mktemp() is deprecated and race-prone (the name can be
        # claimed by another process before use); NamedTemporaryFile creates
        # the file atomically, and delete=False keeps it for the caller.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
            output_path = Path(tmp.name)
    else:
        output_path = Path(output_path)

    try:
        import edge_tts

        async def _generate():
            communicate = edge_tts.Communicate(text, voice)
            await communicate.save(str(output_path))

        asyncio.run(_generate())
        return output_path
    except ImportError:
        log.warning("edge-tts not installed. Run: pip install edge-tts")
        # Write a placeholder so callers still receive a readable file.
        output_path.write_text("TTS not available", encoding="utf-8")
        return output_path
# ── Video Frame Extraction ──
def extract_video_frames(video_path: str | Path, max_frames: int = 5) -> list[Path]:
|
|
"""Extract key frames from a video using ffmpeg."""
|
|
video_path = Path(video_path)
|
|
if not video_path.exists():
|
|
return []
|
|
|
|
output_dir = Path(tempfile.mkdtemp(prefix="cheddah_frames_"))
|
|
|
|
try:
|
|
# Get video duration
|
|
result = subprocess.run(
|
|
[
|
|
"ffprobe",
|
|
"-v",
|
|
"error",
|
|
"-show_entries",
|
|
"format=duration",
|
|
"-of",
|
|
"default=noprint_wrappers=1:nokey=1",
|
|
str(video_path),
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
)
|
|
duration = float(result.stdout.strip()) if result.stdout.strip() else 10.0
|
|
interval = max(duration / (max_frames + 1), 1.0)
|
|
|
|
# Extract frames
|
|
subprocess.run(
|
|
[
|
|
"ffmpeg",
|
|
"-i",
|
|
str(video_path),
|
|
"-vf",
|
|
f"fps=1/{interval}",
|
|
"-frames:v",
|
|
str(max_frames),
|
|
str(output_dir / "frame_%03d.jpg"),
|
|
],
|
|
capture_output=True,
|
|
timeout=30,
|
|
)
|
|
|
|
frames = sorted(output_dir.glob("frame_*.jpg"))
|
|
return frames
|
|
except FileNotFoundError:
|
|
log.warning("ffmpeg/ffprobe not found. Video analysis requires ffmpeg.")
|
|
return []
|
|
except Exception as e:
|
|
log.warning("Video frame extraction failed: %s", e)
|
|
return []
|