#!/usr/bin/env python3
"""Flask web application for ORA editing."""

import glob
import io
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
from xml.etree import ElementTree as ET

try:
    from flask import (
        Flask, request, jsonify, send_file, Response, render_template,
        make_response
    )
except ImportError:
    print("Error: Flask is required. Install with: pip install -r requirements.txt")
    sys.exit(1)

from PIL import Image, ImageDraw

# Configure paths
APP_DIR = Path(__file__).parent
# The project root can be overridden through the environment; the default is
# the historical hard-coded location.
PROJECT_ROOT = Path(
    os.environ.get("ORA_EDITOR_PROJECT_ROOT", "/home/noti/dev/ai-game-2")
)
TEMP_DIR = APP_DIR / "temp"
TEMP_DIR.mkdir(exist_ok=True)

# Import ora operations from sibling directory
sys.path.insert(0, str(APP_DIR.parent))
from ora_editor.ora_ops import (
    load_ora, create_ora_from_png, add_masked_layer, rename_layer,
    delete_layer, reorder_layer, get_layer_visibility, set_layer_visibility,
    save_ora, parse_stack_xml
)

# In-memory storage for polygon overlays, keyed by ORA path. Entries live for
# the lifetime of the process (until cleared through /api/polygon/clear).
_polygon_storage = {}

app = Flask(__name__, template_folder='templates')


def _missing_field_response(data, required):
    """Validate that *data* is a JSON object containing every required key.

    Returns a ``(response, 400)`` tuple naming the first missing field, or
    ``None`` when all fields are present. A body that is not a JSON object
    (``None``, list, scalar) is reported as missing the first required field
    instead of crashing with ``TypeError``.
    """
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': f'Missing {required[0]} parameter'
        }), 400
    for field in required:
        if field not in data:
            return jsonify({
                'success': False,
                'error': f'Missing {field} parameter'
            }), 400
    return None


def _layer_summaries(loaded):
    """Build the JSON-serialisable layer list from a ``load_ora`` result."""
    return [
        {
            'name': layer['name'],
            'src': layer['src'],
            'group': layer.get('group'),
            'visible': layer.get('visible', True),
        }
        for layer in loaded['layers']
    ]


def _send_merged_image(ora_path):
    """Send ``mergedimage.png`` from the ORA archive as a PNG response."""
    with zipfile.ZipFile(ora_path, 'r') as zf:
        img_data = zf.read('mergedimage.png')
    return send_file(io.BytesIO(img_data), mimetype='image/png')


def _fetch_url(url, headers, timeout=30):
    """GET *url* and return the raw response body as bytes."""
    req = urllib.request.Request(url, headers=headers, method='GET')
    with urllib.request.urlopen(req, timeout=timeout) as response:
        return response.read()


def ensure_ora_exists(input_path: str) -> str | None:
    """Ensure an ORA file exists, creating from PNG if necessary.

    *input_path* is joined onto ``PROJECT_ROOT``. Returns the ORA path as a
    string, or ``None`` when the file does not exist, the PNG conversion
    reports failure, or the ORA's stack XML cannot be parsed.
    """
    full_path = PROJECT_ROOT / input_path

    if not full_path.exists():
        return None

    # If it's a PNG, create ORA
    if full_path.suffix.lower() == '.png':
        ora_path = str(full_path.with_suffix('.ora'))
        result = create_ora_from_png(str(full_path), ora_path)
        if not result.get('success'):
            return None
        return ora_path

    # It's an ORA, verify it's valid
    try:
        parse_stack_xml(str(full_path))
        return str(full_path)
    except Exception:
        return None


@app.route('/')
def index():
    """Serve the main editor UI."""
    return render_template('editor.html')


