Restructure ORA editor into modular blueprints with browse dialog

- Split app.py into route blueprints (files, layers, images, polygon, mask, krita)
- Create services layer (polygon_storage, comfyui, file_browser)
- Extract config constants to config.py
- Split templates into Jinja partials (base, components, modals)
- Add browse dialog for visual file navigation
- Add /api/browse endpoint for directory listing
This commit is contained in:
2026-03-27 21:29:27 -07:00
parent 17da8c475e
commit fb812e57bc
21 changed files with 2269 additions and 1794 deletions

View File

@@ -0,0 +1,36 @@
"""Polygon storage service for in-memory polygon points."""
from typing import Any
class PolygonStorage:
"""In-memory storage for polygon points per ORA file."""
def __init__(self):
self._storage: dict[str, dict[str, Any]] = {}
def store(self, ora_path: str, points: list, color: str = '#FF0000', width: int = 2) -> None:
"""Store polygon data for an ORA file."""
self._storage[ora_path] = {
'points': points,
'color': color,
'width': width
}
def get(self, ora_path: str) -> dict[str, Any] | None:
"""Get polygon data for an ORA file."""
return self._storage.get(ora_path)
def clear(self, ora_path: str) -> bool:
"""Clear polygon data for an ORA file. Returns True if existed."""
if ora_path in self._storage:
del self._storage[ora_path]
return True
return False
def has_polygon(self, ora_path: str) -> bool:
"""Check if polygon exists for an ORA file."""
return ora_path in self._storage
polygon_storage = PolygonStorage()

View File

