296 lines
9.2 KiB
Python
296 lines
9.2 KiB
Python
#!/usr/bin/env python3
"""Analyze screen recordings using OpenRouter + Gemini Vision.

Extracts evenly spaced frames from videos and sends them to Gemini via
OpenRouter, prompting for a UX research-style analysis. Saves results as
markdown in docs/research/.

Usage:
    uv run python -m app.analyze_videos                   # analyze all .mp4 in videos/
    uv run python -m app.analyze_videos videos/file.mp4   # single video
    NUM_FRAMES=8 uv run python -m app.analyze_videos      # custom frame count
"""
|
|
|
|
import argparse
import base64
import json
import mimetypes
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

import httpx
from dotenv import load_dotenv
|
|
|
|
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
|
|
|
|
# Load variables from a .env file (repo root or a parent directory) before any
# configuration is read from the environment.
load_dotenv()

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
    # Fail fast at import time: every request below needs the key.
    print(
        "ERROR: OpenRouter API key not found.\n"
        " Put OPENROUTER_API_KEY=sk-... in .env (repo root) or set the env var.\n"
        " Get one at https://openrouter.ai/keys",
        file=sys.stderr,
    )
    sys.exit(1)

OPENROUTER_BASE = "https://openrouter.ai/api/v1"

# Model used when OPENROUTER_MODEL is not set in the environment.
# Other Gemini model IDs noted for OpenRouter (the catalogue changes over time,
# so verify an ID before relying on it):
#   google/gemini-2.0-flash-exp:free
#   google/gemini-2.0-flash
#   google/gemini-2.5-flash-preview-04-17
DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.5-flash-preview-05-20")
|
|
|
|
# Instruction text sent as the first content part of every request, ahead of
# the extracted frames. The trailing backslash suppresses the leading newline.
UX_PROMPT = """\
Analyze this screen recording like a UX researcher.

Track:
- user goals
- hesitation
- repeated actions
- likely confusion
- unnecessary clicks
- context switching
- inefficient workflow patterns
- UI discoverability issues
- moments where expectations appear violated

Output:
1. overall workflow summary
2. friction timeline
3. inferred user intent
4. UX issues ranked by severity
5. suggested improvements

Be specific about UI elements, button labels, menu paths, and exact behaviors
you observe in the frames provided.
"""
|
|
|
|
# ---------------------------------------------------------------------------
# Frame extraction
# ---------------------------------------------------------------------------
|
|
|
|
def _frame_timestamps(duration: float, num_frames: int) -> list[float]:
    """Return ``num_frames`` evenly spaced seek times (seconds) inside a clip."""
    if num_frames < 1:
        return []

    # Stay clear of the first/last 2% (at least one second) to avoid black
    # lead-in/lead-out frames.
    margin = max(duration * 0.02, 1.0)
    if 2 * margin >= duration:
        # Clip of two seconds or less: the one-second floor would swallow the
        # whole clip and produce reversed or negative seek times, so fall back
        # to the plain 2% margin.
        margin = duration * 0.02

    if num_frames == 1:
        # A single frame has no spacing to compute; take the midpoint instead
        # of dividing by ``num_frames - 1 == 0``.
        return [duration / 2]

    span = duration - 2 * margin
    return [margin + i * span / (num_frames - 1) for i in range(num_frames)]


