Switch to full video upload instead of frame extraction

This commit is contained in:
2026-05-13 11:11:02 -07:00
parent 2857194759
commit 0aa30847fe

View File

@@ -1,22 +1,12 @@
#!/usr/bin/env python3
"""Analyze screen recordings using OpenRouter + Gemini Vision.
Extracts key frames from videos and sends them to Gemini via OpenRouter,
prompting for a UX research-style analysis. Saves results as markdown
in docs/research/.
Sends the full video file directly to Gemini via OpenRouter for a
UX research-style analysis. Saves results as markdown in docs/research/.
Usage:
# Analyze all videos in videos/
uv run python -m app.analyze_videos
# Analyze one specific video
uv run python -m app.analyze_videos "videos/E-Filing in Filevine.mp4"
# Extract a frame every 30 seconds (recommended for 3-4 min videos)
INTERVAL=15 uv run python -m app.analyze_videos
# Force exactly N frames, evenly spaced
NUM_FRAMES=8 uv run python -m app.analyze_videos
uv run python -m app.analyze_videos # analyze all .mp4 in videos/
uv run python -m app.analyze_videos videos/file.mp4 # single video
"""
import argparse
@@ -36,7 +26,7 @@ from dotenv import load_dotenv
# Config
# ---------------------------------------------------------------------------
load_dotenv() # loads .env in repo root or parent directories
load_dotenv()
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
@@ -49,12 +39,7 @@ if not OPENROUTER_API_KEY:
sys.exit(1)
OPENROUTER_BASE = "https://openrouter.ai/api/v1"
# Gemini models available on OpenRouter:
# google/gemini-2.0-flash-exp:free (free, good for testing)
# google/gemini-2.0-flash (fast, multimodal)
# google/gemini-2.5-flash-preview-05-20 (latest preview)
DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.5-flash-preview-05-20")
DEFAULT_INTERVAL = int(os.getenv("INTERVAL", "30")) # seconds between frames
DEFAULT_MODEL = os.getenv("OPENROUTER_MODEL", "google/gemini-2.0-pro-exp-02-05:free")
UX_PROMPT = """\
Analyze this screen recording like a UX researcher.
@@ -78,156 +63,10 @@ Output:
5. suggested improvements
Be specific about UI elements, button labels, menu paths, and exact behaviors
you observe in the frames provided.
you observe throughout the video.
"""
# ---------------------------------------------------------------------------
# Frame extraction
# ---------------------------------------------------------------------------
def pick_timestamps(duration: float, interval_sec: int = 30, num_frames: int = 0) -> list[float]:
    """Pick the timestamps (in seconds) at which to extract frames from a video.

    Two strategies:
      - interval  : one frame every ``interval_sec`` seconds (the default).
      - num_frames: exactly ``num_frames`` frames spread evenly across the
        video. Selected whenever ``num_frames`` is greater than zero.

    The first and last 2% of the video (at least one second each) are skipped
    to avoid black intro/outro frames. For very short videos the margin is
    capped at half the duration so the result always lies inside the video.

    Args:
        duration: Video length in seconds.
        interval_sec: Spacing between frames in interval mode.
        num_frames: Number of evenly spaced frames; ``0`` selects interval mode.

    Returns:
        Timestamps rounded to two decimals, in ascending order. Never empty.

    Raises:
        ValueError: If interval mode is selected with ``interval_sec <= 0``.
    """
    # Cap the margin at the midpoint: without this, a clip shorter than 2 s
    # would get a negative usable span and a timestamp at/after its end.
    margin = min(max(duration * 0.02, 1.0), max(duration / 2, 0.0))
    usable = duration - 2 * margin

    if num_frames == 1:
        # A single frame has no spacing to compute (the general formula would
        # divide by zero), so take the middle of the usable span.
        return [round(margin + usable / 2, 2)]

    if num_frames > 1:
        return [round(margin + i * usable / (num_frames - 1), 2) for i in range(num_frames)]

    if interval_sec <= 0:
        # A non-positive step would never advance past the end of the video.
        raise ValueError(f"interval_sec must be positive, got {interval_sec}")

    timestamps: list[float] = []
    t = margin
    while t <= (duration - margin):
        timestamps.append(round(t, 2))
        t += interval_sec

    # Guarantee at least one frame even if the loop never ran.
    if not timestamps:
        timestamps.append(round(margin, 2))
    return timestamps
