Files
ai-game-2/asset-work/generate_from_caption_combo.py
2026-03-27 08:14:33 -07:00

230 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""Generate a combo image from a kq4-decompile visual image using ComfyUI workflow."""
import json
import os
import shutil
import sys
import time
import urllib.request
import uuid
from pathlib import Path
def check_server(server_address: str = "127.0.0.1:8188", timeout: int = 5) -> bool:
"""Check if ComfyUI server is running and accessible."""
try:
req = urllib.request.Request(
f"http://{server_address}/system_stats",
method="GET",
)
with urllib.request.urlopen(req, timeout=timeout) as response:
return response.status == 200
except Exception:
return False
def queue_prompt(prompt: dict, server_address: str = "127.0.0.1:8188") -> dict:
"""Queue a prompt to ComfyUI server."""
client_id = str(uuid.uuid4())
p = {"prompt": prompt, "client_id": client_id}
data = json.dumps(p).encode("utf-8")
req = urllib.request.Request(
f"http://{server_address}/prompt",
data=data,
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as response:
return json.loads(response.read())
def get_existing_files(directory: str) -> set:
"""Get set of existing filenames in directory."""
if not os.path.exists(directory):
return set()
return set(f for f in os.listdir(directory) if f.endswith(".png"))
def wait_for_file_with_prefix(
directory: str, prefix: str, timeout: int = 240
) -> str | None:
"""Wait for a PNG file with specific prefix to appear in directory."""
start_time = time.time()
while time.time() - start_time < timeout:
if os.path.exists(directory):
for filename in os.listdir(directory):
if filename.startswith(prefix) and filename.endswith(".png"):
return os.path.join(directory, filename)
time.sleep(0.5)
return None
def generate_from_caption_combo(
image_path: str,
caption_text: str,
seed: int,
server_address: str = "127.0.0.1:8188",
caption_num: int = 1,
) -> str:
"""Generate combo image from kq4-decompile visual image with given seed.
Args:
image_path: Path to kq4-decompile visual image
caption_text: Direct caption text for the scene (not AI-generated)
seed: Random seed for generation
server_address: ComfyUI server address
Returns:
Path to the generated output image
"""
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
image_path = os.path.abspath(image_path)
script_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(script_dir, "..", "tools", "klein-qwen-combo.json")
workflow_path = os.path.normpath(workflow_path)
with open(workflow_path, "r") as f:
workflow = json.load(f)
workflow["157"]["inputs"]["image"] = image_path
workflow["167"]["inputs"]["text"] = caption_text
workflow["196"]["inputs"]["seed"] = seed
unique_id = str(uuid.uuid4())[:8]
filename_prefix = f"auto_background/gen_{unique_id}"
workflow["183"]["inputs"]["filename_prefix"] = filename_prefix
comfy_output_dir = (
"/mnt/cached/staging/data_backup/ai/ComfyUI/output/auto_background"
)
os.makedirs(comfy_output_dir, exist_ok=True)
existing_files = get_existing_files(comfy_output_dir)
print(f"Found {len(existing_files)} existing files in output directory")
print(f"Queuing prompt with seed {seed}...")
response = queue_prompt(workflow, server_address)
prompt_id = response["prompt_id"]
print(f"Prompt ID: {prompt_id}")
print(f"Expected filename prefix: {filename_prefix}")
print("Waiting for generation (up to 4 minutes)...")
new_file = wait_for_file_with_prefix(
comfy_output_dir, f"gen_{unique_id}", timeout=240
)
if not new_file:
raise RuntimeError("Timeout: No new image file appeared in 4 minutes")
print(f"New image generated: {os.path.basename(new_file)}")
time.sleep(2)
image_basename = os.path.basename(image_path)
room_name = (
image_basename.replace("pic_", "").replace("_visual.png", "").replace("_", "-")
)
combo_outputs_dir = os.path.join(script_dir, "combo_outputs", room_name)
os.makedirs(combo_outputs_dir, exist_ok=True)
output_image_path = os.path.join(
combo_outputs_dir, f"{room_name}_caption_{caption_num}_{seed}_generated.png"
)
shutil.copy2(new_file, output_image_path)
print(f"Saved image: {output_image_path}")
return output_image_path
def main():
import argparse
parser = argparse.ArgumentParser(
description="Generate combo image from kq4-decompile visual using ComfyUI"
)
parser.add_argument(
"image_path",
help="Path to kq4-decompile visual image (e.g., kq4-sierra-decompile/rooms/kq4-057-witch-cave/pic_057_visual.png)",
)
parser.add_argument(
"caption_text",
help="Direct caption text for the scene (not AI-generated)",
)
parser.add_argument("seed", type=int, help="Random seed for generation")
parser.add_argument(
"--server",
default="127.0.0.1:8188",
help="ComfyUI server address (default: 127.0.0.1:8188)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Test mode: validate image file and server connection without generating images",
)
parser.add_argument(
"--caption-num",
type=int,
default=1,
help="Caption number for output filename (default: 1)",
)
args = parser.parse_args()
if not os.path.exists(args.image_path):
print(f"Error: Image file not found: {args.image_path}")
sys.exit(1)
image_basename = os.path.basename(args.image_path)
print(f"Image: {image_basename}")
print(
f"Caption: {args.caption_text[:50]}..."
if len(args.caption_text) > 50
else f"Caption: {args.caption_text}"
)
print(f"Seed: {args.seed}")
print(f"Server: {args.server}")
if args.dry_run:
print("\n[Dry Run Mode - Checking server connection...]")
if check_server(args.server):
print("✓ ComfyUI server is running and accessible")
print("\n✓ Dry run successful! All checks passed.")
sys.exit(0)
else:
print(f"✗ ComfyUI server is not accessible at {args.server}")
print(" Please ensure ComfyUI is running before generating images.")
sys.exit(1)
print("\nChecking ComfyUI server...")
if not check_server(args.server):
print(f"Error: ComfyUI server is not running at {args.server}")
print("Please start ComfyUI first or check the server address.")
print(f"\nTo test without generating, use: --dry-run")
sys.exit(1)
print("✓ ComfyUI server is running")
try:
output_path = generate_from_caption_combo(
args.image_path,
args.caption_text,
args.seed,
args.server,
caption_num=args.caption_num,
)
print(f"\nGeneration complete! Output: {output_path}")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()