#!/usr/bin/env python3
"""Analyze screen recordings using OpenRouter + Gemini Vision.

Extracts key frames from videos and sends them to Gemini via OpenRouter,
prompting for a UX research-style analysis. Saves results as markdown in
docs/research/.

Usage:
    # Analyze all videos in videos/
    uv run python -m app.analyze_videos

    # Analyze one specific video
    uv run python -m app.analyze_videos "videos/E-Filing in Filevine.mp4"

    # Extract a frame every 15 seconds (recommended for 3-4 min videos)
    INTERVAL=15 uv run python -m app.analyze_videos

    # Force exactly N frames, evenly spaced
    NUM_FRAMES=8 uv run python -m app.analyze_videos
"""

import argparse
import base64
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

import httpx
from dotenv import load_dotenv

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

load_dotenv()  # loads .env in repo root or parent directories

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
    print(
        "ERROR: OpenRouter API key not found.\n"
        " Put OPENROUTER_API_KEY=sk-... in .env (repo root) or set the env var.\n"
        " Get one at https://openrouter.ai/keys",
        file=sys.stderr,
    )
    sys.exit(1)

OPENROUTER_BASE = "https://openrouter.ai/api/v1"

# Gemini models available on OpenRouter:
#   google/gemini-2.0-flash-exp:free        (free, good for testing)
#   google/gemini-2.0-flash                 (fast, multimodal)
#   google/gemini-2.5-flash-preview-05-20   (latest preview)
DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.5-flash-preview-05-20")
DEFAULT_INTERVAL = int(os.getenv("INTERVAL", "30"))  # seconds between frames

# File suffix -> MIME subtype for data URLs. "jpg" is not a registered
# subtype; the correct media type for JPEG files is image/jpeg.
_IMAGE_SUBTYPES = {"jpg": "jpeg", "jpeg": "jpeg", "png": "png", "webp": "webp", "gif": "gif"}

UX_PROMPT = """\
Analyze this screen recording like a UX researcher.

Track:
- user goals
- hesitation
- repeated actions
- likely confusion
- unnecessary clicks
- context switching
- inefficient workflow patterns
- UI discoverability issues
- moments where expectations appear violated

Output:
1. overall workflow summary
2. friction timeline
3. inferred user intent
4. UX issues ranked by severity
5. suggested improvements

Be specific about UI elements, button labels, menu paths, and exact behaviors
you observe in the frames provided.
"""


# ---------------------------------------------------------------------------
# Frame extraction
# ---------------------------------------------------------------------------


def pick_timestamps(duration: float, interval_sec: int = 30, num_frames: int = 0) -> list[float]:
    """Pick timestamps (in seconds) at which to extract frames from a video.

    Two strategies:
      - interval  : one frame every ``interval_sec`` seconds (default).
      - num_frames: exactly ``num_frames`` frames spread evenly across the
        video. Takes precedence over the interval when greater than zero.

    The first and last 2% of the video (at least one second each) are skipped
    to avoid black intro/outro frames. For clips too short to afford that
    margin, the margin is capped at half the duration so every timestamp
    stays inside the video.

    Returns an empty list when ``duration`` is not positive.

    Raises:
        ValueError: if the interval strategy is selected with a
            non-positive ``interval_sec`` (the loop would never terminate).
    """
    if duration <= 0:
        return []

    # Cap the margin at duration / 2 so that margin <= duration - margin.
    margin = min(max(duration * 0.02, 1.0), duration / 2)
    usable = duration - 2 * margin

    if num_frames > 0:
        if num_frames == 1:
            # A single frame has no spacing to compute (would divide by
            # zero); take the midpoint of the video instead.
            return [round(duration / 2, 2)]
        return [round(margin + i * usable / (num_frames - 1), 2) for i in range(num_frames)]

    if interval_sec <= 0:
        raise ValueError(f"interval_sec must be positive, got {interval_sec}")

    timestamps: list[float] = []
    t = margin
    # The capped margin guarantees at least one iteration.
    while t <= duration - margin:
        timestamps.append(round(t, 2))
        t += interval_sec
    return timestamps


