diff --git a/app/analyze_videos.py b/app/analyze_videos.py index d4405de..72fb121 100644 --- a/app/analyze_videos.py +++ b/app/analyze_videos.py @@ -6,9 +6,17 @@ prompting for a UX research-style analysis. Saves results as markdown in docs/research/. Usage: - uv run python -m app.analyze_videos # analyze all .mp4 in videos/ - uv run python -m app.analyze_videos videos/file.mp4 # single video - NUM_FRAMES=8 uv run python -m app.analyze_videos # custom frame count + # Analyze all videos in videos/ + uv run python -m app.analyze_videos + + # Analyze one specific video + uv run python -m app.analyze_videos "videos/E-Filing in Filevine.mp4" + + # Extract a frame every 30 seconds (recommended for 3-4 min videos) + INTERVAL=15 uv run python -m app.analyze_videos + + # Force exactly N frames, evenly spaced + NUM_FRAMES=8 uv run python -m app.analyze_videos """ import argparse @@ -44,8 +52,9 @@ OPENROUTER_BASE = "https://openrouter.ai/api/v1" # Gemini models available on OpenRouter: # google/gemini-2.0-flash-exp:free (free, good for testing) # google/gemini-2.0-flash (fast, multimodal) -# google/gemini-2.5-flash-preview-04-17 (latest preview) +# google/gemini-2.5-flash-preview-05-20 (latest preview) DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.5-flash-preview-05-20") +DEFAULT_INTERVAL = int(os.getenv("INTERVAL", "30")) # seconds between frames UX_PROMPT = """\ Analyze this screen recording like a UX researcher. @@ -72,12 +81,38 @@ Be specific about UI elements, button labels, menu paths, and exact behaviors you observe in the frames provided. """ + # --------------------------------------------------------------------------- # Frame extraction # --------------------------------------------------------------------------- -def extract_frames(video_path: Path, num_frames: int = 6) -> list[dict]: - """Extract evenly-spaced key frames from a video using ffmpeg.""" +def pick_timestamps(duration: float, interval_sec: int = 30, num_frames: int = 0) -> list[float]: + """Pick timestamps to extract frames from a video. + + Two strategies: + - interval : one frame every N seconds (default). Good for longer videos. + - num_frames: evenly spread exactly N frames across the whole video. + + Always skips the first and last 2% to avoid black intro/outro frames. + """ + margin = max(duration * 0.02, 1.0) + usable = duration - 2 * margin + + if num_frames > 0: + return [round(margin + i * usable / (num_frames - 1), 2) for i in range(num_frames)] + else: + timestamps: list[float] = [] + t = margin + while t <= (duration - margin): + timestamps.append(round(t, 2)) + t += interval_sec + if not timestamps: + timestamps.append(margin) + return timestamps + + +def extract_frames(video_path: Path, timestamps: list[float]) -> list[dict]: + """Extract frames from a video at the given timestamps using ffmpeg.""" if not video_path.exists(): print(f"SKIP — file not found: {video_path}", file=sys.stderr) return [] @@ -85,43 +120,15 @@ def extract_frames(video_path: Path, num_frames: int = 6) -> list[dict]: tmp_dir = Path(".tmp_video_frames") tmp_dir.mkdir(exist_ok=True) - # Estimate duration - try: - dur_output = subprocess.check_output( - [ - "ffprobe", - "-v", "error", - "-show_entries", "format=duration", - "-of", "default=noprint_wrappers=1:nokey=1", - str(video_path), - ], - stderr=subprocess.DEVNULL, - ).decode().strip() - duration = float(dur_output) - except (subprocess.CalledProcessError, FileNotFoundError, ValueError): - print(f"SKIP — could not probe video: {video_path}", file=sys.stderr) - return [] - - if duration <= 0: - print(f"SKIP — bad duration for: {video_path}", file=sys.stderr) - return [] - - # Pick evenly spaced timestamps (skip first/last 2% to avoid black frames) - margin = max(duration * 0.02, 1.0) - times = [ - str(margin + i * (duration - 2 * margin) / (num_frames - 1)) - for i in range(num_frames) - ] - images = [] - for i, ts in enumerate(times): + for i, ts in enumerate(timestamps): out_path = tmp_dir / f"{video_path.stem}_frame_{i:03d}.jpg" try: subprocess.run( [ "ffmpeg", "-y", - "-ss", ts, + "-ss", str(ts), "-i", str(video_path), "-vframes:v", "1", "-q:v", "2", # good quality JPEG @@ -170,7 +177,6 @@ def call_openrouter(payload: dict) -> str: headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", - # Optional: pass-through headers for attribution / tracking "HTTP-Referer": "https://github.com/notid/e-filing", "X-Title": "eFiling Video Analyzer", } @@ -184,7 +190,6 @@ def call_openrouter(payload: dict) -> str: resp.raise_for_status() data = resp.json() - # Extract text from the response choices = data.get("choices", []) if not choices: raise ValueError(f"No choices in OpenRouter response: {json.dumps(data, indent=2)[:500]}") @@ -195,23 +200,24 @@ def call_openrouter(payload: dict) -> str: # Output # --------------------------------------------------------------------------- -def write_report(video_path: Path, analysis: str, model: str, num_frames: int) -> Path: +def write_report(video_path: Path, analysis: str, model: str, num_frames: int, duration: float) -> Path: """Write the analysis as a markdown file in docs/research/.""" output_dir = Path(__file__).resolve().parent.parent / "docs" / "research" output_dir.mkdir(parents=True, exist_ok=True) - # Sanitize filename safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem) timestamp = datetime.now(timezone.utc).strftime("%Y%m%d") out_file = output_dir / f"{safe_name}_{timestamp}.md" + dur_min = int(duration // 60) + dur_sec = int(duration % 60) header = f"""\ # eFiling — UX Analysis: {video_path.name} | Field | Value | |-------|-------| | **Source video** | `{video_path.name}` | -| **Analysis date** | {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")} | +| **Duration** | {dur_min}m {dur_sec}s |\n| **Analysis date** | {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")} | | **Model** | {model} | | **Frames analyzed** | {num_frames} | @@ -222,23 +228,64 @@ def write_report(video_path: Path, analysis: str, model: str, num_frames: int) - return out_file +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def probe_duration(video_path: Path) -> float: + """Get video duration in seconds.""" + try: + dur = subprocess.check_output( + ["ffprobe", "-v", "error", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", str(video_path)], + stderr=subprocess.DEVNULL, + ).decode().strip() + return float(dur) + except (subprocess.CalledProcessError, FileNotFoundError, ValueError): + return 0.0 + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): - parser = argparse.ArgumentParser(description="Analyze screen recordings with Gemini via OpenRouter") + parser = argparse.ArgumentParser( + description="Analyze screen recordings with Gemini via OpenRouter", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +available strategies: + --interval 30 extract one frame every 30 seconds (default, good for long videos) + --num-frames 6 evenly spread N frames across the whole video + +examples: + # analyze one specific video + python -m app.analyze_videos "videos/E-Filing in Filevine.mp4" + + # analyze all videos with one frame every 30 s (default) + python -m app.analyze_videos + + # exactly 8 frames spread across each video + python -m app.analyze_videos --num-frames 8 +""", + ) parser.add_argument( "videos", nargs="*", default=[], help="Video files to analyze (defaults to all .mp4 in videos/)", ) + parser.add_argument( + "--interval", + type=int, + default=DEFAULT_INTERVAL, + help="Extract one frame every N seconds (default: 30). Overrides --num-frames.", + ) parser.add_argument( "--num-frames", type=int, - default=int(os.getenv("NUM_FRAMES", "6")), - help="Number of frames to extract per video (default: 6)", + default=int(os.getenv("NUM_FRAMES", "0")), + help="Extract exactly N frames, evenly spaced. Set to >0 to override --interval.", ) parser.add_argument( "--model", @@ -265,22 +312,31 @@ def main(): print("No .mp4 files to analyze.", file=sys.stderr) sys.exit(0) + strategy_label = "exact frames" if args.num_frames > 0 else f"interval ({args.interval}s)" print(f"Analyzing {len(video_paths)} video(s) with model '{args.model}'...") + print(f"Strategy: {strategy_label}") print() for i, vp in enumerate(video_paths, 1): print(f"[{i}/{len(video_paths)}] {vp.name}") - frames = extract_frames(vp, args.num_frames) - if not frames: + duration = probe_duration(vp) + if duration <= 0: + print(f" SKIP — could not determine duration", file=sys.stderr) continue - print(f" Extracted {len(frames)} frame(s)") + timestamps = pick_timestamps(duration, args.interval, args.num_frames) + frames = extract_frames(vp, timestamps) + if not frames: + print(f" SKIP — no frames extracted") + continue + + print(f" Strategy: {strategy_label} → {len(frames)} frame(s) from {int(duration//60)}m{int(duration%60):02}s video") try: payload = build_payload(frames) analysis = call_openrouter(payload) - out_file = write_report(vp, analysis, args.model, len(frames)) + out_file = write_report(vp, analysis, args.model, len(frames), duration) print(f" ✅ Saved to {out_file}") except Exception as exc: print(f" ❌ Error: {exc}", file=sys.stderr)