#!/usr/bin/env python3
"""Flask web application for ORA editing.

Exposes a small JSON/HTTP API used by the editor UI to open OpenRaster
(ORA) files, manipulate their layers, preview polygon overlays, request
masks from a ComfyUI server and hand layers over to Krita through
temporary PNG files.
"""

import base64
import io
import json
import logging
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import zipfile
from datetime import datetime
from pathlib import Path
from xml.etree import ElementTree as ET

from flask import (
    Flask, request, jsonify, send_file, Response, render_template,
    make_response
)
from PIL import Image, ImageDraw

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Configure paths
APP_DIR = Path(__file__).parent
# The project root can be overridden through the environment; the default
# is the location the application was originally written for.
PROJECT_ROOT = Path(os.environ.get("ORA_PROJECT_ROOT", "/home/noti/dev/ai-game-2"))
TEMP_DIR = APP_DIR / "temp"
TEMP_DIR.mkdir(exist_ok=True)

# Import ora operations from sibling directory
sys.path.insert(0, str(APP_DIR.parent))
from ora_editor.ora_ops import (  # noqa: E402  (needs the sys.path tweak above)
    load_ora, create_ora_from_png, add_masked_layer, rename_layer,
    delete_layer, reorder_layer, get_layer_visibility, set_layer_visibility,
    save_ora, parse_stack_xml
)

# In-memory storage for polygon data, keyed by ORA path. It lives for the
# lifetime of the process and is shared by all requests.
_polygon_storage = {}

# Name of the flattened preview image inside an ORA archive.
_MERGED_IMAGE = 'mergedimage.png'

# Seconds to wait for ComfyUI to finish a queued prompt (4 minutes).
_COMFY_TIMEOUT = 240

app = Flask(__name__, template_folder='templates')


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _missing_field_response(data, required):
    """Return a 400 JSON response for the first missing field, else None.

    A body that is not a JSON object (including a missing or unparsable
    body) is reported as missing the first required field.
    """
    for field in required:
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400
    return None


def _layer_summaries(loaded) -> list[dict]:
    """Build the JSON-serialisable layer list from a ``load_ora`` result."""
    return [
        {
            'name': layer['name'],
            'src': layer['src'],
            'group': layer.get('group'),
            'visible': layer.get('visible', True),
        }
        for layer in loaded['layers']
    ]


def _read_merged_png(ora_path) -> bytes:
    """Return the raw bytes of the merged image stored in the ORA archive."""
    with zipfile.ZipFile(ora_path, 'r') as zf:
        return zf.read(_MERGED_IMAGE)


def _draw_polygon(image, points, color, width) -> None:
    """Draw a polygon outline onto *image* in place.

    *points* are dicts with ``x``/``y`` entries that are multiplied by the
    image width/height; presumably they are fractions in the range 0..1
    (verify against the front end).
    """
    img_w, img_h = image.size
    pixel_points = [(int(p['x'] * img_w), int(p['y'] * img_h)) for p in points]
    # NOTE(review): a 7-character '#RRGGBB' colour is used as-is, anything
    # else gets 'FF' appended. Kept from the original; shorter forms such as
    # '#RGB' end up as strings Pillow may reject.
    hex_color = color if len(color) == 7 else color + 'FF'
    ImageDraw.Draw(image).polygon(pixel_points, outline=hex_color, width=width)


def _find_layer_src(stack_root, layer_name):
    """Return the ``src`` of the named layer, searching one group level deep."""
    for child in stack_root:
        if child.tag == 'layer' and child.get('name') == layer_name:
            return child.get('src')
        if child.tag == 'stack':  # Group
            for layer in child:
                if layer.tag == 'layer' and layer.get('name') == layer_name:
                    return layer.get('src')
    return None


def _queue_comfy_prompt(comfy_url, workflow, headers) -> str:
    """Submit *workflow* to ComfyUI's ``/prompt`` endpoint; return its id.

    Raises:
        RuntimeError: if the response carries no ``prompt_id``.
        urllib.error.URLError: if the server cannot be reached.
    """
    payload = json.dumps({'prompt': workflow}).encode('utf-8')
    submit_req = urllib.request.Request(
        f'http://{comfy_url}/prompt',
        data=payload,
        headers=headers,
        method='POST'
    )
    with urllib.request.urlopen(submit_req, timeout=30) as response:
        result = json.loads(response.read().decode())
    prompt_id = result.get('prompt_id')
    if not prompt_id:
        raise RuntimeError(f'ComfyUI did not return a prompt_id: {result}')
    return prompt_id