def extract_frames(video_path: Path, timestamps: list[float]) -> list[dict]:
    """Extract one JPEG frame per timestamp from ``video_path`` using ffmpeg.

    Frames are written to ``.tmp_video_frames/`` under the current working
    directory. Returns a list of ``{"path": Path}`` dicts for the frames that
    were actually produced; timestamps where ffmpeg fails are reported on
    stderr and skipped. Returns an empty list if the video does not exist.
    """
    if not video_path.exists():
        print(f"SKIP — file not found: {video_path}", file=sys.stderr)
        return []

    tmp_dir = Path(".tmp_video_frames")
    tmp_dir.mkdir(exist_ok=True)

    images: list[dict] = []
    for i, ts in enumerate(timestamps):
        out_path = tmp_dir / f"{video_path.stem}_frame_{i:03d}.jpg"
        # Drop any frame left over from a previous run so the exists() check
        # below reflects this invocation only.
        out_path.unlink(missing_ok=True)
        try:
            subprocess.run(
                [
                    "ffmpeg",
                    "-y",
                    "-ss", str(ts),  # seek before -i: fast input seeking
                    "-i", str(video_path),
                    "-frames:v", "1",
                    "-q:v", "2",  # good quality JPEG
                    "-an",
                    str(out_path),
                ],
                capture_output=True,
                check=True,
            )
        except FileNotFoundError:
            # ffmpeg itself is missing; no later timestamp can succeed either.
            print("  ERROR: ffmpeg not found on PATH", file=sys.stderr)
            break
        except subprocess.CalledProcessError as exc:
            print(
                f"  WARN: ffmpeg failed at {ts}s (exit code {exc.returncode}); skipping frame",
                file=sys.stderr,
            )
            continue
        if out_path.exists():
            images.append({"path": out_path})
    return images


# ---------------------------------------------------------------------------
# OpenRouter / Gemini API
# ---------------------------------------------------------------------------


def build_payload(images: list[dict], model: str | None = None) -> dict:
    """Build the OpenRouter chat completion payload with image content.

    Args:
        images: dicts with a ``"path"`` key pointing at an image file, as
            returned by :func:`extract_frames`.
        model: OpenRouter model identifier; falls back to ``DEFAULT_MODEL``
            when omitted.

    Returns:
        A dict with the UX prompt followed by every image embedded as a
        base64 data URL in a single user message.
    """
    content: list[dict] = [{"type": "text", "text": UX_PROMPT}]
    for img in images:
        img_path = Path(img["path"])
        encoded = base64.b64encode(img_path.read_bytes()).decode()
        ext = img_path.suffix.lstrip(".").lower()
        subtype = _IMAGE_SUBTYPES.get(ext, ext)
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/{subtype};base64,{encoded}"},
            }
        )
    return {
        "model": model or DEFAULT_MODEL,
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 8192,
        "temperature": 0.3,
    }