@app.route('/api/open', methods=['POST'])
def api_open():
    """Open a file (PNG or ORA) and return its dimensions and layers."""
    data = request.get_json()
    error = _missing_field_response(data, ['path'])
    if error:
        return error

    input_path = data['path']
    ora_path = ensure_ora_exists(input_path)

    if not ora_path:
        return jsonify({
            'success': False,
            'error': f'File not found or invalid: {input_path}'
        }), 404

    try:
        loaded = load_ora(ora_path)
        return jsonify({
            'success': True,
            'ora_path': ora_path,
            'width': loaded['width'],
            'height': loaded['height'],
            'layers': _layer_summaries(loaded)
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Error loading ORA: {str(e)}'
        }), 500


@app.route('/api/save', methods=['POST'])
def api_save():
    """Save the current ORA state."""
    data = request.get_json()
    error = _missing_field_response(data, ['ora_path'])
    if error:
        return error

    ora_path = data['ora_path']

    # Save by re-reading and re-writing (preserves current state)
    result = save_ora(ora_path)

    if result:
        return jsonify({'success': True, 'ora_path': ora_path})
    return jsonify({'success': False, 'error': 'Failed to save ORA'}), 500


@app.route('/api/layers', methods=['GET'])
def api_layers():
    """Get the current layer structure."""
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({
            'success': False,
            'error': 'Missing ora_path parameter'
        }), 400

    try:
        loaded = load_ora(ora_path)
        return jsonify({'success': True, 'layers': _layer_summaries(loaded)})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/layer/add', methods=['POST'])
def api_layer_add():
    """Add a new masked layer.

    ``source_layer`` is optional and defaults to ``'base'``.
    """
    data = request.get_json()
    error = _missing_field_response(
        data, ['ora_path', 'entity_name', 'mask_path']
    )
    if error:
        return error

    result = add_masked_layer(
        data['ora_path'],
        data['entity_name'],
        data['mask_path'],
        data.get('source_layer', 'base'),
    )
    return jsonify(result)


@app.route('/api/layer/rename', methods=['POST'])
def api_layer_rename():
    """Rename a layer."""
    data = request.get_json()
    error = _missing_field_response(data, ['ora_path', 'old_name', 'new_name'])
    if error:
        return error

    result = rename_layer(data['ora_path'], data['old_name'], data['new_name'])
    return jsonify(result)


@app.route('/api/layer/delete', methods=['POST'])
def api_layer_delete():
    """Delete a layer."""
    data = request.get_json()
    error = _missing_field_response(data, ['ora_path', 'layer_name'])
    if error:
        return error

    result = delete_layer(data['ora_path'], data['layer_name'])
    return jsonify(result)


@app.route('/api/layer/reorder', methods=['POST'])
def api_layer_reorder():
    """Reorder a layer."""
    data = request.get_json()
    error = _missing_field_response(
        data, ['ora_path', 'layer_name', 'direction']
    )
    if error:
        return error

    result = reorder_layer(
        data['ora_path'], data['layer_name'], data['direction']
    )
    return jsonify(result)


@app.route('/api/layer/visibility', methods=['POST'])
def api_layer_visibility():
    """Set layer visibility."""
    data = request.get_json()
    error = _missing_field_response(
        data, ['ora_path', 'layer_name', 'visible']
    )
    if error:
        return error

    result = set_layer_visibility(
        data['ora_path'], data['layer_name'], data['visible']
    )
    return jsonify(result)


# The URL variable is required: the view takes ``layer_name`` and Flask raises
# TypeError on every request if the rule does not supply it.
@app.route('/api/image/<layer_name>')
def api_image(layer_name):
    """Serve an image for the given layer.

    NOTE: this endpoint currently returns ``mergedimage.png`` regardless of
    *layer_name*; individual layers are served by ``/api/image/layer/``.
    """
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        # Parsed only for its side effect of raising on an unreadable archive
        # (same use as in ensure_ora_exists).
        parse_stack_xml(ora_path)
        return _send_merged_image(ora_path)
    except Exception as e:
        return Response(f"Error loading image: {e}", status=500)


