#!/usr/bin/env python3 """Generate a combo image from a kq4-decompile visual image using ComfyUI workflow.""" import json import os import shutil import sys import time import urllib.request import uuid from pathlib import Path def check_server(server_address: str = "127.0.0.1:8188", timeout: int = 5) -> bool: """Check if ComfyUI server is running and accessible.""" try: req = urllib.request.Request( f"http://{server_address}/system_stats", method="GET", ) with urllib.request.urlopen(req, timeout=timeout) as response: return response.status == 200 except Exception: return False def queue_prompt(prompt: dict, server_address: str = "127.0.0.1:8188") -> dict: """Queue a prompt to ComfyUI server.""" client_id = str(uuid.uuid4()) p = {"prompt": prompt, "client_id": client_id} data = json.dumps(p).encode("utf-8") req = urllib.request.Request( f"http://{server_address}/prompt", data=data, headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req) as response: return json.loads(response.read()) def get_existing_files(directory: str) -> set: """Get set of existing filenames in directory.""" if not os.path.exists(directory): return set() return set(f for f in os.listdir(directory) if f.endswith(".png")) def wait_for_file_with_prefix( directory: str, prefix: str, timeout: int = 240 ) -> str | None: """Wait for a PNG file with specific prefix to appear in directory.""" start_time = time.time() while time.time() - start_time < timeout: if os.path.exists(directory): for filename in os.listdir(directory): if filename.startswith(prefix) and filename.endswith(".png"): return os.path.join(directory, filename) time.sleep(0.5) return None def generate_from_caption_combo( image_path: str, caption_text: str, seed: int, server_address: str = "127.0.0.1:8188", caption_num: int = 1, ) -> str: """Generate combo image from kq4-decompile visual image with given seed. Args: image_path: Path to kq4-decompile visual image caption_text: Direct caption text for the scene (not AI-generated) seed: Random seed for generation server_address: ComfyUI server address Returns: Path to the generated output image """ if not os.path.exists(image_path): raise FileNotFoundError(f"Image file not found: {image_path}") image_path = os.path.abspath(image_path) script_dir = os.path.dirname(os.path.abspath(__file__)) workflow_path = os.path.join(script_dir, "..", "tools", "klein-qwen-combo.json") workflow_path = os.path.normpath(workflow_path) with open(workflow_path, "r") as f: workflow = json.load(f) workflow["157"]["inputs"]["image"] = image_path workflow["167"]["inputs"]["text"] = caption_text workflow["196"]["inputs"]["seed"] = seed unique_id = str(uuid.uuid4())[:8] filename_prefix = f"auto_background/gen_{unique_id}" workflow["183"]["inputs"]["filename_prefix"] = filename_prefix comfy_output_dir = ( "/mnt/cached/staging/data_backup/ai/ComfyUI/output/auto_background" ) os.makedirs(comfy_output_dir, exist_ok=True) existing_files = get_existing_files(comfy_output_dir) print(f"Found {len(existing_files)} existing files in output directory") print(f"Queuing prompt with seed {seed}...") response = queue_prompt(workflow, server_address) prompt_id = response["prompt_id"] print(f"Prompt ID: {prompt_id}") print(f"Expected filename prefix: {filename_prefix}") print("Waiting for generation (up to 4 minutes)...") new_file = wait_for_file_with_prefix( comfy_output_dir, f"gen_{unique_id}", timeout=240 ) if not new_file: raise RuntimeError("Timeout: No new image file appeared in 4 minutes") print(f"New image generated: {os.path.basename(new_file)}") time.sleep(2) image_basename = os.path.basename(image_path) room_name = ( image_basename.replace("pic_", "").replace("_visual.png", "").replace("_", "-") ) combo_outputs_dir = os.path.join(script_dir, "combo_outputs", room_name) os.makedirs(combo_outputs_dir, exist_ok=True) output_image_path = os.path.join( combo_outputs_dir, f"{room_name}_caption_{caption_num}_{seed}_generated.png" ) shutil.copy2(new_file, output_image_path) print(f"Saved image: {output_image_path}") return output_image_path def main(): import argparse parser = argparse.ArgumentParser( description="Generate combo image from kq4-decompile visual using ComfyUI" ) parser.add_argument( "image_path", help="Path to kq4-decompile visual image (e.g., kq4-sierra-decompile/rooms/kq4-057-witch-cave/pic_057_visual.png)", ) parser.add_argument( "caption_text", help="Direct caption text for the scene (not AI-generated)", ) parser.add_argument("seed", type=int, help="Random seed for generation") parser.add_argument( "--server", default="127.0.0.1:8188", help="ComfyUI server address (default: 127.0.0.1:8188)", ) parser.add_argument( "--dry-run", action="store_true", help="Test mode: validate image file and server connection without generating images", ) parser.add_argument( "--caption-num", type=int, default=1, help="Caption number for output filename (default: 1)", ) args = parser.parse_args() if not os.path.exists(args.image_path): print(f"Error: Image file not found: {args.image_path}") sys.exit(1) image_basename = os.path.basename(args.image_path) print(f"Image: {image_basename}") print( f"Caption: {args.caption_text[:50]}..." if len(args.caption_text) > 50 else f"Caption: {args.caption_text}" ) print(f"Seed: {args.seed}") print(f"Server: {args.server}") if args.dry_run: print("\n[Dry Run Mode - Checking server connection...]") if check_server(args.server): print("āœ“ ComfyUI server is running and accessible") print("\nāœ“ Dry run successful! All checks passed.") sys.exit(0) else: print(f"āœ— ComfyUI server is not accessible at {args.server}") print(" Please ensure ComfyUI is running before generating images.") sys.exit(1) print("\nChecking ComfyUI server...") if not check_server(args.server): print(f"Error: ComfyUI server is not running at {args.server}") print("Please start ComfyUI first or check the server address.") print(f"\nTo test without generating, use: --dry-run") sys.exit(1) print("āœ“ ComfyUI server is running") try: output_path = generate_from_caption_combo( args.image_path, args.caption_text, args.seed, args.server, caption_num=args.caption_num, ) print(f"\nGeneration complete! Output: {output_path}") except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()