def extract_frames(video_path: Path, num_frames: int = 6) -> list[dict]:
    """Extract evenly spaced still frames from a video using ffprobe/ffmpeg.

    The clip duration is read with ``ffprobe``; ``ffmpeg`` then writes one JPEG
    per timestamp into ``.tmp_video_frames/`` (relative to the current working
    directory). Frames that fail to extract are skipped.

    Args:
        video_path: Video file to sample.
        num_frames: Number of frames to request. Values below 1 yield no frames.

    Returns:
        A list of ``{"path": Path}`` dicts, one per frame actually written, in
        chronological order. Empty when the file is missing, cannot be probed,
        reports a non-positive duration, or no frame could be extracted.
    """
    if not video_path.exists():
        print(f"SKIP — file not found: {video_path}", file=sys.stderr)
        return []

    if num_frames < 1:
        print(f"SKIP — num_frames must be at least 1 (got {num_frames}): {video_path}", file=sys.stderr)
        return []

    tmp_dir = Path(".tmp_video_frames")
    tmp_dir.mkdir(exist_ok=True)

    # Ask ffprobe for the container duration in seconds.
    try:
        dur_output = subprocess.check_output(
            [
                "ffprobe",
                "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            stderr=subprocess.DEVNULL,
        ).decode().strip()
        duration = float(dur_output)
    except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
        print(f"SKIP — could not probe video: {video_path}", file=sys.stderr)
        return []

    if duration <= 0:
        print(f"SKIP — bad duration for: {video_path}", file=sys.stderr)
        return []

    images = []
    for i, ts in enumerate(_frame_timestamps(duration, num_frames)):
        out_path = tmp_dir / f"{video_path.stem}_frame_{i:03d}.jpg"
        # Remove any leftover from an earlier run so the existence check below
        # only ever sees a frame produced by this invocation.
        out_path.unlink(missing_ok=True)
        try:
            subprocess.run(
                [
                    "ffmpeg",
                    "-y",
                    # Fixed-point formatting: str(float) can yield exponent
                    # notation for very small values.
                    "-ss", f"{ts:.3f}",
                    "-i", str(video_path),
                    "-vframes:v", "1",
                    "-q:v", "2",  # good quality JPEG
                    "-an",
                    str(out_path),
                ],
                capture_output=True,
                check=True,
            )
        except (subprocess.CalledProcessError, FileNotFoundError):
            # Best effort: a frame that cannot be extracted is simply skipped.
            continue
        if out_path.exists():
            images.append({"path": out_path})

    return images
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# OpenRouter / Gemini API
# ---------------------------------------------------------------------------
|
|
|
|
def build_payload(images: list[dict]) -> dict:
    """Build the OpenRouter chat-completion payload with inline image content.

    Args:
        images: Dicts with a ``"path"`` entry pointing at an image file, as
            returned by ``extract_frames``.

    Returns:
        A request body with a single user message: the UX prompt followed by
        one base64 ``data:`` URL per image, using the module's current
        ``DEFAULT_MODEL``.

    Raises:
        OSError: If an image file cannot be read.
    """
    content = [{"type": "text", "text": UX_PROMPT}]
    for img in images:
        frame_path = Path(img["path"])
        encoded = base64.b64encode(frame_path.read_bytes()).decode("ascii")

        # Derive the MIME type from the file name so ".jpg" becomes the
        # registered "image/jpeg" rather than the non-standard "image/jpg".
        mime_type, _ = mimetypes.guess_type(frame_path.name)
        if mime_type is None:
            # Unknown extension: fall back to the bare suffix.
            mime_type = f"image/{frame_path.suffix.lstrip('.').lower()}"

        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:{mime_type};base64,{encoded}",
            },
        })

    return {
        "model": DEFAULT_MODEL,
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 8192,
        "temperature": 0.3,
    }
|
|
|
|
|
|
def call_openrouter(payload: dict) -> str:
    """Send a chat-completion request to OpenRouter and return the reply text.

    Args:
        payload: Request body, e.g. as produced by ``build_payload``.

    Returns:
        The ``content`` string of the first choice's message.

    Raises:
        httpx.HTTPStatusError: If the response status is not successful
            (via ``raise_for_status``).
        ValueError: If the response has no choices, or the first choice does
            not carry a text ``content``.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        # Optional: pass-through headers for attribution / tracking
        "HTTP-Referer": "https://github.com/notid/e-filing",
        "X-Title": "eFiling Video Analyzer",
    }

    with httpx.Client(timeout=120.0) as client:
        resp = client.post(
            f"{OPENROUTER_BASE}/chat/completions",
            headers=headers,
            json=payload,
        )
        resp.raise_for_status()
        data = resp.json()

    choices = data.get("choices", [])
    if not choices:
        raise ValueError(f"No choices in OpenRouter response: {json.dumps(data, indent=2)[:500]}")

    # Guard the lookup chain: a missing "message" or a null "content" would
    # otherwise escape as a bare KeyError or as None despite the str contract.
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    reply = message.get("content") if isinstance(message, dict) else None
    if not isinstance(reply, str):
        raise ValueError(
            f"No text content in OpenRouter response: {json.dumps(first, indent=2, default=str)[:500]}"
        )
    return reply
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
|
|
|
|
def write_report(video_path: Path, analysis: str, model: str, num_frames: int) -> Path:
    """Write the analysis as a markdown file in docs/research/.

    The output directory is ``docs/research`` under the parent of this module's
    directory and is created when missing. The file is named
    ``<sanitized stem>_<YYYYMMDD>.md`` (UTC date), so a rerun on the same day
    overwrites the earlier report.

    Args:
        video_path: Source video; only its name and stem are used.
        analysis: Markdown body returned by the model.
        model: Model identifier recorded in the report header.
        num_frames: Number of frames that were analyzed.

    Returns:
        Path of the report that was written.
    """
    output_dir = Path(__file__).resolve().parent.parent / "docs" / "research"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Read the clock once so the filename date and the header date cannot
    # disagree when a run straddles midnight UTC.
    now = datetime.now(timezone.utc)

    # Sanitize filename: keep word characters, whitespace and hyphens only.
    safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem)
    out_file = output_dir / f"{safe_name}_{now.strftime('%Y%m%d')}.md"

    header = f"""\