@@ -0,0 +1,187 @@
"""ComfyUI integration service for mask extraction."""
import base64
import io
import json
import random
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
import logging
from PIL import Image, ImageDraw
logger = logging.getLogger(__name__)
class ComfyUIService:
"""Service for interacting with ComfyUI API."""
def __init__(self, base_url: str):
self.base_url = base_url
self._webhook_response: dict | None = None
self._webhook_ready = threading.Event()
def submit_workflow(self, workflow: dict, comfy_url: str | None = None) -> str:
"""Submit a workflow to ComfyUI and return the prompt_id."""
url = comfy_url or self.base_url
headers = {'Content-Type': 'application/json'}
payload = json.dumps({"prompt": workflow}).encode('utf-8')
req = urllib.request.Request(
f'http://{url}/prompt',
data=payload,
headers=headers,
method='POST'
)
with urllib.request.urlopen(req, timeout=30) as response:
result = json.loads(response.read().decode())
prompt_id = result.get('prompt_id')
if not prompt_id:
raise RuntimeError("No prompt_id returned from ComfyUI")
return prompt_id
def poll_for_completion(self, prompt_id: str, comfy_url: str | None = None, timeout: int = 240) -> bool:
"""Poll ComfyUI history for workflow completion."""
url = comfy_url or self.base_url
headers = {'Content-Type': 'application/json'}
start_time = time.time()
while time.time() - start_time < timeout:
try:
req = urllib.request.Request(
f'http://{url}/history/{prompt_id}',
headers=headers,
method='GET'
)
with urllib.request.urlopen(req, timeout=30) as response:
history = json.loads(response.read().decode())
if prompt_id in history:
status = history[prompt_id].get('status', {})
if status.get('status_str') == 'success':
return True
time.sleep(2)
except urllib.error.HTTPError as e:
if e.code == 404:
time.sleep(2)
else:
raise
except Exception as e:
logger.error(f"Error polling history: {e}")
time.sleep(2)
return False
def wait_for_webhook(self, timeout: float = 60.0) -> dict | None:
"""Wait for webhook callback from ComfyUI."""
if self._webhook_ready.is_set() and self._webhook_response is not None:
return self._webhook_response
self._webhook_ready.clear()
self._webhook_response = None
webhook_received = self._webhook_ready.wait(timeout=timeout)
if webhook_received:
return self._webhook_response
return None
def handle_webhook(self, request_files, request_form, request_data, temp_dir: Path) -> dict:
"""Handle incoming webhook from ComfyUI."""
self._webhook_response = None
self._webhook_ready.clear()
try:
img_file = None
if 'file' in request_files:
img_file = request_files['file']
elif 'image' in request_files:
img_file = request_files['image']
elif request_files:
img_file = list(request_files.values())[0]
if img_file:
timestamp = str(int(time.time()))
final_mask_path = temp_dir / f"mask_{timestamp}.png"
img = Image.open(img_file).convert('RGBA')
img.save(str(final_mask_path), format='PNG')
logger.info(f"[WEBHOOK] Image saved to {final_mask_path}")
self._webhook_response = {'success': True, 'path': final_mask_path}
elif request_data:
timestamp = str(int(time.time()))
final_mask_path = temp_dir / f"mask_{timestamp}.png"
with open(final_mask_path, 'wb') as f:
f.write(request_data)
logger.info(f"[WEBHOOK] Raw data saved to {final_mask_path}")
self._webhook_response = {'success': True, 'path': final_mask_path}
else:
logger.error("[WEBHOOK] No image data in request")
self._webhook_response = {'success': False, 'error': 'No image data received'}
self._webhook_ready.set()
return self._webhook_response
except Exception as e:
logger.error(f"[WEBHOOK] Error: {e}")
self._webhook_response = {'success': False, 'error': str(e)}
self._webhook_ready.set()
return self._webhook_response
def prepare_mask_workflow(
self,
base_image: Image.Image,
subject: str,
webhook_url: str,
polygon_points: list | None = None,
polygon_color: str = '#FF0000',
polygon_width: int = 2,
workflow_template: dict | None = None
) -> dict:
"""Prepare the mask extraction workflow."""
workflow = workflow_template.copy() if workflow_template else {}
img = base_image.copy()
if polygon_points and len(polygon_points) >= 3:
w, h = img.size
pixel_points = [(int(p['x'] * w), int(p['y'] * h)) for p in polygon_points]
draw = ImageDraw.Draw(img)
hex_color = polygon_color if len(polygon_color) == 7 else polygon_color + 'FF'
draw.polygon(pixel_points, outline=hex_color, width=polygon_width)
img_io = io.BytesIO()
img.save(img_io, format='PNG')
img_io.seek(0)
base64_image = base64.b64encode(img_io.read()).decode('utf-8')
if "87" in workflow:
workflow["87"]["inputs"]["image"] = base64_image
if "1:68" in workflow and 'inputs' in workflow["1:68"]:
workflow["1:68"]["inputs"]["prompt"] = f"Create a black and white alpha mask of {subject}, leaving everything else black"
if "96" in workflow and 'inputs' in workflow["96"]:
workflow["96"]["inputs"]["webhook_url"] = webhook_url
if "50" in workflow and 'inputs' in workflow["50"]:
workflow["50"]["inputs"]["seed"] = random.randint(0, 2**31-1)
return workflow

View File

@@ -0,0 +1,75 @@
"""File browsing service for project directory navigation."""
import os
from pathlib import Path
from typing import Any
class FileBrowserService:
"""Service for browsing project files and directories."""
SUPPORTED_EXTENSIONS = {'.png', '.ora'}
def __init__(self, project_root: Path):
self.project_root = project_root
def list_directory(self, relative_path: str = "") -> dict[str, Any]:
"""List contents of a directory relative to project root."""
if relative_path:
dir_path = self.project_root / relative_path
else:
dir_path = self.project_root
if not dir_path.exists() or not dir_path.is_dir():
return {
'success': False,
'error': f'Directory not found: {relative_path}'
}
directories = []
files = []
try:
for entry in sorted(dir_path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
if entry.name.startswith('.'):
continue
if entry.is_dir():
rel_path = str(entry.relative_to(self.project_root))
directories.append({
'name': entry.name,
'path': rel_path
})
elif entry.is_file():
suffix = entry.suffix.lower()
if suffix in self.SUPPORTED_EXTENSIONS:
rel_path = str(entry.relative_to(self.project_root))
files.append({
'name': entry.name,
'path': rel_path,
'type': suffix[1:]
})
parent_path = None
if relative_path:
parent = Path(relative_path).parent
parent_path = str(parent) if parent != Path('.') else ""
return {
'success': True,
'current_path': relative_path,
'parent_path': parent_path,
'directories': directories,
'files': files
}
except PermissionError:
return {
'success': False,
'error': f'Permission denied: {relative_path}'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}