224 lines
7.6 KiB
Python
Executable File
224 lines
7.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Generate an image from a caption file using ComfyUI workflow."""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
|
|
def check_server(server_address: str = "127.0.0.1:8188", timeout: int = 5) -> bool:
    """Report whether a ComfyUI server answers at ``server_address``.

    Issues a GET request to ``/system_stats`` and returns True only when the
    reply status is 200. Any exception raised while building or sending the
    request is treated as "server not reachable" and yields False.

    Args:
        server_address: ``host:port`` of the server to probe.
        timeout: Seconds passed to ``urlopen`` as its timeout.
    """
    try:
        stats_url = f"http://{server_address}/system_stats"
        probe = urllib.request.Request(stats_url, method="GET")
        with urllib.request.urlopen(probe, timeout=timeout) as reply:
            reachable = reply.status == 200
    except Exception:
        # Best-effort health check: every failure mode means "not available".
        return False
    return reachable
|
|
|
|
|
|
def queue_prompt(prompt: dict, server_address: str = "127.0.0.1:8188") -> dict:
|
|
"""Queue a prompt to ComfyUI server."""
|
|
client_id = str(uuid.uuid4())
|
|
p = {"prompt": prompt, "client_id": client_id}
|
|
data = json.dumps(p).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
f"http://{server_address}/prompt",
|
|
data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
with urllib.request.urlopen(req) as response:
|
|
return json.loads(response.read())
|
|
|
|
|
|
def get_existing_files(directory: str) -> set[str]:
    """Return the names of the ``.png`` files currently in ``directory``.

    Args:
        directory: Path of the directory to list.

    Returns:
        A set of bare filenames (not full paths) ending in ``.png``; an empty
        set when the directory does not exist.
    """
    # List first and handle absence afterwards, so a directory that vanishes
    # between a check and the listing cannot raise.
    try:
        entries = os.listdir(directory)
    except FileNotFoundError:
        return set()
    return {name for name in entries if name.endswith(".png")}
|
|
|
|
|
|
def wait_for_new_file(directory: str, existing_files: set, timeout: int = 240) -> str | None:
|
|
"""Wait for a new PNG file to appear in directory."""
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
if os.path.exists(directory):
|
|
current_files = set(f for f in os.listdir(directory) if f.endswith('.png'))
|
|
new_files = current_files - existing_files
|
|
|
|
if new_files:
|
|
# Return the newest file (by modification time)
|
|
newest_file = max(
|
|
new_files,
|
|
key=lambda f: os.path.getmtime(os.path.join(directory, f))
|
|
)
|
|
return os.path.join(directory, newest_file)
|
|
|
|
time.sleep(0.5)
|
|
|
|
return None
|
|
|
|
|
|
def wait_for_file_with_prefix(directory: str, prefix: str, timeout: int = 240) -> str | None:
|
|
"""Wait for a PNG file with specific prefix to appear in directory."""
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
if os.path.exists(directory):
|
|
for filename in os.listdir(directory):
|
|
if filename.startswith(prefix) and filename.endswith('.png'):
|
|
return os.path.join(directory, filename)
|
|
|
|
time.sleep(0.5)
|
|
|
|
return None
|
|
|
|
|
|
def generate_from_caption(
    caption_file: str, seed: int, server_address: str = "127.0.0.1:8188"
) -> str:
    """Generate an image from a caption file with the given seed.

    Loads ``generate_caption.json`` from this script's directory, fills in the
    caption (node 27), the seeds (nodes 20 and 21) and a unique filename
    prefix (node 26), queues the workflow, waits for a PNG with that prefix to
    appear in the ``auto_background`` directory next to this script, and
    copies it next to the caption file.

    Args:
        caption_file: Path to the caption text file.
        seed: Random seed for generation; node 21 receives ``seed + 1``.
        server_address: ComfyUI server address.

    Returns:
        Path to the copied image,
        ``<caption dir>/<caption name>_<seed>_generated.png``.

    Raises:
        RuntimeError: If no image with the expected prefix appears in time.
    """
    # Single source of truth for the wait limit and the messages quoting it.
    wait_timeout = 240
    wait_minutes = wait_timeout // 60

    # Read caption
    with open(caption_file, "r") as f:
        caption = f.read().strip()

    # Load workflow (JSON files are UTF-8 by specification)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_path = os.path.join(script_dir, "generate_caption.json")
    with open(workflow_path, "r", encoding="utf-8") as f:
        workflow = json.load(f)

    # Update caption in node 27
    workflow["27"]["inputs"]["text"] = caption

    # Update seeds in nodes 20 and 21
    workflow["20"]["inputs"]["seed"] = seed
    workflow["21"]["inputs"]["seed"] = seed + 1

    # Generate unique filename prefix to ensure we copy the right file
    unique_id = str(uuid.uuid4())[:8]
    file_prefix = f"gen_{unique_id}"
    workflow["26"]["inputs"]["filename_prefix"] = f"auto_background/{file_prefix}"

    # NOTE(review): this assumes the server writes "auto_background/..." into
    # a directory next to this script -- confirm against the ComfyUI setup.
    auto_bg_dir = os.path.join(script_dir, "auto_background")
    existing_files = get_existing_files(auto_bg_dir)
    print(f"Found {len(existing_files)} existing files in output directory")

    # Queue the prompt
    print(f"Queuing prompt with seed {seed}...")
    response = queue_prompt(workflow, server_address)
    prompt_id = response["prompt_id"]
    print(f"Prompt ID: {prompt_id}")
    print(f"Expected filename prefix: {file_prefix}")

    # Wait for the specific file with our unique prefix
    print(f"Waiting for generation (up to {wait_minutes} minutes)...")
    new_file = wait_for_file_with_prefix(auto_bg_dir, file_prefix, timeout=wait_timeout)

    if not new_file:
        raise RuntimeError(
            f"Timeout: No new image file appeared in {wait_minutes} minutes"
        )

    print(f"New image generated: {os.path.basename(new_file)}")

    # Wait a moment for the file to be fully written
    time.sleep(2)

    # Copy the generated image next to the caption file with a cleaner name
    caption_dir = os.path.dirname(os.path.abspath(caption_file))
    caption_name = os.path.splitext(os.path.basename(caption_file))[0]
    output_image_path = os.path.join(caption_dir, f"{caption_name}_{seed}_generated.png")
    shutil.copy2(new_file, output_image_path)
    print(f"Saved image: {output_image_path}")

    return output_image_path
|
|
|
|
|
|
def main():
    """Command-line entry point: validate inputs, check the server, generate.

    Parses ``caption_file``, ``seed``, ``--server`` and ``--dry-run``. Exits
    with status 1 when the caption file is missing, is not a regular file, or
    is empty, when the server is unreachable, or when generation raises. With
    ``--dry-run`` it exits after the server check (0 if reachable, 1 if not).
    """
    import argparse

    parser = argparse.ArgumentParser(description="Generate image from caption using ComfyUI")
    parser.add_argument("caption_file", help="Path to caption text file")
    parser.add_argument("seed", type=int, help="Random seed for generation")
    parser.add_argument(
        "--server",
        default="127.0.0.1:8188",
        help="ComfyUI server address (default: 127.0.0.1:8188)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Test mode: validate caption file and server connection without generating images",
    )

    args = parser.parse_args()

    # isfile (not exists) so a directory path is rejected here instead of
    # crashing in open() below.
    if not os.path.isfile(args.caption_file):
        print(f"Error: Caption file not found: {args.caption_file}")
        sys.exit(1)

    # Read and validate caption
    with open(args.caption_file, "r") as f:
        caption = f.read().strip()

    if not caption:
        print(f"Error: Caption file is empty: {args.caption_file}")
        sys.exit(1)

    # Show at most the first 100 characters of the caption.
    print(f"Caption: {caption[:100]}{'...' if len(caption) > 100 else ''}")
    print(f"Seed: {args.seed}")
    print(f"Server: {args.server}")

    if args.dry_run:
        print("\n[Dry Run Mode - Checking server connection...]")
        if check_server(args.server):
            print("✓ ComfyUI server is running and accessible")
            print("\n✓ Dry run successful! All checks passed.")
            sys.exit(0)
        else:
            print(f"✗ ComfyUI server is not accessible at {args.server}")
            print("  Please ensure ComfyUI is running before generating images.")
            sys.exit(1)

    # Check server before attempting generation
    print("\nChecking ComfyUI server...")
    if not check_server(args.server):
        print(f"Error: ComfyUI server is not running at {args.server}")
        print("Please start ComfyUI first or check the server address.")
        print("\nTo test without generating, use: --dry-run")
        sys.exit(1)

    print("✓ ComfyUI server is running")

    try:
        # generate_from_caption returns the path of the saved image file.
        output_path = generate_from_caption(args.caption_file, args.seed, args.server)
        print(f"\nGeneration complete! Output image: {output_path}")
    except Exception as e:
        # Top-level boundary: report any failure and exit non-zero.
        print(f"Error: {e}")
        sys.exit(1)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|