# eFiling — UX Analysis: {video_path.name}

| Field | Value |
|-------|-------|
| **Source video** | `{video_path.name}` |
| **Analysis date** | {now.strftime("%Y-%m-%d %H:%M UTC")} |
| **Model** | {model} |
| **Frames analyzed** | {num_frames} |

---

"""
    # Explicit UTF-8: the header and the model output contain non-ASCII
    # characters, which the platform default encoding may not be able to write.
    out_file.write_text(header + analysis, encoding="utf-8")
    return out_file
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: analyze each video and write a report.

    Parses the arguments, resolves the list of videos (explicit paths, or every
    ``.mp4`` in the ``videos/`` directory next to this package), then for each
    video extracts frames, queries the model and writes a markdown report.
    A failure on one video is reported on stderr and the run continues.

    Exits with status 1 when no explicit paths are given and ``videos/`` does
    not exist, and with status 0 when ``videos/`` contains no ``.mp4`` files.
    """
    # Must come before any use of DEFAULT_MODEL in this function: Python
    # rejects a ``global`` statement that follows a use of the name.
    global DEFAULT_MODEL

    parser = argparse.ArgumentParser(description="Analyze screen recordings with Gemini via OpenRouter")
    parser.add_argument(
        "videos",
        nargs="*",
        default=[],
        help="Video files to analyze (defaults to all .mp4 in videos/)",
    )
    parser.add_argument(
        "--num-frames",
        type=int,
        # argparse runs string defaults through ``type``, so a malformed
        # NUM_FRAMES becomes a normal usage error instead of a traceback.
        default=os.getenv("NUM_FRAMES", "6"),
        help="Number of frames to extract per video (default: 6)",
    )
    parser.add_argument(
        "--model",
        type=str,
        default=os.getenv("OPENROUTER_MODEL", DEFAULT_MODEL),
        help=f"OpenRouter model (default: {DEFAULT_MODEL})",
    )
    args = parser.parse_args()

    if args.num_frames < 1:
        parser.error("--num-frames must be at least 1")

    # build_payload reads the module-level DEFAULT_MODEL, so publish the choice.
    DEFAULT_MODEL = args.model

    # Resolve video paths
    videos_dir = Path(__file__).resolve().parent.parent / "videos"
    if args.videos:
        video_paths = [Path(v) for v in args.videos]
    elif videos_dir.exists():
        video_paths = sorted(videos_dir.glob("*.mp4"))
    else:
        print("No videos found. Pass paths explicitly or put .mp4 files in videos/", file=sys.stderr)
        sys.exit(1)

    if not video_paths:
        print("No .mp4 files to analyze.", file=sys.stderr)
        sys.exit(0)

    print(f"Analyzing {len(video_paths)} video(s) with model '{args.model}'...")
    print()

    for i, vp in enumerate(video_paths, 1):
        print(f"[{i}/{len(video_paths)}] {vp.name}")

        frames = extract_frames(vp, args.num_frames)
        if not frames:
            # extract_frames has already reported why on stderr.
            continue

        print(f" Extracted {len(frames)} frame(s)")

        try:
            payload = build_payload(frames)
            analysis = call_openrouter(payload)
            out_file = write_report(vp, analysis, args.model, len(frames))
            print(f" ✅ Saved to {out_file}")
        except Exception as exc:
            # Top-level boundary: report the failure and move on to the next
            # video rather than aborting the whole batch.
            print(f" ❌ Error: {exc}", file=sys.stderr)
            continue

        print()

    print("Done.")


if __name__ == "__main__":
    main()
|