def call_openrouter(payload: dict) -> str:
    """Send ``payload`` to OpenRouter and return the assistant's reply text.

    Raises:
        httpx.HTTPStatusError: on a non-2xx HTTP response.
        ValueError: if the response has no choices or the first choice
            carries no text content.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/notid/e-filing",
        "X-Title": "eFiling Video Analyzer",
    }
    with httpx.Client(timeout=120.0) as client:
        resp = client.post(
            f"{OPENROUTER_BASE}/chat/completions",
            headers=headers,
            json=payload,
        )
        resp.raise_for_status()
        data = resp.json()

    choices = data.get("choices", [])
    if not choices:
        raise ValueError(f"No choices in OpenRouter response: {json.dumps(data, indent=2)[:500]}")

    content = choices[0]["message"]["content"]
    if not content:
        # A null/empty content would otherwise produce an empty report or
        # crash later when concatenated with the report header.
        raise ValueError(f"Empty content in OpenRouter response: {json.dumps(data, indent=2)[:500]}")
    return content


# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------


def write_report(video_path: Path, analysis: str, model: str, num_frames: int, duration: float) -> Path:
    """Write the analysis as a markdown file in docs/research/.

    The file is named ``<sanitized video stem>_<YYYYMMDD>.md`` (UTC date) and
    starts with a metadata table followed by the model's analysis. Returns
    the path of the written file.
    """
    output_dir = Path(__file__).resolve().parent.parent / "docs" / "research"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Fall back to a fixed name if the stem consists only of stripped characters.
    safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem) or "video"
    # Take the time once so the file name and the header can never disagree.
    now = datetime.now(timezone.utc)
    out_file = output_dir / f"{safe_name}_{now.strftime('%Y%m%d')}.md"

    dur_min, dur_sec = divmod(int(duration), 60)

    header = "\n".join(
        [
            f"# eFiling — UX Analysis: {video_path.name}",
            "",
            "| Field | Value |",
            "|-------|-------|",
            f"| **Source video** | `{video_path.name}` |",
            f"| **Duration** | {dur_min}m {dur_sec}s |",
            f"| **Analysis date** | {now.strftime('%Y-%m-%d %H:%M UTC')} |",
            f"| **Model** | {model} |",
            f"| **Frames analyzed** | {num_frames} |",
            "",
            "---",
            "",
            "",
        ]
    )

    # Explicit UTF-8: the header and the model output contain non-ASCII
    # characters, and the platform default encoding may not handle them.
    out_file.write_text(header + analysis, encoding="utf-8")
    return out_file


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def probe_duration(video_path: Path) -> float:
    """Return the video duration in seconds via ffprobe, or 0.0 on failure.

    Failure covers ffprobe being missing, ffprobe exiting non-zero, and
    output that cannot be parsed as a float.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(video_path),
    ]
    try:
        raw = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
        return float(raw.decode().strip())
    except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
        return 0.0


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main():
    """CLI entry point: analyze each requested video and write one report per video."""
    parser = argparse.ArgumentParser(
        description="Analyze screen recordings with Gemini via OpenRouter",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
available strategies:
  --interval 30     extract one frame every 30 seconds (default, good for long videos)
  --num-frames 6    evenly spread N frames across the whole video

examples:
  # analyze one specific video
  python -m app.analyze_videos "videos/E-Filing in Filevine.mp4"

  # analyze all videos with one frame every 30 s (default)
  python -m app.analyze_videos

  # exactly 8 frames spread across each video
  python -m app.analyze_videos --num-frames 8
""",
    )
    parser.add_argument(
        "videos",
        nargs="*",
        default=[],
        help="Video files to analyze (defaults to all .mp4 in videos/)",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=DEFAULT_INTERVAL,
        help=f"Extract one frame every N seconds (default: {DEFAULT_INTERVAL}). "
        "Ignored when --num-frames is > 0.",
    )
    parser.add_argument(
        "--num-frames",
        type=int,
        default=int(os.getenv("NUM_FRAMES", "0")),
        help="Extract exactly N frames, evenly spaced. Set to >0 to override --interval.",
    )
    parser.add_argument(
        "--model",
        type=str,
        default=DEFAULT_MODEL,  # already honours the OPENROUTER_MODEL env var
        help=f"OpenRouter model (default: {DEFAULT_MODEL})",
    )
    args = parser.parse_args()

    # A non-positive interval would make the interval strategy loop forever.
    if args.num_frames <= 0 and args.interval <= 0:
        parser.error("--interval (or INTERVAL) must be a positive number of seconds")

    # Resolve video paths
    videos_dir = Path(__file__).resolve().parent.parent / "videos"
    if args.videos:
        video_paths = [Path(v) for v in args.videos]
    elif videos_dir.exists():
        video_paths = sorted(videos_dir.glob("*.mp4"))
    else:
        print("No videos found. Pass paths explicitly or put .mp4 files in videos/", file=sys.stderr)
        sys.exit(1)

    if not video_paths:
        print("No .mp4 files to analyze.", file=sys.stderr)
        sys.exit(0)

    strategy_label = "exact frames" if args.num_frames > 0 else f"interval ({args.interval}s)"
    print(f"Analyzing {len(video_paths)} video(s) with model '{args.model}'...")
    print(f"Strategy: {strategy_label}")
    print()

    for i, vp in enumerate(video_paths, 1):
        print(f"[{i}/{len(video_paths)}] {vp.name}")

        duration = probe_duration(vp)
        if duration <= 0:
            print("  SKIP — could not determine duration", file=sys.stderr)
            continue

        timestamps = pick_timestamps(duration, args.interval, args.num_frames)
        frames = extract_frames(vp, timestamps)
        if not frames:
            print("  SKIP — no frames extracted")
            continue

        print(
            f"  Strategy: {strategy_label} → {len(frames)} frame(s) "
            f"from {int(duration//60)}m{int(duration%60):02}s video"
        )

        # Top-level boundary: one failing video must not abort the rest.
        try:
            # The model is passed explicitly rather than by rebinding the
            # DEFAULT_MODEL global, which cannot be declared global here
            # after it has been read for the argument defaults above.
            payload = build_payload(frames, args.model)
            analysis = call_openrouter(payload)
            out_file = write_report(vp, analysis, args.model, len(frames), duration)
            print(f"  ✅ Saved to {out_file}")
        except Exception as exc:
            print(f"  ❌ Error: {exc}", file=sys.stderr)
            continue
        print()

    print("Done.")


if __name__ == "__main__":
    main()