@app.route('/api/image/base')
def api_image_base():
    """Serve the base/merged image."""
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        return _send_merged_image(ora_path)
    except Exception as e:
        return Response(f"Error loading image: {e}", status=500)


@app.route('/api/image/polygon')
def api_image_polygon():
    """Serve the merged image with the stored polygon outline drawn on it.

    Responds 404 when no polygon is stored for ``ora_path`` or when fewer
    than three points are stored.
    """
    ora_path = request.args.get('ora_path')

    if not ora_path or ora_path not in _polygon_storage:
        return Response("No polygon data", status=404)

    try:
        with zipfile.ZipFile(ora_path, 'r') as zf:
            base_img = Image.open(zf.open('mergedimage.png')).convert('RGBA')

        stored = _polygon_storage[ora_path]
        points = stored.get('points', [])
        color = stored.get('color', '#FF0000')
        width = stored.get('width', 2)

        if len(points) < 3:
            return Response("Not enough points", status=404)

        # Points are stored as fractions of the image size; scale to pixels.
        w, h = base_img.size
        pixel_points = [(int(p['x'] * w), int(p['y'] * h)) for p in points]

        draw = ImageDraw.Draw(base_img)
        # Pillow parses #rgb, #rgba, #rrggbb and #rrggbbaa itself, so the
        # colour is passed through unchanged. Appending 'FF' to anything that
        # is not 7 characters long produced invalid colour strings.
        draw.polygon(pixel_points, outline=color, width=width)

        img_io = io.BytesIO()
        base_img.save(img_io, format='PNG')
        img_io.seek(0)

        return send_file(img_io, mimetype='image/png')
    except Exception as e:
        return Response(f"Error drawing polygon: {e}", status=500)


@app.route('/api/polygon', methods=['POST'])
def api_polygon():
    """Store polygon points for overlay.

    ``color`` defaults to ``'#FF0000'`` and ``width`` to ``2``.
    """
    data = request.get_json()
    error = _missing_field_response(data, ['ora_path', 'points'])
    if error:
        return error

    _polygon_storage[data['ora_path']] = {
        'points': data['points'],
        'color': data.get('color', '#FF0000'),
        'width': data.get('width', 2)
    }

    # Percent-encode the path so spaces, '&', '#', etc. survive in the query.
    quoted_path = urllib.parse.quote(str(data['ora_path']))
    return jsonify({
        'success': True,
        'overlay_url': f'/api/image/polygon?ora_path={quoted_path}'
    })


@app.route('/api/polygon/clear', methods=['POST'])
def api_polygon_clear():
    """Clear stored polygon points.

    Always reports success, including when the body is empty or no polygon
    was stored for the given path.
    """
    data = request.get_json(silent=True)
    ora_path = data.get('ora_path') if isinstance(data, dict) else None

    if isinstance(ora_path, str):
        _polygon_storage.pop(ora_path, None)

    return jsonify({'success': True})


