275 lines
9.1 KiB
Python
275 lines
9.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Generate an image from a kq4-decompile visual image using ComfyUI workflow."""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
|
|
def check_server(server_address: str = "127.0.0.1:8188", timeout: int = 5) -> bool:
|
|
"""Check if ComfyUI server is running and accessible."""
|
|
try:
|
|
req = urllib.request.Request(
|
|
f"http://{server_address}/system_stats",
|
|
method="GET",
|
|
)
|
|
with urllib.request.urlopen(req, timeout=timeout) as response:
|
|
return response.status == 200
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def queue_prompt(prompt: dict, server_address: str = "127.0.0.1:8188") -> dict:
|
|
"""Queue a prompt to ComfyUI server."""
|
|
client_id = str(uuid.uuid4())
|
|
p = {"prompt": prompt, "client_id": client_id}
|
|
data = json.dumps(p).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
f"http://{server_address}/prompt",
|
|
data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
with urllib.request.urlopen(req) as response:
|
|
return json.loads(response.read())
|
|
|
|
|
|
def get_existing_files(directory: str) -> set:
|
|
"""Get set of existing filenames in directory."""
|
|
if not os.path.exists(directory):
|
|
return set()
|
|
return set(f for f in os.listdir(directory) if f.endswith(".png"))
|
|
|
|
|
|
def wait_for_new_file(
|
|
directory: str, existing_files: set, timeout: int = 240
|
|
) -> str | None:
|
|
"""Wait for a new PNG file to appear in directory."""
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
if os.path.exists(directory):
|
|
current_files = set(f for f in os.listdir(directory) if f.endswith(".png"))
|
|
new_files = current_files - existing_files
|
|
|
|
if new_files:
|
|
newest_file = max(
|
|
new_files,
|
|
key=lambda f: os.path.getmtime(os.path.join(directory, f)),
|
|
)
|
|
return os.path.join(directory, newest_file)
|
|
|
|
time.sleep(0.5)
|
|
|
|
return None
|
|
|
|
|
|
def wait_for_file_with_prefix(
|
|
directory: str, prefix: str, timeout: int = 240
|
|
) -> str | None:
|
|
"""Wait for a PNG file with specific prefix to appear in directory."""
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
if os.path.exists(directory):
|
|
for filename in os.listdir(directory):
|
|
if filename.startswith(prefix) and filename.endswith(".png"):
|
|
return os.path.join(directory, filename)
|
|
|
|
time.sleep(0.5)
|
|
|
|
return None
|
|
|
|
|
|
DEFAULT_CAPTION_PROMPT = 'describe the mood and atmosphere of the scene without describing colors or style. Focus on the framing, composition, lighting mood, and elements visible. Respond with just the result, preceded by "convert to a painting in the Disney Renaissance Style and kq5hoyos style and sylvain style. ". Keep it to around 20-25 words. If the image is framed in a particular way, make sure to call that out. never describe colors.'
|
|
|
|
|
|
def generate_from_caption_klein(
|
|
image_path: str,
|
|
seed: int,
|
|
server_address: str = "127.0.0.1:8188",
|
|
lora_disney: float = 0.8,
|
|
lora_kq5hoyos: float = 0.7,
|
|
lora_sylvain: float = 0.5,
|
|
caption_prompt: str = DEFAULT_CAPTION_PROMPT,
|
|
) -> str:
|
|
"""Generate image from kq4-decompile visual image with given seed.
|
|
|
|
Args:
|
|
image_path: Path to kq4-decompile visual image
|
|
seed: Random seed for generation
|
|
server_address: ComfyUI server address
|
|
lora_disney: Strength for Disney LoRA (default: 0.8)
|
|
lora_kq5hoyos: Strength for KQ5Hoyos LoRA (default: 0.7)
|
|
lora_sylvain: Strength for Sylvain LoRA (default: 0.5)
|
|
caption_prompt: Custom caption prompt for scene description
|
|
|
|
Returns:
|
|
Path to the generated output image
|
|
"""
|
|
if not os.path.exists(image_path):
|
|
raise FileNotFoundError(f"Image file not found: {image_path}")
|
|
|
|
image_path = os.path.abspath(image_path)
|
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
workflow_path = os.path.join(script_dir, "..", "tools", "generate_image_klein.json")
|
|
workflow_path = os.path.normpath(workflow_path)
|
|
with open(workflow_path, "r") as f:
|
|
workflow = json.load(f)
|
|
|
|
workflow["157"]["inputs"]["image"] = image_path
|
|
|
|
workflow["164:162"]["inputs"]["seed"] = seed
|
|
workflow["75:73"]["inputs"]["noise_seed"] = seed
|
|
|
|
workflow["164:163"]["inputs"]["prompt"] = caption_prompt
|
|
|
|
workflow["75:154"]["inputs"]["lora_1"]["strength"] = lora_disney
|
|
workflow["75:154"]["inputs"]["lora_2"]["strength"] = lora_kq5hoyos
|
|
workflow["75:154"]["inputs"]["lora_3"]["strength"] = lora_sylvain
|
|
|
|
unique_id = str(uuid.uuid4())[:8]
|
|
filename_prefix = f"kq4tohq/gen_{unique_id}"
|
|
workflow["156"]["inputs"]["filename_prefix"] = filename_prefix
|
|
|
|
comfy_output_dir = "/mnt/cached/staging/data_backup/ai/ComfyUI/output/kq4tohq"
|
|
os.makedirs(comfy_output_dir, exist_ok=True)
|
|
existing_files = get_existing_files(comfy_output_dir)
|
|
print(f"Found {len(existing_files)} existing files in output directory")
|
|
|
|
print(f"Queuing prompt with seed {seed}...")
|
|
response = queue_prompt(workflow, server_address)
|
|
prompt_id = response["prompt_id"]
|
|
print(f"Prompt ID: {prompt_id}")
|
|
print(f"Expected filename prefix: {filename_prefix}")
|
|
|
|
print("Waiting for generation (up to 4 minutes)...")
|
|
new_file = wait_for_file_with_prefix(
|
|
comfy_output_dir, f"gen_{unique_id}", timeout=240
|
|
)
|
|
|
|
if not new_file:
|
|
raise RuntimeError("Timeout: No new image file appeared in 4 minutes")
|
|
|
|
print(f"New image generated: {os.path.basename(new_file)}")
|
|
|
|
time.sleep(2)
|
|
|
|
image_dir = os.path.dirname(os.path.abspath(image_path))
|
|
image_basename = os.path.basename(image_path)
|
|
room_name = (
|
|
image_basename.replace("pic_", "").replace("_visual.png", "").replace("_", "-")
|
|
)
|
|
|
|
klein_outputs_dir = os.path.join(script_dir, "klein_outputs", room_name)
|
|
os.makedirs(klein_outputs_dir, exist_ok=True)
|
|
|
|
output_image_path = os.path.join(
|
|
klein_outputs_dir, f"{room_name}_{seed}_generated.png"
|
|
)
|
|
shutil.copy2(new_file, output_image_path)
|
|
print(f"Saved image: {output_image_path}")
|
|
|
|
return output_image_path
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Generate image from kq4-decompile visual using ComfyUI"
|
|
)
|
|
parser.add_argument(
|
|
"image_path",
|
|
help="Path to kq4-decompile visual image (e.g., kq4-sierra-decompile/rooms/kq4-057-witch-cave/pic_057_visual.png)",
|
|
)
|
|
parser.add_argument("seed", type=int, help="Random seed for generation")
|
|
parser.add_argument(
|
|
"--server",
|
|
default="127.0.0.1:8188",
|
|
help="ComfyUI server address (default: 127.0.0.1:8188)",
|
|
)
|
|
parser.add_argument(
|
|
"--dry-run",
|
|
action="store_true",
|
|
help="Test mode: validate image file and server connection without generating images",
|
|
)
|
|
parser.add_argument(
|
|
"--lora-disney",
|
|
type=float,
|
|
default=0.8,
|
|
help="Disney LoRA strength (default: 0.8)",
|
|
)
|
|
parser.add_argument(
|
|
"--lora-kq5hoyos",
|
|
type=float,
|
|
default=0.7,
|
|
help="KQ5Hoyos LoRA strength (default: 0.7)",
|
|
)
|
|
parser.add_argument(
|
|
"--lora-sylvain",
|
|
type=float,
|
|
default=0.5,
|
|
help="Sylvain LoRA strength (default: 0.5)",
|
|
)
|
|
parser.add_argument(
|
|
"--caption-prompt",
|
|
default=DEFAULT_CAPTION_PROMPT,
|
|
help="Custom caption prompt for scene description",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not os.path.exists(args.image_path):
|
|
print(f"Error: Image file not found: {args.image_path}")
|
|
sys.exit(1)
|
|
|
|
image_basename = os.path.basename(args.image_path)
|
|
print(f"Image: {image_basename}")
|
|
print(f"Seed: {args.seed}")
|
|
print(f"Server: {args.server}")
|
|
|
|
if args.dry_run:
|
|
print("\n[Dry Run Mode - Checking server connection...]")
|
|
if check_server(args.server):
|
|
print("✓ ComfyUI server is running and accessible")
|
|
print("\n✓ Dry run successful! All checks passed.")
|
|
sys.exit(0)
|
|
else:
|
|
print(f"✗ ComfyUI server is not accessible at {args.server}")
|
|
print(" Please ensure ComfyUI is running before generating images.")
|
|
sys.exit(1)
|
|
|
|
print("\nChecking ComfyUI server...")
|
|
if not check_server(args.server):
|
|
print(f"Error: ComfyUI server is not running at {args.server}")
|
|
print("Please start ComfyUI first or check the server address.")
|
|
print(f"\nTo test without generating, use: --dry-run")
|
|
sys.exit(1)
|
|
|
|
print("✓ ComfyUI server is running")
|
|
|
|
try:
|
|
output_path = generate_from_caption_klein(
|
|
args.image_path,
|
|
args.seed,
|
|
args.server,
|
|
lora_disney=args.lora_disney,
|
|
lora_kq5hoyos=args.lora_kq5hoyos,
|
|
lora_sylvain=args.lora_sylvain,
|
|
caption_prompt=args.caption_prompt,
|
|
)
|
|
print(f"\nGeneration complete! Output: {output_path}")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|