def _download_comfy_image(comfy_url, img_info, headers) -> Path:
    """Download one ComfyUI output image into TEMP_DIR and return its path."""
    filename = img_info.get('filename', '')
    subfolder = img_info.get('subfolder', '')
    logger.info("[MASK EXTRACT] Downloading image: %s from subfolder: %s", filename, subfolder)

    params = {
        'filename': filename,
        'subfolder': subfolder,
        'type': 'output'
    }
    download_req = urllib.request.Request(
        f'http://{comfy_url}/view?{urllib.parse.urlencode(params)}',
        headers=headers,
        method='GET'
    )
    with urllib.request.urlopen(download_req, timeout=30) as img_response:
        img_data = img_response.read()

    # Save to temp directory
    timestamp = str(int(time.time()))
    mask_path = TEMP_DIR / f"mask_{timestamp}.png"
    with open(mask_path, 'wb') as f:
        f.write(img_data)
    logger.info("[MASK EXTRACT] Saved mask to: %s (%s bytes)", mask_path, len(img_data))
    return mask_path


def _wait_for_mask(comfy_url, prompt_id, headers, timeout=_COMFY_TIMEOUT):
    """Poll ComfyUI's history until the prompt yields an image.

    Returns the path of the first downloaded output image, or ``None`` when
    the timeout elapses or polling is aborted by a non-404 HTTP error.
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            history_req = urllib.request.Request(
                f'http://{comfy_url}/history/{prompt_id}',
                headers=headers,
                method='GET'
            )
            with urllib.request.urlopen(history_req, timeout=30) as response:
                history = json.loads(response.read().decode())

            if prompt_id in history:
                outputs = history[prompt_id].get('outputs', {})
                logger.info("[MASK EXTRACT] Status check - prompt_id found in history")
                for node_id, node_output in outputs.items():
                    if 'images' not in node_output:
                        continue
                    images = node_output['images']
                    logger.info("[MASK EXTRACT] Found %s images in node %s", len(images), node_id)
                    for img_info in images:
                        # The first image found is the result.
                        return _download_comfy_image(comfy_url, img_info, headers)
            else:
                logger.info("[MASK EXTRACT] Prompt_id not in history yet, waiting...")
        except urllib.error.HTTPError as e:
            # A 404 just means the prompt is not in the history yet.
            if e.code != 404:
                logger.error("[MASK EXTRACT] HTTP Error polling: %s", e.code)
                break
        time.sleep(2)
    return None


# ---------------------------------------------------------------------------
# File handling
# ---------------------------------------------------------------------------

def ensure_ora_exists(input_path: str) -> str | None:
    """Ensure an ORA file exists, creating from PNG if necessary.

    Args:
        input_path: Path of a PNG or ORA file, joined onto PROJECT_ROOT.

    Returns:
        The path of the ORA file as a string, or ``None`` when the input
        does not exist, the PNG conversion fails, or the ORA cannot be
        parsed.
    """
    full_path = PROJECT_ROOT / input_path

    if not full_path.exists():
        return None

    # If it's a PNG, create ORA next to it
    if full_path.suffix.lower() == '.png':
        ora_path = str(full_path.with_suffix('.ora'))
        result = create_ora_from_png(str(full_path), ora_path)
        return ora_path if result.get('success') else None

    # It's an ORA, verify it's valid
    try:
        parse_stack_xml(str(full_path))
    except Exception:
        return None
    return str(full_path)


@app.route('/')
def index():
    """Serve the main editor UI."""
    return render_template('editor.html')


@app.route('/api/open', methods=['POST'])
def api_open():
    """Open a file (PNG or ORA) and return its size and layer list."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['path'])
    if error:
        return error

    input_path = data['path']
    ora_path = ensure_ora_exists(input_path)

    if not ora_path:
        return jsonify({'success': False, 'error': f'File not found or invalid: {input_path}'}), 404

    try:
        loaded = load_ora(ora_path)
        return jsonify({
            'success': True,
            'ora_path': ora_path,
            'width': loaded['width'],
            'height': loaded['height'],
            'layers': _layer_summaries(loaded)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': f'Error loading ORA: {str(e)}'}), 500


@app.route('/api/save', methods=['POST'])
def api_save():
    """Save the current ORA state."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path'])
    if error:
        return error

    ora_path = data['ora_path']

    # Save by re-reading and re-writing (preserves current state)
    if save_ora(ora_path):
        return jsonify({'success': True, 'ora_path': ora_path})
    return jsonify({'success': False, 'error': 'Failed to save ORA'}), 500


# ---------------------------------------------------------------------------
# Layer operations
# ---------------------------------------------------------------------------

@app.route('/api/layers', methods=['GET'])
def api_layers():
    """Get the current layer structure."""
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'success': False, 'error': 'Missing ora_path parameter'}), 400

    try:
        loaded = load_ora(ora_path)
        return jsonify({'success': True, 'layers': _layer_summaries(loaded)})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/layer/add', methods=['POST'])
def api_layer_add():
    """Add a new masked layer."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'entity_name', 'mask_path'])
    if error:
        return error

    source_layer = data.get('source_layer', 'base')
    result = add_masked_layer(
        data['ora_path'], data['entity_name'], data['mask_path'], source_layer
    )
    return jsonify(result)


@app.route('/api/layer/rename', methods=['POST'])
def api_layer_rename():
    """Rename a layer."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'old_name', 'new_name'])
    if error:
        return error

    result = rename_layer(data['ora_path'], data['old_name'], data['new_name'])
    return jsonify(result)


@app.route('/api/layer/delete', methods=['POST'])
def api_layer_delete():
    """Delete a layer."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'layer_name'])
    if error:
        return error

    result = delete_layer(data['ora_path'], data['layer_name'])
    return jsonify(result)


@app.route('/api/layer/reorder', methods=['POST'])
def api_layer_reorder():
    """Reorder a layer."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'layer_name', 'direction'])
    if error:
        return error

    result = reorder_layer(data['ora_path'], data['layer_name'], data['direction'])
    return jsonify(result)


@app.route('/api/layer/visibility', methods=['POST'])
def api_layer_visibility():
    """Set layer visibility."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'layer_name', 'visible'])
    if error:
        return error

    result = set_layer_visibility(data['ora_path'], data['layer_name'], data['visible'])
    return jsonify(result)


# ---------------------------------------------------------------------------
# Image serving
# ---------------------------------------------------------------------------

@app.route('/api/image/<layer_name>')
def api_image(layer_name):
    """Serve an image for the given layer.

    NOTE(review): ``layer_name`` is currently ignored and the merged image
    is returned, exactly as before; ``/api/image/layer/<layer_name>`` serves
    an individual layer. Confirm whether this endpoint is still needed.
    """
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        # Result unused; the call is kept so a failure to parse the stack
        # still produces the 500 response below.
        parse_stack_xml(ora_path)
        return send_file(io.BytesIO(_read_merged_png(ora_path)), mimetype='image/png')
    except Exception as e:
        return Response(f"Error loading image: {e}", status=500)


@app.route('/api/image/base')
def api_image_base():
    """Serve the base/merged image."""
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        return send_file(io.BytesIO(_read_merged_png(ora_path)), mimetype='image/png')
    except Exception as e:
        return Response(f"Error loading image: {e}", status=500)


@app.route('/api/image/polygon')
def api_image_polygon():
    """Serve the merged image with the stored polygon drawn on top."""
    ora_path = request.args.get('ora_path')

    if not ora_path or ora_path not in _polygon_storage:
        return Response("No polygon data", status=404)

    try:
        stored = _polygon_storage[ora_path]
        points = stored.get('points', [])
        if len(points) < 3:
            return Response("Not enough points", status=404)

        base_img = Image.open(io.BytesIO(_read_merged_png(ora_path))).convert('RGBA')
        _draw_polygon(
            base_img, points, stored.get('color', '#FF0000'), stored.get('width', 2)
        )

        img_io = io.BytesIO()
        base_img.save(img_io, format='PNG')
        img_io.seek(0)
        return send_file(img_io, mimetype='image/png')
    except Exception as e:
        return Response(f"Error drawing polygon: {e}", status=500)


# ---------------------------------------------------------------------------
# Polygon storage
# ---------------------------------------------------------------------------

@app.route('/api/polygon', methods=['POST'])
def api_polygon():
    """Store polygon points for overlay."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'points'])
    if error:
        return error

    ora_path = data['ora_path']
    points = data['points']
    color = data.get('color', '#FF0000')
    width = data.get('width', 2)

    logger.info("[POLYGON] Storing polygon: %s points, color: %s, width: %s",
                len(points), color, width)

    _polygon_storage[ora_path] = {
        'points': points,
        'color': color,
        'width': width
    }

    # Percent-encode the path so characters such as '&' or '#' cannot
    # corrupt the query string.
    return jsonify({
        'success': True,
        'overlay_url': f'/api/image/polygon?ora_path={urllib.parse.quote(str(ora_path))}'
    })


@app.route('/api/polygon/clear', methods=['POST'])
def api_polygon_clear():
    """Clear stored polygon points; succeeds even if nothing was stored."""
    # silent=True: a missing or non-JSON body simply clears nothing.
    data = request.get_json(silent=True)
    ora_path = data.get('ora_path') if isinstance(data, dict) else None

    if isinstance(ora_path, str) and ora_path:
        _polygon_storage.pop(ora_path, None)

    return jsonify({'success': True})


# ---------------------------------------------------------------------------
# Mask extraction
# ---------------------------------------------------------------------------

@app.route('/api/mask/extract', methods=['POST'])
def api_mask_extract():
    """Extract mask using ComfyUI.

    Loads the workflow template, injects the (optionally polygon-annotated)
    merged image and a prompt built from ``subject``, queues it on the
    ComfyUI server and waits for the first output image, which is saved to
    the temp directory.
    """
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['subject', 'ora_path'])
    if error:
        return error

    subject = data['subject']
    use_polygon = data.get('use_polygon', False)
    ora_path = data['ora_path']
    comfy_url = data.get('comfy_url', '127.0.0.1:8188')

    logger.info("[MASK EXTRACT] Subject: %s", subject)
    logger.info("[MASK EXTRACT] Use polygon: %s", use_polygon)
    logger.info("[MASK EXTRACT] ORA path: %s", ora_path)
    logger.info("[MASK EXTRACT] ComfyUI URL: %s", comfy_url)

    # Validate the stored polygon up front so no work is done for a
    # request that cannot succeed.
    polygon_data = {}
    polygon_points = []
    if use_polygon:
        polygon_data = _polygon_storage.get(ora_path, {})
        polygon_points = polygon_data.get('points', [])
        if not polygon_points or len(polygon_points) < 3:
            logger.warning("[MASK EXTRACT] Use polygon requested but no valid polygon points stored")
            return jsonify({'success': False, 'error': 'No valid polygon points. Please draw polygon first.'}), 400

    # Load workflow template
    workflow_path = APP_DIR.parent / "image_mask_extraction.json"
    if not workflow_path.exists():
        logger.error("[MASK EXTRACT] Workflow file not found: %s", workflow_path)
        return jsonify({'success': False, 'error': f'Workflow file not found: {workflow_path}'}), 500

    try:
        with open(workflow_path, encoding='utf-8') as f:
            workflow = json.load(f)
        logger.info("[MASK EXTRACT] Loaded workflow")

        # Load base image from ORA
        try:
            base_img = Image.open(io.BytesIO(_read_merged_png(ora_path))).convert('RGBA')
            logger.info("[MASK EXTRACT] Loaded base image: %s", base_img.size)
        except Exception as e:
            logger.error("[MASK EXTRACT] Error loading base image: %s", e)
            return jsonify({'success': False, 'error': f'Error loading image: {str(e)}'}), 500

        # Apply polygon overlay if requested
        if use_polygon:
            logger.info("[MASK EXTRACT] Applying polygon overlay: %s points", len(polygon_points))
            _draw_polygon(
                base_img,
                polygon_points,
                polygon_data.get('color', '#FF0000'),
                polygon_data.get('width', 2),
            )
            logger.info("[MASK EXTRACT] Drew polygon on image")

        # Encode (possibly modified) image
        img_io = io.BytesIO()
        base_img.save(img_io, format='PNG')
        base64_image = base64.b64encode(img_io.getvalue()).decode('utf-8')
        logger.info("[MASK EXTRACT] Encoded image as base64: %s chars", len(base64_image))

        # Set image input in workflow (node "168" is the template's image input)
        workflow["168"]["inputs"]["image"] = base64_image

        # Update prompt text: only the first node with a text input is changed
        for node in workflow.values():
            if 'inputs' in node and 'text' in node['inputs']:
                node['inputs']['text'] = f"Create a black and white alpha mask of {subject}, leaving everything else black"
                break

        # Queue the workflow on ComfyUI
        headers = {'Content-Type': 'application/json'}
        prompt_id = _queue_comfy_prompt(comfy_url, workflow, headers)
        logger.info("[MASK EXTRACT] Queued prompt: %s", prompt_id)

        # Poll for completion (up to 4 minutes)
        mask_path = _wait_for_mask(comfy_url, prompt_id, headers)
        if mask_path is None:
            # Timeout
            logger.error("[MASK EXTRACT] Timeout waiting for ComfyUI to complete")
            return jsonify({'success': False, 'error': 'Mask extraction timed out'}), 500

        return jsonify({
            'success': True,
            'mask_path': str(mask_path),
            'mask_url': f'/api/file/mask?path={urllib.parse.quote(str(mask_path))}'
        })

    except Exception as e:
        logger.exception("[MASK EXTRACT] Error: %s", e)
        return jsonify({'success': False, 'error': str(e)}), 500


# ---------------------------------------------------------------------------
# Krita integration
# ---------------------------------------------------------------------------

@app.route('/api/krita/open', methods=['POST'])
def api_krita_open():
    """Copy a layer to temp and return file URL for Krita."""
    data = request.get_json(silent=True)
    error = _missing_field_response(data, ['ora_path', 'layer_name'])
    if error:
        return error

    ora_path = data['ora_path']
    layer_name = data['layer_name']

    try:
        # Result unused; kept so a stack that fails to parse still yields
        # the 500 response below.
        parse_stack_xml(ora_path)
        groups = load_ora(ora_path)['groups']

        # First layer with a matching name, in group order.
        layer_info = next(
            (
                layer
                for group in groups
                for layer in group['layers']
                if layer['name'] == layer_name
            ),
            None,
        )
        if layer_info is None:
            return jsonify({'success': False, 'error': f'Layer {layer_name} not found'}), 404

        with zipfile.ZipFile(ora_path, 'r') as zf:
            img = Image.open(zf.open(layer_info['src'])).convert('RGBA')

        # Save to temp directory
        timestamp = str(int(time.time()))
        temp_file = TEMP_DIR / f"{layer_name}_{timestamp}.png"
        img.save(temp_file)

        return jsonify({
            'success': True,
            'file_url': f'file://{temp_file}',
            'temp_path': str(temp_file),
            'base_mtime': os.path.getmtime(temp_file)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/krita/status/<layer_name>')
def api_krita_status(layer_name):
    """Check if a Krita temp file has been modified.

    The optional ``stored_mtime`` query argument is compared with the
    modification time of the newest temp file for the layer; without it the
    file is always reported as modified.
    """
    # Find the temp file for this layer
    temp_files = list(TEMP_DIR.glob(f"{layer_name}_*.png"))

    if not temp_files:
        return jsonify({'success': False, 'error': 'No temp file found'}), 404

    # Get most recent file
    temp_file = max(temp_files, key=lambda f: f.stat().st_mtime)
    mtime = temp_file.stat().st_mtime

    stored_mtime = request.args.get('stored_mtime')
    if stored_mtime:
        try:
            modified = float(stored_mtime) != mtime
        except ValueError:
            return jsonify({'success': False, 'error': 'Invalid stored_mtime'}), 400
    else:
        modified = True

    return jsonify({
        'success': True,
        'modified': modified,
        'mtime': mtime,
        'file_url': f'file://{temp_file}'
    })


@app.route('/api/file/mask')
def api_file_mask():
    """Serve a mask file from temp directory."""
    path = request.args.get('path')

    if not path:
        return jsonify({'error': 'Missing path'}), 400

    # Only files inside TEMP_DIR may be served; the path comes straight
    # from the query string and must not expose arbitrary files.
    full_path = Path(path).resolve()
    if not full_path.is_relative_to(TEMP_DIR.resolve()):
        return jsonify({'error': 'Access denied'}), 403

    if not full_path.is_file():
        return jsonify({'error': 'File not found'}), 404

    return send_file(full_path, mimetype='image/png')


@app.route('/api/image/layer/<layer_name>')
def api_image_layer(layer_name):
    """Serve a specific layer as image."""
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        # Result unused; kept so a stack that fails to parse still yields
        # the 500 response below.
        parse_stack_xml(ora_path)

        with zipfile.ZipFile(ora_path, 'r') as zf:
            stack_root = ET.fromstring(zf.read('stack.xml')).find('stack')
            # A document without a root <stack> has no layers to serve.
            src = _find_layer_src(stack_root, layer_name) if stack_root is not None else None
            if src is None:
                return jsonify({'error': 'Layer not found'}), 404
            img_data = zf.read(src)

        return send_file(io.BytesIO(img_data), mimetype='image/png')
    except Exception as e:
        logger.exception("Error loading layer %s", layer_name)
        return Response(f"Error loading layer: {e}", status=500)


if __name__ == '__main__':
    app.run(debug=True, port=5000, host='127.0.0.1')