def extract_frames(video_path: Path, timestamps: list[float]) -> list[dict]:
    """Grab one JPEG still per timestamp from ``video_path`` with the ffmpeg CLI.

    Frames are written to ``.tmp_video_frames/`` (created on demand, relative
    to the current working directory). A timestamp whose ffmpeg run fails, or
    for which ffmpeg cannot be launched at all, is skipped silently.

    Args:
        video_path: Video to read.
        timestamps: Seek positions in seconds, one per desired frame.

    Returns:
        A list of ``{"path": Path}`` dicts, one per frame that exists on disk
        afterwards. Empty when the video is missing (a SKIP line is printed to
        stderr in that case) or when no frame could be produced.
    """
    if not video_path.exists():
        print(f"SKIP — file not found: {video_path}", file=sys.stderr)
        return []

    frame_dir = Path(".tmp_video_frames")
    frame_dir.mkdir(exist_ok=True)

    extracted = []
    for index, seconds in enumerate(timestamps):
        target = frame_dir / f"{video_path.stem}_frame_{index:03d}.jpg"
        command = [
            "ffmpeg",
            "-y",  # overwrite a frame left over from a previous run
            "-ss", str(seconds),
            "-i", str(video_path),
            "-vframes:v", "1",
            "-q:v", "2",  # good quality JPEG
            "-an",
            str(target),
        ]
        try:
            subprocess.run(command, capture_output=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            # Best effort: a failed seek or a missing ffmpeg binary just
            # means this frame is left out.
            continue
        if target.exists():
            extracted.append({"path": target})

    return extracted
# ---------------------------------------------------------------------------
# OpenRouter / Gemini API
# ---------------------------------------------------------------------------
def build_payload(images: list[dict], model: str = "") -> dict:
    """Build the OpenRouter chat-completion payload with inline image content.

    The message starts with ``UX_PROMPT`` as text, followed by one
    ``image_url`` part per frame, each embedded as a base64 ``data:`` URL.

    Args:
        images: Dicts with a ``"path"`` key pointing at an image file on disk.
        model: OpenRouter model id to request. Falls back to ``DEFAULT_MODEL``
            when empty, so existing single-argument callers are unaffected.

    Returns:
        A JSON-serialisable request body for ``/chat/completions``.
    """
    # File extensions that differ from their registered image media subtype.
    subtype_for_ext = {"jpg": "jpeg"}

    content = [{"type": "text", "text": UX_PROMPT}]
    for img in images:
        frame_path = Path(img["path"])
        encoded = base64.b64encode(frame_path.read_bytes()).decode()
        ext = frame_path.suffix.lstrip(".").lower()
        # "image/jpg" is not a registered media type; strict backends reject
        # it, so map the extension to its real subtype before building the URL.
        subtype = subtype_for_ext.get(ext, ext)
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/{subtype};base64,{encoded}",
            },
        })

    return {
        "model": model or DEFAULT_MODEL,
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 8192,
        "temperature": 0.3,
    }