@app.route('/api/mask/extract', methods=['POST'])
def api_mask_extract():
    """Extract a mask for ``subject`` by running a ComfyUI workflow.

    Queues ``image_mask_extraction.json`` on the ComfyUI server given by
    ``comfy_url`` (default ``127.0.0.1:8188``), polls its history for up to
    four minutes, downloads the first output image and stores it in
    ``TEMP_DIR``. The request may also carry ``use_polygon`` and ``ora_path``;
    they are currently not used by this endpoint.
    """
    data = request.get_json()
    error = _missing_field_response(data, ['subject'])
    if error:
        return error

    subject = data['subject']
    comfy_url = data.get('comfy_url', '127.0.0.1:8188')

    # Load workflow template
    workflow_path = APP_DIR.parent / "image_mask_extraction.json"
    if not workflow_path.exists():
        return jsonify({
            'success': False,
            'error': f'Workflow file not found: {workflow_path}'
        }), 500

    with open(workflow_path) as f:
        workflow = json.load(f)

    # Update prompt text in the first node that has a text input
    for node in workflow.values():
        if 'inputs' in node and 'text' in node['inputs']:
            node['inputs']['text'] = f"Create a black and white alpha mask of {subject}, leaving everything else black"
            break

    # Queue the prompt
    client_id = str(uuid.uuid4())
    headers = {'Content-Type': 'application/json'}

    try:
        req_data = json.dumps(
            {'prompt': workflow, 'client_id': client_id}
        ).encode()
        req = urllib.request.Request(
            f'http://{comfy_url}/prompt',
            data=req_data,
            headers=headers,
            method='POST'
        )
        with urllib.request.urlopen(req, timeout=30) as response:
            result = json.loads(response.read().decode())
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to connect to ComfyUI: {str(e)}'
        }), 500

    # ComfyUI keys its history by the prompt_id it returns from /prompt, not
    # by the client_id we sent; polling with the client_id never matches.
    prompt_id = result.get('prompt_id') if isinstance(result, dict) else None
    if not prompt_id:
        return jsonify({
            'success': False,
            'error': 'ComfyUI did not return a prompt_id'
        }), 500

    # Poll for completion (up to 4 minutes)
    deadline = time.time() + 240

    while time.time() < deadline:
        try:
            history = json.loads(
                _fetch_url(
                    f'http://{comfy_url}/history/{prompt_id}', headers
                ).decode()
            )

            if prompt_id in history:
                outputs = history[prompt_id].get('outputs', {})
                for node_output in outputs.values():
                    for img_info in node_output.get('images', []):
                        # Encode the query so filenames with spaces or '&'
                        # are transmitted intact.
                        query = urllib.parse.urlencode({
                            'filename': img_info['filename'],
                            'subfolder': img_info.get('subfolder', ''),
                            'type': 'output',
                        })
                        img_data = _fetch_url(
                            f'http://{comfy_url}/view?{query}', headers
                        )

                        # Save to temp directory
                        timestamp = str(int(time.time()))
                        mask_path = TEMP_DIR / f"mask_{timestamp}.png"
                        with open(mask_path, 'wb') as f:
                            f.write(img_data)

                        quoted_path = urllib.parse.quote(str(mask_path))
                        return jsonify({
                            'success': True,
                            'mask_path': str(mask_path),
                            'mask_url': f'/api/file/mask?path={quoted_path}'
                        })

                # The prompt is in the history but yielded no images; further
                # polling cannot change that, so fail now instead of waiting
                # for the timeout.
                return jsonify({
                    'success': False,
                    'error': 'ComfyUI finished without producing a mask image'
                }), 500
        except urllib.error.HTTPError as e:
            # 404 is treated as "not ready yet"; anything else is fatal.
            if e.code != 404:
                return jsonify({
                    'success': False,
                    'error': f'ComfyUI request failed: HTTP {e.code}'
                }), 500
        except (OSError, ValueError, KeyError) as e:
            # URLError/timeouts (OSError), malformed JSON (ValueError) or an
            # unexpected history layout (KeyError).
            return jsonify({
                'success': False,
                'error': f'Error while waiting for ComfyUI: {e}'
            }), 500

        time.sleep(2)

    return jsonify({
        'success': False,
        'error': 'Mask extraction timed out'
    }), 500


