from PIL import Image import folder_paths import os import torch import numpy as np VALID_DIRECTIONS = {"n", "ne", "e", "se", "s", "sw", "w", "nw"} VALID_MODALITIES = {"image", "depth", "openpose"} def _discover_directories(): base_dir = folder_paths.get_input_directory() if not os.path.exists(base_dir): return [] candidates = set() for root, subdirs, _ in os.walk(base_dir, followlinks=True): current_path = root[os.path.dirname(root) + 1:] sub_dirs_lower = {s.lower() for s in subdirs} if VALID_DIRECTIONS & sub_dirs_lower or VALID_MODALITIES & sub_dirs_lower: candidates.add(current_path) return sorted(candidates) class CompassImageLoader: CATEGORY = "image/loaders" @classmethod def INPUT_TYPES(cls): directories = _discover_directories() return { "required": { "directory": (directories if directories else ["(none found)"],), "direction": (["", "n", "ne", "e", "se", "s", "sw", "w", "nw"],), "modality": (["image", "depth", "openpose"],), "frame": ("STRING", {"default": ""}), "width": ("INT", {"default": 0, "min": 0, "max": 16384, "step": 1}), "height": ("INT", {"default": 0, "min": 0, "max": 16384, "step": 1}), }, } RETURN_TYPES = ("IMAGE", "STRING", "INT", "INT", "INT") RETURN_NAMES = ("IMAGE", "path", "width", "height", "frame_count") FUNCTION = "load_images" def load_images(self, directory, direction, modality, frame=None, width=0, height=0): base_dir = folder_paths.get_input_directory() if direction and direction.strip(): target_dir = os.path.join(base_dir, directory, direction, modality) else: target_dir = os.path.join(base_dir, directory, modality) if not os.path.isdir(target_dir): raise RuntimeError(f"Compass directory not found: {target_dir}") supported_extensions = {"png", "jpg", "jpeg", "webp", "bmp", "gif", "tiff"} files = [ f for f in sorted(os.listdir(target_dir)) if os.path.isfile(os.path.join(target_dir, f)) and f.split(".")[-1].lower() in supported_extensions ] if not files: raise RuntimeError(f"No images found in: {target_dir}") if frame is None or str(frame).strip() == "": selected_files = files output_path = target_dir else: try: index = int(str(frame).strip()) except (ValueError, TypeError): raise RuntimeError(f"Invalid frame number: '{frame}'. Must be an integer.") if index < 0 or index >= len(files): raise RuntimeError( f"Frame index {index} out of bounds. Found {len(files)} images in {target_dir}." ) selected_files = [files[index]] output_path = os.path.join(target_dir, files[index]) tensors = [] final_w, final_h = 0, 0 for filename in selected_files: filepath = os.path.join(target_dir, filename) image = Image.open(filepath).convert("RGB") orig_w, orig_h = image.size if width == 0 and height == 0: pass elif width > 0 and height == 0: fw = width fh = int(orig_h * (width / orig_w)) image = image.resize((fw, fh), Image.Resampling.LANCZOS) elif height > 0 and width == 0: fh = height fw = int(orig_w * (height / orig_h)) image = image.resize((fw, fh), Image.Resampling.LANCZOS) else: scale = max(width / orig_w, height / orig_h) new_w = int(orig_w * scale) new_h = int(orig_h * scale) image = image.resize((new_w, new_h), Image.Resampling.LANCZOS) left = (new_w - width) // 2 top = (new_h - height) // 2 right = left + width bottom = top + height image = image.crop((left, top, right, bottom)) final_w, final_h = image.size[0], image.size[1] np_image = np.array(image).astype(np.float32) / 255.0 tensor = torch.from_numpy(np_image)[None,] tensors.append(tensor) if len(tensors) == 1: image_batch = tensors[0] else: image_batch = torch.cat(tensors, dim=0) return (image_batch, output_path, final_w, final_h, len(selected_files)) @classmethod def IS_CHANGED(cls, directory, direction, modality, frame=None, width=0, height=0): base_dir = folder_paths.get_input_directory() if direction and direction.strip(): target_dir = os.path.join(base_dir, directory, direction, modality) else: target_dir = os.path.join(base_dir, directory, modality) import hashlib m = hashlib.sha256() m.update(f"{directory}:{direction}:{modality}:{frame}:{width}:{height}".encode("utf-8")) if not os.path.isdir(target_dir): return "" supported_extensions = {"png", "jpg", "jpeg", "webp", "bmp", "gif", "tiff"} files = [ f for f in sorted(os.listdir(target_dir)) if os.path.isfile(os.path.join(target_dir, f)) and f.split(".")[-1].lower() in supported_extensions ] if frame is None or str(frame).strip() == "": m.update(":".join(files).encode("utf-8")) else: try: index = int(str(frame).strip()) filepath = os.path.join(target_dir, files[index]) with open(filepath, "rb") as f: m.update(f.read(65536)) except (ValueError, IndexErro