diff --git a/app/analyze_videos.py b/app/analyze_videos.py index 72fb121..f50e0ff 100644 --- a/app/analyze_videos.py +++ b/app/analyze_videos.py @@ -1,22 +1,12 @@ #!/usr/bin/env python3 """Analyze screen recordings using OpenRouter + Gemini Vision. -Extracts key frames from videos and sends them to Gemini via OpenRouter, -prompting for a UX research-style analysis. Saves results as markdown -in docs/research/. +Sends the full video file directly to Gemini via OpenRouter for a +UX research-style analysis. Saves results as markdown in docs/research/. Usage: - # Analyze all videos in videos/ - uv run python -m app.analyze_videos - - # Analyze one specific video - uv run python -m app.analyze_videos "videos/E-Filing in Filevine.mp4" - - # Extract a frame every 30 seconds (recommended for 3-4 min videos) - INTERVAL=15 uv run python -m app.analyze_videos - - # Force exactly N frames, evenly spaced - NUM_FRAMES=8 uv run python -m app.analyze_videos + uv run python -m app.analyze_videos # analyze all .mp4 in videos/ + uv run python -m app.analyze_videos videos/file.mp4 # single video """ import argparse @@ -36,7 +26,7 @@ from dotenv import load_dotenv # Config # --------------------------------------------------------------------------- -load_dotenv() # loads .env in repo root or parent directories +load_dotenv() OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") if not OPENROUTER_API_KEY: @@ -49,12 +39,7 @@ if not OPENROUTER_API_KEY: sys.exit(1) OPENROUTER_BASE = "https://openrouter.ai/api/v1" -# Gemini models available on OpenRouter: -# google/gemini-2.0-flash-exp:free (free, good for testing) -# google/gemini-2.0-flash (fast, multimodal) -# google/gemini-2.5-flash-preview-05-20 (latest preview) -DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.5-flash-preview-05-20") -DEFAULT_INTERVAL = int(os.getenv("INTERVAL", "30")) # seconds between frames +DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.0-pro-exp-02-05:free") UX_PROMPT = """\ Analyze this screen recording like a UX researcher. @@ -78,156 +63,10 @@ Output: 5. suggested improvements Be specific about UI elements, button labels, menu paths, and exact behaviors -you observe in the frames provided. +you observe throughout the video. """ -# --------------------------------------------------------------------------- -# Frame extraction -# --------------------------------------------------------------------------- - -def pick_timestamps(duration: float, interval_sec: int = 30, num_frames: int = 0) -> list[float]: - """Pick timestamps to extract frames from a video. - - Two strategies: - - interval : one frame every N seconds (default). Good for longer videos. - - num_frames: evenly spread exactly N frames across the whole video. - - Always skips the first and last 2% to avoid black intro/outro frames. - """ - margin = max(duration * 0.02, 1.0) - usable = duration - 2 * margin - - if num_frames > 0: - return [round(margin + i * usable / (num_frames - 1), 2) for i in range(num_frames)] - else: - timestamps: list[float] = [] - t = margin - while t <= (duration - margin): - timestamps.append(round(t, 2)) - t += interval_sec - if not timestamps: - timestamps.append(margin) - return timestamps - - -def extract_frames(video_path: Path, timestamps: list[float]) -> list[dict]: - """Extract frames from a video at the given timestamps using ffmpeg.""" - if not video_path.exists(): - print(f"SKIP — file not found: {video_path}", file=sys.stderr) - return [] - - tmp_dir = Path(".tmp_video_frames") - tmp_dir.mkdir(exist_ok=True) - - images = [] - for i, ts in enumerate(timestamps): - out_path = tmp_dir / f"{video_path.stem}_frame_{i:03d}.jpg" - try: - subprocess.run( - [ - "ffmpeg", - "-y", - "-ss", str(ts), - "-i", str(video_path), - "-vframes:v", "1", - "-q:v", "2", # good quality JPEG - "-an", - str(out_path), - ], - capture_output=True, - check=True, - ) - if out_path.exists(): - images.append({"path": out_path}) - except (subprocess.CalledProcessError, FileNotFoundError): - continue - - return images - - -# --------------------------------------------------------------------------- -# OpenRouter / Gemini API -# --------------------------------------------------------------------------- - -def build_payload(images: list[dict]) -> dict: - """Build the OpenRouter chat completion payload with image content.""" - content = [{"type": "text", "text": UX_PROMPT}] - for img in images: - with open(img["path"], "rb") as f: - encoded = base64.b64encode(f.read()).decode() - ext = Path(img["path"]).suffix.lstrip(".") - content.append({ - "type": "image_url", - "image_url": { - "url": f"data:image/{ext};base64,{encoded}", - }, - }) - - return { - "model": DEFAULT_MODEL, - "messages": [{"role": "user", "content": content}], - "max_tokens": 8192, - "temperature": 0.3, - } - - -def call_openrouter(payload: dict) -> str: - """Send request to OpenRouter and return the assistant's reply.""" - headers = { - "Authorization": f"Bearer {OPENROUTER_API_KEY}", - "Content-Type": "application/json", - "HTTP-Referer": "https://github.com/notid/e-filing", - "X-Title": "eFiling Video Analyzer", - } - - with httpx.Client(timeout=120.0) as client: - resp = client.post( - f"{OPENROUTER_BASE}/chat/completions", - headers=headers, - json=payload, - ) - resp.raise_for_status() - data = resp.json() - - choices = data.get("choices", []) - if not choices: - raise ValueError(f"No choices in OpenRouter response: {json.dumps(data, indent=2)[:500]}") - return choices[0]["message"]["content"] - - -# --------------------------------------------------------------------------- -# Output -# --------------------------------------------------------------------------- - -def write_report(video_path: Path, analysis: str, model: str, num_frames: int, duration: float) -> Path: - """Write the analysis as a markdown file in docs/research/.""" - output_dir = Path(__file__).resolve().parent.parent / "docs" / "research" - output_dir.mkdir(parents=True, exist_ok=True) - - safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem) - timestamp = datetime.now(timezone.utc).strftime("%Y%m%d") - out_file = output_dir / f"{safe_name}_{timestamp}.md" - - dur_min = int(duration // 60) - dur_sec = int(duration % 60) - header = f"""\ -# eFiling — UX Analysis: {video_path.name} - -| Field | Value | -|-------|-------| -| **Source video** | `{video_path.name}` | -| **Duration** | {dur_min}m {dur_sec}s |\n| **Analysis date** | {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")} | -| **Model** | {model} | -| **Frames analyzed** | {num_frames} | - ---- - -""" - out_file.write_text(header + analysis) - return out_file - - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -245,6 +84,99 @@ def probe_duration(video_path: Path) -> float: return 0.0 +def read_video(video_path: Path) -> tuple[bytes, str]: + """Read a video file and return (bytes, MIME type).""" + ext = video_path.suffix.lstrip(".").lower() + mime_map = { + "mp4": "video/mp4", + "mov": "video/quicktime", + "webm": "video/webm", + "mkv": "video/x-matroska", + } + mime = mime_map.get(ext, f"video/{ext}") + with open(video_path, "rb") as f: + data = f.read() + return data, mime + + +def build_payload(video_path: Path, duration: float) -> dict: + """Build the OpenRouter chat completion payload with a video attachment.""" + video_data, mime = read_video(video_path) + encoded = base64.b64encode(video_data).decode() + + content = [ + {"type": "text", "text": f"{UX_PROMPT}\n\n(Duration: {int(duration//60)}m{int(duration%60):02}s)"}, + { + "type": "video_url", + "video_url": { + "url": f"data:{mime};base64,{encoded}", + }, + }, + ] + + return { + "model": DEFAULT_MODEL, + "messages": [{"role": "user", "content": content}], + "max_tokens": 8192, + "temperature": 0.3, + } + + +def call_openrouter(payload: dict) -> str: + """Send request to OpenRouter and return the assistant's reply.""" + headers = { + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/notid/e-filing", + "X-Title": "eFiling Video Analyzer", + } + + with httpx.Client(timeout=300.0) as client: + resp = client.post( + f"{OPENROUTER_BASE}/chat/completions", + headers=headers, + json=payload, + ) + resp.raise_for_status() + data = resp.json() + + choices = data.get("choices", []) + if not choices: + raise ValueError(f"No choices in OpenRouter response: {json.dumps(data, indent=2)[:500]}") + return choices[0]["message"]["content"] + + +# --------------------------------------------------------------------------- +# Output +# --------------------------------------------------------------------------- + +def write_report(video_path: Path, analysis: str, model: str, duration: float) -> Path: + """Write the analysis as a markdown file in docs/research/.""" + output_dir = Path(__file__).resolve().parent.parent / "docs" / "research" + output_dir.mkdir(parents=True, exist_ok=True) + + safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem) + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d") + out_file = output_dir / f"{safe_name}_{timestamp}.md" + + dur_min = int(duration // 60) + dur_sec = int(duration % 60) + header = f"""\ +# eFiling — UX Analysis: {video_path.name} + +| Field | Value | +|-------|-------| +| **Source video** | `{video_path.name}` | +| **Duration** | {dur_min}m {dur_sec}s |\n| **Analysis date** | {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")} | +| **Model** | {model} | + +--- + +""" + out_file.write_text(header + analysis) + return out_file + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- @@ -254,19 +186,12 @@ def main(): description="Analyze screen recordings with Gemini via OpenRouter", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ -available strategies: - --interval 30 extract one frame every 30 seconds (default, good for long videos) - --num-frames 6 evenly spread N frames across the whole video - examples: # analyze one specific video python -m app.analyze_videos "videos/E-Filing in Filevine.mp4" - # analyze all videos with one frame every 30 s (default) + # analyze all videos in videos/ python -m app.analyze_videos - - # exactly 8 frames spread across each video - python -m app.analyze_videos --num-frames 8 """, ) parser.add_argument( @@ -275,18 +200,6 @@ examples: default=[], help="Video files to analyze (defaults to all .mp4 in videos/)", ) - parser.add_argument( - "--interval", - type=int, - default=DEFAULT_INTERVAL, - help="Extract one frame every N seconds (default: 30). Overrides --num-frames.", - ) - parser.add_argument( - "--num-frames", - type=int, - default=int(os.getenv("NUM_FRAMES", "0")), - help="Extract exactly N frames, evenly spaced. Set to >0 to override --interval.", - ) parser.add_argument( "--model", type=str, @@ -303,18 +216,17 @@ examples: if args.videos: video_paths = [Path(v) for v in args.videos] elif videos_dir.exists(): - video_paths = sorted(videos_dir.glob("*.mp4")) + video_paths = sorted(videos_dir.glob("*")) else: - print("No videos found. Pass paths explicitly or put .mp4 files in videos/", file=sys.stderr) + print("No videos found. Pass paths explicitly or put files in videos/", file=sys.stderr) sys.exit(1) if not video_paths: - print("No .mp4 files to analyze.", file=sys.stderr) + print("No video files to analyze.", file=sys.stderr) sys.exit(0) - strategy_label = "exact frames" if args.num_frames > 0 else f"interval ({args.interval}s)" print(f"Analyzing {len(video_paths)} video(s) with model '{args.model}'...") - print(f"Strategy: {strategy_label}") + print("Mode: full-video upload (no frame extraction)") print() for i, vp in enumerate(video_paths, 1): @@ -325,18 +237,10 @@ examples: print(f" SKIP — could not determine duration", file=sys.stderr) continue - timestamps = pick_timestamps(duration, args.interval, args.num_frames) - frames = extract_frames(vp, timestamps) - if not frames: - print(f" SKIP — no frames extracted") - continue - - print(f" Strategy: {strategy_label} → {len(frames)} frame(s) from {int(duration//60)}m{int(duration%60):02}s video") - try: - payload = build_payload(frames) + payload = build_payload(vp, duration) analysis = call_openrouter(payload) - out_file = write_report(vp, analysis, args.model, len(frames), duration) + out_file = write_report(vp, analysis, args.model, duration) print(f" ✅ Saved to {out_file}") except Exception as exc: print(f" ❌ Error: {exc}", file=sys.stderr)