@app.route('/api/krita/open', methods=['POST'])
def api_krita_open():
    """Copy a layer to temp and return file URL for Krita."""
    data = request.get_json()
    error = _missing_field_response(data, ['ora_path', 'layer_name'])
    if error:
        return error

    ora_path = data['ora_path']
    layer_name = data['layer_name']

    try:
        # Parsed only so that an unreadable archive raises before extraction.
        parse_stack_xml(ora_path)
        groups = load_ora(ora_path)['groups']

        layer_info = next(
            (
                layer
                for group in groups
                for layer in group['layers']
                if layer['name'] == layer_name
            ),
            None,
        )

        if layer_info is None:
            return jsonify({
                'success': False,
                'error': f'Layer {layer_name} not found'
            }), 404

        with zipfile.ZipFile(ora_path, 'r') as zf:
            img = Image.open(zf.open(layer_info['src'])).convert('RGBA')

        # Save to temp directory
        timestamp = str(int(time.time()))
        temp_file = TEMP_DIR / f"{layer_name}_{timestamp}.png"
        img.save(temp_file)

        return jsonify({
            'success': True,
            'file_url': f'file://{temp_file}',
            'temp_path': str(temp_file),
            'base_mtime': os.path.getmtime(temp_file)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/krita/status/<layer_name>')
def api_krita_status(layer_name):
    """Check if a Krita temp file has been modified.

    Compares the newest temp file's mtime with the ``stored_mtime`` query
    parameter; without that parameter the file is reported as modified.
    """
    # Match only "<layer_name>_<digits>.png": escape glob metacharacters in
    # the name and reject files that belong to a longer layer name sharing
    # this prefix (e.g. "a_b_123.png" when looking for layer "a").
    prefix = f"{layer_name}_"
    temp_files = [
        f for f in TEMP_DIR.glob(f"{glob.escape(prefix)}*.png")
        if f.stem[len(prefix):].isdigit()
    ]

    if not temp_files:
        return jsonify({
            'success': False,
            'error': 'No temp file found'
        }), 404

    # Get most recent file
    temp_file = max(temp_files, key=lambda f: f.stat().st_mtime)
    mtime = temp_file.stat().st_mtime

    stored_mtime = request.args.get('stored_mtime')
    if stored_mtime:
        try:
            modified = float(stored_mtime) != mtime
        except ValueError:
            return jsonify({
                'success': False,
                'error': 'Invalid stored_mtime'
            }), 400
    else:
        modified = True

    return jsonify({
        'success': True,
        'modified': modified,
        'mtime': mtime,
        'file_url': f'file://{temp_file}'
    })


@app.route('/api/file/mask')
def api_file_mask():
    """Serve a mask file from temp directory.

    Only files located inside ``TEMP_DIR`` are served; any other path is
    rejected so the endpoint cannot be used to read arbitrary files.
    """
    path = request.args.get('path')

    if not path:
        return jsonify({'error': 'Missing path'}), 400

    full_path = Path(path).resolve()
    if not full_path.is_relative_to(TEMP_DIR.resolve()):
        return jsonify({'error': 'Access denied'}), 403

    if not full_path.is_file():
        return jsonify({'error': 'File not found'}), 404

    return send_file(full_path, mimetype='image/png')


@app.route('/api/image/layer/<layer_name>')
def api_image_layer(layer_name):
    """Serve a specific layer as image.

    Looks up the first ``<layer>`` element named *layer_name* in
    ``stack.xml`` (at any nesting depth) and returns the file its ``src``
    attribute points to.
    """
    ora_path = request.args.get('ora_path')

    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        # Parsed only so that an unreadable archive raises before lookup.
        parse_stack_xml(ora_path)

        with zipfile.ZipFile(ora_path, 'r') as zf:
            image_root = ET.fromstring(zf.read('stack.xml'))

            # iter() walks in document order, so for one or two levels of
            # nesting the first match is the same one the explicit nested
            # loops would find; deeper groups are covered as well.
            for layer in image_root.iter('layer'):
                if layer.get('name') == layer_name:
                    img_data = zf.read(layer.get('src'))
                    return send_file(
                        io.BytesIO(img_data), mimetype='image/png'
                    )

        return jsonify({'error': 'Layer not found'}), 404
    except Exception as e:
        app.logger.exception("Error loading layer %r", layer_name)
        return Response(f"Error loading layer: {e}", status=500)


if __name__ == '__main__':
    # debug=True enables the interactive debugger; keep the server bound to
    # localhost when running this way.
    app.run(debug=True, port=5000, host='127.0.0.1')