230 lines
7.2 KiB
Python
Executable File
230 lines
7.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Generate a combo image from a kq4-decompile visual image using ComfyUI workflow."""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
|
|
def check_server(server_address: str = "127.0.0.1:8188", timeout: int = 5) -> bool:
|
|
"""Check if ComfyUI server is running and accessible."""
|
|
try:
|
|
req = urllib.request.Request(
|
|
f"http://{server_address}/system_stats",
|
|
method="GET",
|
|
)
|
|
with urllib.request.urlopen(req, timeout=timeout) as response:
|
|
return response.status == 200
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def queue_prompt(prompt: dict, server_address: str = "127.0.0.1:8188") -> dict:
|
|
"""Queue a prompt to ComfyUI server."""
|
|
client_id = str(uuid.uuid4())
|
|
p = {"prompt": prompt, "client_id": client_id}
|
|
data = json.dumps(p).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
f"http://{server_address}/prompt",
|
|
data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
with urllib.request.urlopen(req) as response:
|
|
return json.loads(response.read())
|
|
|
|
|
|
def get_existing_files(directory: str) -> set:
|
|
"""Get set of existing filenames in directory."""
|
|
if not os.path.exists(directory):
|
|
return set()
|
|
return set(f for f in os.listdir(directory) if f.endswith(".png"))
|
|
|
|
|
|
def wait_for_file_with_prefix(
|
|
directory: str, prefix: str, timeout: int = 240
|
|
) -> str | None:
|
|
"""Wait for a PNG file with specific prefix to appear in directory."""
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
if os.path.exists(directory):
|
|
for filename in os.listdir(directory):
|
|
if filename.startswith(prefix) and filename.endswith(".png"):
|
|
return os.path.join(directory, filename)
|
|
|
|
time.sleep(0.5)
|
|
|
|
return None
|
|
|
|
|
|
def generate_from_caption_combo(
|
|
image_path: str,
|
|
caption_text: str,
|
|
seed: int,
|
|
server_address: str = "127.0.0.1:8188",
|
|
caption_num: int = 1,
|
|
) -> str:
|
|
"""Generate combo image from kq4-decompile visual image with given seed.
|
|
|
|
Args:
|
|
image_path: Path to kq4-decompile visual image
|
|
caption_text: Direct caption text for the scene (not AI-generated)
|
|
seed: Random seed for generation
|
|
server_address: ComfyUI server address
|
|
|
|
Returns:
|
|
Path to the generated output image
|
|
"""
|
|
if not os.path.exists(image_path):
|
|
raise FileNotFoundError(f"Image file not found: {image_path}")
|
|
|
|
image_path = os.path.abspath(image_path)
|
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
workflow_path = os.path.join(script_dir, "..", "tools", "klein-qwen-combo.json")
|
|
workflow_path = os.path.normpath(workflow_path)
|
|
with open(workflow_path, "r") as f:
|
|
workflow = json.load(f)
|
|
|
|
workflow["157"]["inputs"]["image"] = image_path
|
|
|
|
workflow["167"]["inputs"]["text"] = caption_text
|
|
|
|
workflow["196"]["inputs"]["seed"] = seed
|
|
|
|
unique_id = str(uuid.uuid4())[:8]
|
|
filename_prefix = f"auto_background/gen_{unique_id}"
|
|
workflow["183"]["inputs"]["filename_prefix"] = filename_prefix
|
|
|
|
comfy_output_dir = (
|
|
"/mnt/cached/staging/data_backup/ai/ComfyUI/output/auto_background"
|
|
)
|
|
os.makedirs(comfy_output_dir, exist_ok=True)
|
|
existing_files = get_existing_files(comfy_output_dir)
|
|
print(f"Found {len(existing_files)} existing files in output directory")
|
|
|
|
print(f"Queuing prompt with seed {seed}...")
|
|
response = queue_prompt(workflow, server_address)
|
|
prompt_id = response["prompt_id"]
|
|
print(f"Prompt ID: {prompt_id}")
|
|
print(f"Expected filename prefix: {filename_prefix}")
|
|
|
|
print("Waiting for generation (up to 4 minutes)...")
|
|
new_file = wait_for_file_with_prefix(
|
|
comfy_output_dir, f"gen_{unique_id}", timeout=240
|
|
)
|
|
|
|
if not new_file:
|
|
raise RuntimeError("Timeout: No new image file appeared in 4 minutes")
|
|
|
|
print(f"New image generated: {os.path.basename(new_file)}")
|
|
|
|
time.sleep(2)
|
|
|
|
image_basename = os.path.basename(image_path)
|
|
room_name = (
|
|
image_basename.replace("pic_", "").replace("_visual.png", "").replace("_", "-")
|
|
)
|
|
|
|
combo_outputs_dir = os.path.join(script_dir, "combo_outputs", room_name)
|
|
os.makedirs(combo_outputs_dir, exist_ok=True)
|
|
|
|
output_image_path = os.path.join(
|
|
combo_outputs_dir, f"{room_name}_caption_{caption_num}_{seed}_generated.png"
|
|
)
|
|
shutil.copy2(new_file, output_image_path)
|
|
print(f"Saved image: {output_image_path}")
|
|
|
|
return output_image_path
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Generate combo image from kq4-decompile visual using ComfyUI"
|
|
)
|
|
parser.add_argument(
|
|
"image_path",
|
|
help="Path to kq4-decompile visual image (e.g., kq4-sierra-decompile/rooms/kq4-057-witch-cave/pic_057_visual.png)",
|
|
)
|
|
parser.add_argument(
|
|
"caption_text",
|
|
help="Direct caption text for the scene (not AI-generated)",
|
|
)
|
|
parser.add_argument("seed", type=int, help="Random seed for generation")
|
|
parser.add_argument(
|
|
"--server",
|
|
default="127.0.0.1:8188",
|
|
help="ComfyUI server address (default: 127.0.0.1:8188)",
|
|
)
|
|
parser.add_argument(
|
|
"--dry-run",
|
|
action="store_true",
|
|
help="Test mode: validate image file and server connection without generating images",
|
|
)
|
|
parser.add_argument(
|
|
"--caption-num",
|
|
type=int,
|
|
default=1,
|
|
help="Caption number for output filename (default: 1)",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not os.path.exists(args.image_path):
|
|
print(f"Error: Image file not found: {args.image_path}")
|
|
sys.exit(1)
|
|
|
|
image_basename = os.path.basename(args.image_path)
|
|
print(f"Image: {image_basename}")
|
|
print(
|
|
f"Caption: {args.caption_text[:50]}..."
|
|
if len(args.caption_text) > 50
|
|
else f"Caption: {args.caption_text}"
|
|
)
|
|
print(f"Seed: {args.seed}")
|
|
print(f"Server: {args.server}")
|
|
|
|
if args.dry_run:
|
|
print("\n[Dry Run Mode - Checking server connection...]")
|
|
if check_server(args.server):
|
|
print("✓ ComfyUI server is running and accessible")
|
|
print("\n✓ Dry run successful! All checks passed.")
|
|
sys.exit(0)
|
|
else:
|
|
print(f"✗ ComfyUI server is not accessible at {args.server}")
|
|
print(" Please ensure ComfyUI is running before generating images.")
|
|
sys.exit(1)
|
|
|
|
print("\nChecking ComfyUI server...")
|
|
if not check_server(args.server):
|
|
print(f"Error: ComfyUI server is not running at {args.server}")
|
|
print("Please start ComfyUI first or check the server address.")
|
|
print(f"\nTo test without generating, use: --dry-run")
|
|
sys.exit(1)
|
|
|
|
print("✓ ComfyUI server is running")
|
|
|
|
try:
|
|
output_path = generate_from_caption_combo(
|
|
args.image_path,
|
|
args.caption_text,
|
|
args.seed,
|
|
args.server,
|
|
caption_num=args.caption_num,
|
|
)
|
|
print(f"\nGeneration complete! Output: {output_path}")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|