228 lines
7.7 KiB
Python
228 lines
7.7 KiB
Python
from PIL import Image
|
|
import folder_paths
|
|
import os
|
|
import random
|
|
import torch
|
|
import numpy as np
|
|
|
|
|
|
# Sub-directory names that mark a folder as a compass image set. They are
# compared against lower-cased directory names, so all entries are lower case.
# Frozen so that importing code cannot mutate the shared lookup tables.
VALID_DIRECTIONS = frozenset({"n", "ne", "e", "se", "s", "sw", "w", "nw"})

# Per-modality sub-directory names that hold the actual image files.
VALID_MODALITIES = frozenset({"image", "depth", "openpose"})

# File extensions (lower case, leading dot) treated as loadable images.
# ".tif" is listed next to ".tiff" for the same reason ".jpg" sits next to
# ".jpeg": both spellings are in common use for the same format.
SUPPORTED_EXTENSIONS = frozenset({
    ".png",
    ".jpg",
    ".jpeg",
    ".webp",
    ".bmp",
    ".gif",
    ".tif",
    ".tiff",
})
|
|
|
|
|
|
def _links_to_ancestor(real_root, path):
    """Return True if *path* is a symlink resolving to *real_root* or one of its ancestors."""
    if not os.path.islink(path):
        return False
    real_path = os.path.realpath(path)
    try:
        return os.path.commonpath([real_root, real_path]) == real_path
    except ValueError:
        # Raised e.g. for paths on different drives: no common ancestor, no cycle.
        return False


def _discover_directories():
    """List directories under the input directory that look like compass sets.

    A directory qualifies when at least one of its immediate sub-directories
    is named (case-insensitively) after an entry of ``VALID_DIRECTIONS`` or
    ``VALID_MODALITIES``. The input directory itself is never reported.

    Symbolic links are followed, but a link that resolves to the directory
    being visited or to one of its ancestors is not descended into;
    ``os.walk`` does not track visited directories, so following such a link
    would recurse into the same tree again.

    Returns:
        A sorted list of paths relative to the input directory. Empty when
        the input directory does not exist.
    """
    base_dir = folder_paths.get_input_directory()
    if not os.path.isdir(base_dir):
        return []

    candidates = set()

    for root, dirs, _ in os.walk(base_dir, followlinks=True):
        # Capture the names before pruning: a pruned link still counts as a
        # child name for the purpose of recognising a compass set.
        dirs_lower = {d.lower() for d in dirs}

        # Prune in place so os.walk does not descend into cyclic links.
        real_root = os.path.realpath(root)
        dirs[:] = [
            d for d in dirs
            if not _links_to_ancestor(real_root, os.path.join(root, d))
        ]

        rel = os.path.relpath(root, base_dir)
        if rel == ".":
            continue

        if VALID_DIRECTIONS & dirs_lower or VALID_MODALITIES & dirs_lower:
            candidates.add(rel)

    return sorted(candidates)
|
|
|
|
|
|
def _resolve_target_dir(base_dir, directory, direction):
    """Build the path ``base_dir/directory[/direction]`` and validate it.

    Args:
        base_dir: Root directory the result must stay inside.
        directory: Path of the image set relative to *base_dir*. Surrounding
            whitespace is stripped.
        direction: Optional sub-directory name; ignored when empty, ``None``
            or whitespace only. Surrounding whitespace is stripped.

    Returns:
        The joined path (not normalised).

    Raises:
        ValueError: If *directory* is not a non-empty string, or if the
            joined path would leave *base_dir* (``..`` segments or an
            absolute component).
    """
    if not isinstance(directory, str) or not directory.strip():
        raise ValueError("directory must be a non-empty string")

    path = os.path.join(base_dir, directory.strip())

    if direction and direction.strip():
        path = os.path.join(path, direction.strip())

    # Both components can come from user-editable inputs. The check is
    # lexical (abspath, not realpath) so that symlinked sub-directories
    # inside base_dir keep working.
    base_abs = os.path.abspath(base_dir)
    try:
        contained = os.path.commonpath([base_abs, os.path.abspath(path)]) == base_abs
    except ValueError:
        # Different drives / mixed absolute and relative paths.
        contained = False
    if not contained:
        raise ValueError(f"path escapes the input directory: {path}")

    return path
|
|
|
|
|
|
def _list_image_files(target_dir):
    """Return the sorted names of the image files directly inside *target_dir*.

    Only regular files whose lower-cased extension is in
    ``SUPPORTED_EXTENSIONS`` are kept. Names are returned without the
    directory part, in plain string sort order.

    Returns:
        A list of file names; empty when the directory cannot be listed.
    """
    try:
        entries = os.listdir(target_dir)
    except OSError:
        # Missing or unreadable directory is reported as "no images".
        return []

    entries.sort()

    images = []
    for name in entries:
        if not os.path.isfile(os.path.join(target_dir, name)):
            continue
        extension = os.path.splitext(name)[1].lower()
        if extension in SUPPORTED_EXTENSIONS:
            images.append(name)
    return images