def call_openrouter(payload: dict) -> str:
    """POST ``payload`` to OpenRouter's chat-completions endpoint.

    Args:
        payload: Request body, sent as JSON.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        httpx.HTTPStatusError: If the response status is an error
            (via ``raise_for_status``).
        ValueError: If the decoded response has no ``choices``; the message
            carries the first 500 characters of the response for diagnosis.
    """
    endpoint = f"{OPENROUTER_BASE}/chat/completions"
    request_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/notid/e-filing",
        "X-Title": "eFiling Video Analyzer",
    }

    with httpx.Client(timeout=120.0) as session:
        response = session.post(endpoint, headers=request_headers, json=payload)
        response.raise_for_status()
        body = response.json()

    candidates = body.get("choices", [])
    if candidates:
        return candidates[0]["message"]["content"]

    raise ValueError(f"No choices in OpenRouter response: {json.dumps(body, indent=2)[:500]}")
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def write_report(video_path: Path, analysis: str, model: str, num_frames: int, duration: float) -> Path:
    """Write the analysis as a markdown file in docs/research/.

    The report is stored two directories above this module, under
    ``docs/research/``, as ``<sanitised video stem>_<YYYYMMDD>.md`` (UTC date).
    It starts with a metadata table followed by the analysis text.

    Args:
        video_path: Source video; only its name and stem are used.
        analysis: Markdown body returned by the model.
        model: Model identifier recorded in the header.
        num_frames: Number of frames that were analysed.
        duration: Video length in seconds.

    Returns:
        Path of the file that was written.
    """
    output_dir = Path(__file__).resolve().parent.parent / "docs" / "research"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Read the clock once so the filename date and the header date cannot
    # disagree when a run straddles midnight UTC.
    now = datetime.now(timezone.utc)

    safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem)
    out_file = output_dir / f"{safe_name}_{now.strftime('%Y%m%d')}.md"

    dur_min = int(duration // 60)
    dur_sec = int(duration % 60)

    # Blank lines around the table and the rule keep the header rendering
    # correctly in Markdown parsers that require block separation.
    header = "\n".join([
        f"# eFiling — UX Analysis: {video_path.name}",
        "",
        "| Field | Value |",
        "|-------|-------|",
        f"| **Source video** | `{video_path.name}` |",
        f"| **Duration** | {dur_min}m {dur_sec}s |",
        f"| **Analysis date** | {now.strftime('%Y-%m-%d %H:%M UTC')} |",
        f"| **Model** | {model} |",
        f"| **Frames analyzed** | {num_frames} |",
        "",
        "---",
        "",
        "",
    ])

    # Explicit UTF-8: the header and model output contain non-ASCII text,
    # which the platform default encoding may be unable to represent.
    out_file.write_text(header + analysis, encoding="utf-8")
    return out_file
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -245,6 +84,99 @@ def probe_duration(video_path: Path) -> float:
return 0.0
def read_video(video_path: Path) -> tuple[bytes, str]:
    """Load a video file fully into memory and work out its MIME type.

    The MIME type is chosen from the lower-cased file extension. Extensions
    without a dedicated entry fall back to ``video/<extension>``.

    Args:
        video_path: File to read.

    Returns:
        ``(raw_bytes, mime_type)``.
    """
    known_types = {
        "mp4": "video/mp4",
        "mov": "video/quicktime",
        "webm": "video/webm",
        "mkv": "video/x-matroska",
    }
    suffix = video_path.suffix.lstrip(".").lower()
    if suffix in known_types:
        mime_type = known_types[suffix]
    else:
        mime_type = f"video/{suffix}"

    raw = video_path.read_bytes()
    return raw, mime_type
def build_payload(video_path: Path, duration: float, model: str = "") -> dict:
    """Build the OpenRouter chat-completion payload with a video attachment.

    The whole file is read into memory, base64-encoded and embedded as a
    ``data:`` URL in a ``video_url`` content part. The text part is
    ``UX_PROMPT`` followed by the video duration.

    Args:
        video_path: Video file to attach.
        duration: Video length in seconds, quoted in the prompt as ``XmSSs``.
        model: OpenRouter model id to request. Falls back to ``DEFAULT_MODEL``
            when empty, so existing two-argument callers are unaffected.
            Callers that let the user choose a model should pass it here so
            the request matches the model recorded in the report.

    Returns:
        A JSON-serialisable request body for ``/chat/completions``.
    """
    video_data, mime = read_video(video_path)
    encoded = base64.b64encode(video_data).decode()

    minutes = int(duration // 60)
    seconds = int(duration % 60)
    prompt = f"{UX_PROMPT}\n\n(Duration: {minutes}m{seconds:02}s)"

    content = [
        {"type": "text", "text": prompt},
        {
            "type": "video_url",
            "video_url": {
                "url": f"data:{mime};base64,{encoded}",
            },
        },
    ]

    return {
        "model": model or DEFAULT_MODEL,
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 8192,
        "temperature": 0.3,
    }
def call_openrouter(payload: dict) -> str:
    """Submit a chat-completion request to OpenRouter and return the reply text.

    Uses a 300-second client timeout.

    Args:
        payload: Request body, sent as JSON.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        httpx.HTTPStatusError: If the response status is an error
            (via ``raise_for_status``).
        ValueError: If the decoded response has no ``choices``; the message
            carries the first 500 characters of the response for diagnosis.
    """
    url = f"{OPENROUTER_BASE}/chat/completions"
    auth_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/notid/e-filing",
        "X-Title": "eFiling Video Analyzer",
    }

    with httpx.Client(timeout=300.0) as http:
        reply = http.post(url, headers=auth_headers, json=payload)
        reply.raise_for_status()
        parsed = reply.json()

    options = parsed.get("choices", [])
    if not options:
        preview = json.dumps(parsed, indent=2)[:500]
        raise ValueError(f"No choices in OpenRouter response: {preview}")

    first = options[0]
    return first["message"]["content"]
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def write_report(video_path: Path, analysis: str, model: str, duration: float) -> Path:
    """Write the analysis as a markdown file in docs/research/.

    The report is stored two directories above this module, under
    ``docs/research/``, as ``<sanitised video stem>_<YYYYMMDD>.md`` (UTC date).
    It starts with a metadata table followed by the analysis text.

    Args:
        video_path: Source video; only its name and stem are used.
        analysis: Markdown body returned by the model.
        model: Model identifier recorded in the header.
        duration: Video length in seconds.

    Returns:
        Path of the file that was written.
    """
    output_dir = Path(__file__).resolve().parent.parent / "docs" / "research"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Read the clock once so the filename date and the header date cannot
    # disagree when a run straddles midnight UTC.
    now = datetime.now(timezone.utc)

    safe_name = re.sub(r"[^\w\s\-]", "", video_path.stem)
    out_file = output_dir / f"{safe_name}_{now.strftime('%Y%m%d')}.md"

    dur_min = int(duration // 60)
    dur_sec = int(duration % 60)

    # Blank lines around the table and the rule keep the header rendering
    # correctly in Markdown parsers that require block separation.
    header = "\n".join([
        f"# eFiling — UX Analysis: {video_path.name}",
        "",
        "| Field | Value |",
        "|-------|-------|",
        f"| **Source video** | `{video_path.name}` |",
        f"| **Duration** | {dur_min}m {dur_sec}s |",
        f"| **Analysis date** | {now.strftime('%Y-%m-%d %H:%M UTC')} |",
        f"| **Model** | {model} |",
        "",
        "---",
        "",
        "",
    ])

    # Explicit UTF-8: the header and model output contain non-ASCII text,
    # which the platform default encoding may be unable to represent.
    out_file.write_text(header + analysis, encoding="utf-8")
    return out_file
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
@@ -254,19 +186,12 @@ def main():
description="Analyze screen recordings with Gemini via OpenRouter",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
available strategies:
--interval 30 extract one frame every 30 seconds (default, good for long videos)
--num-frames 6 evenly spread N frames across the whole video
examples:
# analyze one specific video
python -m app.analyze_videos "videos/E-Filing in Filevine.mp4"
# analyze all videos with one frame every 30 s (default)
# analyze all videos in videos/
python -m app.analyze_videos
# exactly 8 frames spread across each video
python -m app.analyze_videos --num-frames 8
""",
)
parser.add_argument(
@@ -275,18 +200,6 @@ examples:
default=[],
help="Video files to analyze (defaults to all .mp4 in videos/)",
)
parser.add_argument(
"--interval",
type=int,
default=DEFAULT_INTERVAL,
help="Extract one frame every N seconds (default: 30). Overrides --num-frames.",
)
parser.add_argument(
"--num-frames",
type=int,
default=int(os.getenv("NUM_FRAMES", "0")),
help="Extract exactly N frames, evenly spaced. Set to >0 to override --interval.",
)
parser.add_argument(
"--model",
type=str,
@@ -303,18 +216,17 @@ examples:
if args.videos:
video_paths = [Path(v) for v in args.videos]
elif videos_dir.exists():
video_paths = sorted(videos_dir.glob("*.mp4"))
video_paths = sorted(videos_dir.glob("*"))
else:
print("No videos found. Pass paths explicitly or put .mp4 files in videos/", file=sys.stderr)
print("No videos found. Pass paths explicitly or put files in videos/", file=sys.stderr)
sys.exit(1)
if not video_paths:
print("No .mp4 files to analyze.", file=sys.stderr)
print("No video files to analyze.", file=sys.stderr)
sys.exit(0)
strategy_label = "exact frames" if args.num_frames > 0 else f"interval ({args.interval}s)"
print(f"Analyzing {len(video_paths)} video(s) with model '{args.model}'...")
print(f"Strategy: {strategy_label}")
print("Mode: full-video upload (no frame extraction)")
print()
for i, vp in enumerate(video_paths, 1):
@@ -325,18 +237,10 @@ examples:
print(f" SKIP — could not determine duration", file=sys.stderr)
continue
timestamps = pick_timestamps(duration, args.interval, args.num_frames)
frames = extract_frames(vp, timestamps)
if not frames:
print(f" SKIP — no frames extracted")
continue
print(f" Strategy: {strategy_label}{len(frames)} frame(s) from {int(duration//60)}m{int(duration%60):02}s video")
try:
payload = build_payload(frames)
payload = build_payload(vp, duration)
analysis = call_openrouter(payload)
out_file = write_report(vp, analysis, args.model, len(frames), duration)
out_file = write_report(vp, analysis, args.model, duration)
print(f" ✅ Saved to {out_file}")
except Exception as exc:
print(f" ❌ Error: {exc}", file=sys.stderr)