|
|
|
|
|
|
def _resize_image(image, target_w, target_h):
    """Resize *image* to the requested size; a target of 0 means "unconstrained".

    Modes:
        * both targets 0: the image is returned untouched;
        * only ``target_w`` set: scale to that width, height follows the
          aspect ratio;
        * only ``target_h`` set: scale to that height, width follows the
          aspect ratio;
        * both set: scale until the image covers the target box, then crop
          the centre to exactly ``target_w`` x ``target_h``.

    Derived dimensions are computed with integer arithmetic. A float
    round-trip such as ``int(98 * (2 / 98))`` truncates to 1 instead of 2,
    which made the resized image one pixel smaller than the crop box.

    Args:
        image: Object exposing ``size``, ``resize`` and ``crop`` in the way
            a PIL image does.
        target_w: Requested width as a non-negative integer.
        target_h: Requested height as a non-negative integer.

    Returns:
        ``(image, width, height)`` for the resulting image.
    """
    orig_w, orig_h = image.size

    if target_w == 0 and target_h == 0:
        return image, orig_w, orig_h

    resample = Image.Resampling.LANCZOS

    if target_w > 0 and target_h == 0:
        fh = max(1, orig_h * target_w // orig_w)
        return image.resize((target_w, fh), resample), target_w, fh

    if target_h > 0 and target_w == 0:
        fw = max(1, orig_w * target_h // orig_h)
        return image.resize((fw, target_h), resample), fw, target_h

    # Cover mode: use the larger of the two scale factors. Comparing the
    # cross products is the exact form of target_w/orig_w >= target_h/orig_h.
    if target_w * orig_h >= target_h * orig_w:
        new_w = target_w
        new_h = max(target_h, orig_h * target_w // orig_w)
    else:
        new_h = target_h
        new_w = max(target_w, orig_w * target_h // orig_h)

    resized = image.resize((new_w, new_h), resample)
    left = (new_w - target_w) // 2
    top = (new_h - target_h) // 2
    return resized.crop((left, top, left + target_w, top + target_h)), target_w, target_h
|
|
|
|
|
|
class CompassImageLoader:
    """Load one frame, or all frames, of a compass image set as an image batch.

    Images are read from ``<input dir>/<directory>[/<direction>]/<modality>``.
    The class exposes ``INPUT_TYPES`` / ``RETURN_TYPES`` / ``FUNCTION`` /
    ``IS_CHANGED`` and returns a ``{"ui": ..., "result": ...}`` mapping.
    NOTE(review): this looks like the ComfyUI custom-node protocol; confirm
    against the host application.
    """

    CATEGORY = "image/loaders"

    RETURN_TYPES = ("IMAGE", "STRING", "STRING", "INT", "INT", "INT")
    RETURN_NAMES = ("IMAGE", "path", "direction", "width", "height", "frame_count")
    FUNCTION = "load_images"
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs; the directory choices are discovered on each call."""
        directories = _discover_directories()
        return {
            "required": {
                # Placeholder entry keeps the choice list non-empty.
                "directory": (directories if directories else ["(none found)"],),
                "direction": (
                    ["", "n", "ne", "e", "se", "s", "sw", "w", "nw"],
                    {"default": ""},
                ),
                "modality": (["image", "depth", "openpose"],),
                "frame": ("STRING", {"default": ""}),
                "width": ("INT", {"default": 0, "min": 0, "max": 16384, "step": 1}),
                "height": ("INT", {"default": 0, "min": 0, "max": 16384, "step": 1}),
            },
            "optional": {
                "direction_override": ("STRING", {"default": ""}),
            },
        }

    @staticmethod
    def _resolve_direction(direction, direction_override=None):
        """Return the effective direction: a non-blank override wins, else *direction*, else ""."""
        if direction_override is not None and str(direction_override).strip():
            return str(direction_override).strip()
        if direction and str(direction).strip():
            return str(direction).strip()
        return ""

    @staticmethod
    def _modality_path(directory, resolved_direction, modality):
        """Return the directory that holds the images for the given selection."""
        base_dir = folder_paths.get_input_directory()
        target_dir = _resolve_target_dir(base_dir, directory, resolved_direction)
        return os.path.join(target_dir, modality)

    def load_images(
        self, directory, direction, modality, frame=None, width=0, height=0,
        direction_override=None
    ):
        """Load the selected frame(s), optionally resized, and write previews.

        Args:
            directory: Image-set path relative to the input directory.
            direction: Direction sub-directory; may be empty.
            modality: Name of the sub-directory holding the image files.
            frame: Blank or ``None`` loads every image; otherwise the
                zero-based index into the sorted file list.
            width: Target width, 0 for unconstrained (see ``_resize_image``).
            height: Target height, 0 for unconstrained.
            direction_override: When non-blank, replaces *direction*.

        Returns:
            A dict with ``"ui"`` (preview image descriptors) and ``"result"``:
            ``(image_batch, path, direction, width, height, frame_count)``.
            ``path`` is the modality directory when all frames are loaded and
            the image file path when a single frame is selected.

        Raises:
            RuntimeError: If the directory is missing or has no images, the
                frame value is not an integer or is out of range, or the
                loaded frames do not all share one size.
        """
        resolved_direction = self._resolve_direction(direction, direction_override)
        modality_path = self._modality_path(directory, resolved_direction, modality)

        if not os.path.isdir(modality_path):
            raise RuntimeError(f"Compass directory not found: {modality_path}")

        files = _list_image_files(modality_path)
        if not files:
            raise RuntimeError(f"No images found in: {modality_path}")

        if frame is None or str(frame).strip() == "":
            selected_files = files
            output_path = modality_path
        else:
            try:
                index = int(str(frame).strip())
            except (ValueError, TypeError) as err:
                raise RuntimeError(
                    f"Invalid frame number: '{frame}'. Must be an integer."
                ) from err

            if index < 0 or index >= len(files):
                raise RuntimeError(
                    f"Frame index {index} out of bounds. "
                    f"Found {len(files)} images in {modality_path}."
                )

            selected_files = [files[index]]
            output_path = os.path.join(modality_path, files[index])

        tensors = []
        final_w, final_h = 0, 0

        for filename in selected_files:
            filepath = os.path.join(modality_path, filename)
            # convert() returns an independent in-memory image, so the file
            # handle can be released as soon as the conversion is done.
            with Image.open(filepath) as source:
                image = source.convert("RGB")
            image, img_w, img_h = _resize_image(image, width, height)

            # Frames are concatenated along the batch axis, which requires
            # identical sizes; fail with an actionable message instead of
            # an opaque error from torch.cat.
            if tensors and (img_w, img_h) != (final_w, final_h):
                raise RuntimeError(
                    f"Cannot batch images of different sizes: '{filename}' is "
                    f"{img_w}x{img_h} but earlier frames are {final_w}x{final_h}. "
                    f"Set both width and height to force a common size."
                )
            final_w, final_h = img_w, img_h

            np_arr = np.array(image).astype(np.float32) / 255.0
            tensors.append(torch.from_numpy(np_arr)[None,])

        image_batch = torch.cat(tensors, dim=0)

        # Write one preview PNG per frame under a random prefix so that
        # separate runs do not overwrite each other's files.
        temp_dir = folder_paths.get_temp_directory()
        prefix = "_compass_" + "".join(
            random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(5)
        )
        results = []
        for batch_number, img_tensor in enumerate(image_batch):
            pixels = 255.0 * img_tensor.cpu().numpy()
            pil_img = Image.fromarray(np.clip(pixels, 0, 255).astype(np.uint8))
            file = f"{prefix}_{batch_number:05}_.png"
            pil_img.save(os.path.join(temp_dir, file), compress_level=1)
            results.append({"filename": file, "subfolder": "", "type": "temp"})

        return {
            "ui": {"images": results},
            "result": (
                image_batch,
                output_path,
                resolved_direction,
                final_w,
                final_h,
                len(selected_files),
            ),
        }

    @classmethod
    def IS_CHANGED(
        cls, directory, direction, modality, frame=None, width=0, height=0,
        direction_override=None
    ):
        """Return a fingerprint of the inputs and of the files they select.

        The fingerprint covers the parameters plus name, mtime and size of
        every selected file. For a single selected frame the first 64 KiB of
        the file are hashed as well. Files that cannot be read are skipped,
        and a frame value that ``load_images`` would reject selects nothing.

        Returns:
            A hex SHA-256 digest, or ``""`` when the image directory does
            not exist.
        """
        import hashlib

        resolved_direction = cls._resolve_direction(direction, direction_override)
        modality_path = cls._modality_path(directory, resolved_direction, modality)

        if not os.path.isdir(modality_path):
            return ""

        files = _list_image_files(modality_path)
        m = hashlib.sha256()
        m.update(
            f"{directory}|{resolved_direction}|{modality}|{frame}|{width}|{height}".encode()
        )

        all_frames = frame is None or str(frame).strip() == ""
        if all_frames:
            watched = files
        else:
            try:
                index = int(str(frame).strip())
            except (ValueError, TypeError):
                watched = []
            else:
                # Same bounds as load_images: negative indices are invalid.
                watched = [files[index]] if 0 <= index < len(files) else []

        for name in watched:
            fp = os.path.join(modality_path, name)
            try:
                st = os.stat(fp)
                m.update(f"{name}:{st.st_mtime}:{st.st_size}".encode())
                if not all_frames:
                    with open(fp, "rb") as fh:
                        m.update(fh.read(65536))
            except OSError:
                # Best effort: a file that vanished or is unreadable simply
                # does not contribute to the fingerprint.
                continue

        return m.hexdigest()
|