Files
ai-game-2/tools/ora_editor/app.py
Bryce c79b0b4add Add Flask application with API endpoints
- Routes for file operations, layer management, polygon drawing
- Mask extraction endpoint with ComfyUI integration
- Krita integration endpoints
- Basic API tests
2026-03-27 08:49:27 -07:00

575 lines
18 KiB
Python

#!/usr/bin/env python3
"""Flask web application for ORA editing."""
import io
import os
import sys
import time
import zipfile
from datetime import datetime
from pathlib import Path
from xml.etree import ElementTree as ET
try:
from flask import (
Flask, request, jsonify, send_file, Response, render_template,
make_response
)
except ImportError:
print("Error: Flask is required. Install with: pip install -r requirements.txt")
sys.exit(1)
from PIL import Image, ImageDraw
# Configure paths
APP_DIR = Path(__file__).parent
# Root that /api/open joins request paths onto. The default is the original
# hard-coded checkout location; set ORA_EDITOR_PROJECT_ROOT to point the
# editor at a checkout that lives somewhere else.
PROJECT_ROOT = Path(
    os.environ.get("ORA_EDITOR_PROJECT_ROOT", "/home/noti/dev/ai-game-2")
)
# Scratch directory for downloaded masks and layers exported for Krita.
TEMP_DIR = APP_DIR / "temp"
TEMP_DIR.mkdir(exist_ok=True)

# Put the parent directory on sys.path so `ora_editor.ora_ops` is importable
# when this file is run directly as a script.
sys.path.insert(0, str(APP_DIR.parent))
from ora_editor.ora_ops import (
    load_ora, create_ora_from_png, add_masked_layer, rename_layer,
    delete_layer, reorder_layer, get_layer_visibility, set_layer_visibility,
    save_ora, parse_stack_xml
)

# In-memory polygon overlays keyed by ORA path. This is module-level state:
# entries persist across requests until /api/polygon/clear removes them or
# the process exits.
_polygon_storage = {}

app = Flask(__name__, template_folder='templates')
def ensure_ora_exists(input_path: str) -> str | None:
    """Return the path of a usable ORA file for ``input_path``, or ``None``.

    ``input_path`` is joined onto ``PROJECT_ROOT``. A ``.png`` is converted
    to a sibling ``.ora`` through ``create_ora_from_png``; any other file is
    treated as an ORA and checked with ``parse_stack_xml``.

    Args:
        input_path: Path taken from the request, relative to ``PROJECT_ROOT``
            (an absolute path is accepted only if it lies inside the root).

    Returns:
        The ORA path as a string, or ``None`` when the path escapes
        ``PROJECT_ROOT``, does not exist, the PNG conversion reports failure,
        or the ORA cannot be parsed.
    """
    full_path = PROJECT_ROOT / input_path

    # The path comes straight from the request body. Joining an absolute path
    # or one containing ".." onto PROJECT_ROOT silently leaves the root, so
    # compare the resolved locations before touching the file system further.
    try:
        inside_root = full_path.resolve().is_relative_to(PROJECT_ROOT.resolve())
    except (OSError, RuntimeError, ValueError):
        inside_root = False
    if not inside_root or not full_path.exists():
        return None

    # If it's a PNG, create ORA
    if full_path.suffix.lower() == '.png':
        ora_path = str(full_path.with_suffix('.ora'))
        result = create_ora_from_png(str(full_path), ora_path)
        if not result.get('success'):
            return None
        return ora_path

    # It's an ORA, verify it's valid. Any parsing failure is reported to the
    # caller as "not usable" rather than raised.
    try:
        parse_stack_xml(str(full_path))
    except Exception:
        return None
    return str(full_path)
@app.route('/')
def index():
    """Render the editor page (the ``editor.html`` template)."""
    page = render_template('editor.html')
    return page
@app.route('/api/open', methods=['POST'])
def api_open():
    """Open a PNG or ORA file and describe its layers.

    Expects a JSON body with a ``path`` key. Responds 400 when it is absent,
    404 when ``ensure_ora_exists`` cannot produce an ORA, 500 when loading
    fails, and otherwise returns the ORA path, canvas size and layer list.
    """
    payload = request.get_json()
    if not payload or 'path' not in payload:
        return jsonify({'success': False, 'error': 'Missing path parameter'}), 400

    requested = payload['path']
    resolved = ensure_ora_exists(requested)
    if not resolved:
        return jsonify({'success': False, 'error': f'File not found or invalid: {requested}'}), 404

    try:
        document = load_ora(resolved)

        # Expose only the fields the client needs; layers without an explicit
        # visibility flag are reported as visible.
        layer_rows = [
            {
                'name': entry['name'],
                'src': entry['src'],
                'group': entry.get('group'),
                'visible': entry.get('visible', True),
            }
            for entry in document['layers']
        ]

        return jsonify({
            'success': True,
            'ora_path': resolved,
            'width': document['width'],
            'height': document['height'],
            'layers': layer_rows,
        })
    except Exception as exc:
        return jsonify({'success': False, 'error': f'Error loading ORA: {str(exc)}'}), 500
@app.route('/api/save', methods=['POST'])
def api_save():
    """Persist an ORA file through ``save_ora``.

    Expects a JSON body with ``ora_path``. Responds 400 when it is missing
    and 500 when ``save_ora`` returns a falsy result.
    """
    payload = request.get_json()
    if not payload or 'ora_path' not in payload:
        return jsonify({'success': False, 'error': 'Missing ora_path parameter'}), 400

    target = payload['ora_path']

    # save_ora is handed only the path; its truthiness decides the response.
    if save_ora(target):
        return jsonify({'success': True, 'ora_path': target})
    return jsonify({'success': False, 'error': 'Failed to save ORA'}), 500
@app.route('/api/layers', methods=['GET'])
def api_layers():
    """List the layers of the ORA named by the ``ora_path`` query parameter.

    Responds 400 when the parameter is missing and 500 (with the exception
    text) when the ORA cannot be loaded.
    """
    target = request.args.get('ora_path')
    if not target:
        return jsonify({'success': False, 'error': 'Missing ora_path parameter'}), 400

    try:
        document = load_ora(target)
        # Layers without an explicit visibility flag are reported as visible.
        summary = [
            {
                'name': entry['name'],
                'src': entry['src'],
                'group': entry.get('group'),
                'visible': entry.get('visible', True),
            }
            for entry in document['layers']
        ]
        return jsonify({'success': True, 'layers': summary})
    except Exception as exc:
        return jsonify({'success': False, 'error': str(exc)}), 500
@app.route('/api/layer/add', methods=['POST'])
def api_layer_add():
    """Add a new masked layer to an ORA.

    Expects a JSON object with ``ora_path``, ``entity_name`` and
    ``mask_path``; ``source_layer`` is optional and defaults to ``'base'``.
    Responds 400 naming the first missing field, otherwise returns whatever
    ``add_masked_layer`` reports.
    """
    data = request.get_json()

    for field in ('ora_path', 'entity_name', 'mask_path'):
        # A body that is valid JSON but not an object (``null``, a list, ...)
        # cannot carry any field; report it as a missing parameter instead of
        # failing with a TypeError.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    result = add_masked_layer(
        data['ora_path'],
        data['entity_name'],
        data['mask_path'],
        data.get('source_layer', 'base'),
    )
    return jsonify(result)
@app.route('/api/layer/rename', methods=['POST'])
def api_layer_rename():
    """Rename a layer.

    Expects a JSON object with ``ora_path``, ``old_name`` and ``new_name``.
    Responds 400 naming the first missing field, otherwise returns whatever
    ``rename_layer`` reports.
    """
    data = request.get_json()

    for field in ('ora_path', 'old_name', 'new_name'):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    result = rename_layer(data['ora_path'], data['old_name'], data['new_name'])
    return jsonify(result)
@app.route('/api/layer/delete', methods=['POST'])
def api_layer_delete():
    """Delete a layer.

    Expects a JSON object with ``ora_path`` and ``layer_name``. Responds 400
    naming the first missing field, otherwise returns whatever
    ``delete_layer`` reports.
    """
    data = request.get_json()

    for field in ('ora_path', 'layer_name'):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    result = delete_layer(data['ora_path'], data['layer_name'])
    return jsonify(result)
@app.route('/api/layer/reorder', methods=['POST'])
def api_layer_reorder():
    """Move a layer within the stack.

    Expects a JSON object with ``ora_path``, ``layer_name`` and
    ``direction``; the direction value is passed to ``reorder_layer``
    unchanged. Responds 400 naming the first missing field.
    """
    data = request.get_json()

    for field in ('ora_path', 'layer_name', 'direction'):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    result = reorder_layer(data['ora_path'], data['layer_name'], data['direction'])
    return jsonify(result)
@app.route('/api/layer/visibility', methods=['POST'])
def api_layer_visibility():
    """Set layer visibility.

    Expects a JSON object with ``ora_path``, ``layer_name`` and ``visible``;
    the ``visible`` value is passed to ``set_layer_visibility`` unchanged.
    Responds 400 naming the first missing field.
    """
    data = request.get_json()

    for field in ('ora_path', 'layer_name', 'visible'):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    result = set_layer_visibility(data['ora_path'], data['layer_name'], data['visible'])
    return jsonify(result)
@app.route('/api/image/<layer_name>')
def api_image(layer_name):
    """Serve ``mergedimage.png`` from the ORA named by ``ora_path``.

    NOTE: ``layer_name`` is accepted from the URL but never consulted; the
    merged image is returned for every value.
    """
    target = request.args.get('ora_path')
    if not target:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        # Return value unused; the call is kept because any exception it
        # raises is reported as a 500 below.
        parse_stack_xml(target)

        with zipfile.ZipFile(target, 'r') as archive:
            merged_png = archive.read('mergedimage.png')
            return send_file(io.BytesIO(merged_png), mimetype='image/png')
    except Exception as exc:
        return Response(f"Error loading image: {exc}", status=500)
@app.route('/api/image/base')
def api_image_base():
    """Serve ``mergedimage.png`` from the ORA named by ``ora_path``.

    Responds 400 when the query parameter is missing and 500 when the archive
    or the merged image cannot be read.
    """
    target = request.args.get('ora_path')
    if not target:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        with zipfile.ZipFile(target, 'r') as archive:
            merged_png = archive.read('mergedimage.png')
            return send_file(io.BytesIO(merged_png), mimetype='image/png')
    except Exception as exc:
        return Response(f"Error loading image: {exc}", status=500)
@app.route('/api/image/polygon')
def api_image_polygon():
    """Serve the merged image with the stored polygon outlined on top.

    Looks up the polygon stored for the ``ora_path`` query parameter.
    Responds 404 when nothing is stored or fewer than three points are
    available, and 500 when the image cannot be read or drawn.
    """
    ora_path = request.args.get('ora_path')
    overlay = _polygon_storage.get(ora_path) if ora_path else None
    if overlay is None:
        return Response("No polygon data", status=404)

    try:
        with zipfile.ZipFile(ora_path, 'r') as zf:
            base_img = Image.open(zf.open('mergedimage.png')).convert('RGBA')

        points = overlay.get('points', [])
        color = overlay.get('color', '#FF0000')
        width = overlay.get('width', 2)

        if len(points) < 3:
            return Response("Not enough points", status=404)

        # Points are stored as fractions of the image size (they are
        # multiplied by width/height here), not as pixel coordinates.
        w, h = base_img.size
        pixel_points = [(int(p['x'] * w), int(p['y'] * h)) for p in points]

        draw = ImageDraw.Draw(base_img)

        # Pass the colour through untouched. Pillow already understands
        # '#rgb', '#rrggbb', '#rrggbbaa' and colour names, and treats a colour
        # without alpha as opaque on an RGBA image. Appending 'FF' to anything
        # that is not exactly 7 characters long produced invalid strings such
        # as '#F00FF' or '#FF0000FFFF'.
        draw.polygon(pixel_points, outline=color, width=width)

        img_io = io.BytesIO()
        base_img.save(img_io, format='PNG')
        img_io.seek(0)

        return send_file(img_io, mimetype='image/png')
    except Exception as e:
        return Response(f"Error drawing polygon: {e}", status=500)
@app.route('/api/polygon', methods=['POST'])
def api_polygon():
    """Store polygon points for the overlay image of an ORA.

    Expects a JSON object with ``ora_path`` and ``points``; ``color``
    (default ``'#FF0000'``) and ``width`` (default ``2``) are optional.
    The entry replaces any polygon previously stored for the same path.
    Returns the URL that renders the overlay.
    """
    from urllib.parse import quote

    data = request.get_json()

    for field in ('ora_path', 'points'):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    ora_path = data['ora_path']
    _polygon_storage[ora_path] = {
        'points': data['points'],
        'color': data.get('color', '#FF0000'),
        'width': data.get('width', 2)
    }

    # Percent-encode the path so spaces, '&', '#', '?' and similar characters
    # survive as part of the query value. '/' is left alone, so ordinary
    # paths yield the same URL as before.
    encoded_path = quote(str(ora_path), safe='/')
    return jsonify({
        'success': True,
        'overlay_url': f'/api/image/polygon?ora_path={encoded_path}'
    })
@app.route('/api/polygon/clear', methods=['POST'])
def api_polygon_clear():
    """Clear the polygon stored for an ORA path, if any.

    The body is optional. When present, its ``ora_path`` names the entry to
    drop. The response reports success whether or not an entry existed.
    """
    # An empty body is allowed; only parse JSON when something was sent.
    payload = request.json if request.data else None

    # A JSON body that is not an object (``null``, a list, ...) has no
    # ``ora_path``; treat it like an empty body instead of raising.
    ora_path = payload.get('ora_path') if isinstance(payload, dict) else None

    if ora_path and ora_path in _polygon_storage:
        del _polygon_storage[ora_path]

    return jsonify({'success': True})
@app.route('/api/mask/extract', methods=['POST'])
def api_mask_extract():
    """Extract a mask for ``subject`` by running a ComfyUI workflow.

    Expects a JSON object with ``subject``; ``comfy_url`` (``host:port``,
    default ``127.0.0.1:8188``) is optional. Other fields are ignored.

    The workflow template ``image_mask_extraction.json`` is loaded from the
    parent of the app directory, the first node that has a ``text`` input
    receives the prompt, and the workflow is queued on ComfyUI. The history
    endpoint is then polled every 2 seconds for up to 4 minutes; the first
    output image found is downloaded into ``TEMP_DIR``.

    Returns JSON with ``mask_path`` and ``mask_url`` on success. Responds 400
    for a missing ``subject`` and 500 when the workflow file is missing,
    ComfyUI cannot be reached, or no image arrives before the timeout.
    """
    import json
    import urllib.error
    import urllib.parse
    import urllib.request
    import uuid as uuid_lib

    data = request.get_json()

    for field in ('subject',):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    subject = data['subject']
    # NOTE(review): comfy_url is caller-supplied and used verbatim as the
    # target host of server-side requests. Acceptable for a local tool, but
    # it should be restricted before this is exposed beyond localhost.
    comfy_url = data.get('comfy_url', '127.0.0.1:8188')

    # Load workflow template
    workflow_path = APP_DIR.parent / "image_mask_extraction.json"
    if not workflow_path.exists():
        return jsonify({'success': False, 'error': f'Workflow file not found: {workflow_path}'}), 500

    with open(workflow_path, encoding='utf-8') as f:
        workflow = json.load(f)

    # Only the first node exposing a 'text' input receives the prompt.
    for node in workflow.values():
        if 'inputs' in node and 'text' in node['inputs']:
            node['inputs']['text'] = f"Create a black and white alpha mask of {subject}, leaving everything else black"
            break

    # Queue the prompt
    client_id = str(uuid_lib.uuid4())
    headers = {'Content-Type': 'application/json'}

    try:
        req_data = json.dumps({'prompt': workflow, 'client_id': client_id}).encode()
        req = urllib.request.Request(
            f'http://{comfy_url}/prompt',
            data=req_data,
            headers=headers,
            method='POST'
        )
        with urllib.request.urlopen(req, timeout=30) as response:
            result = json.loads(response.read().decode())
    except Exception as e:
        return jsonify({'success': False, 'error': f'Failed to connect to ComfyUI: {str(e)}'}), 500

    # History is keyed by the prompt id the server hands back when the prompt
    # is queued, not by the client id sent with it. Polling with the client
    # id never matches, so every extraction would time out. Fall back to the
    # client id only if the response carries no prompt id.
    prompt_id = result.get('prompt_id') if isinstance(result, dict) else None
    if not prompt_id:
        prompt_id = client_id

    # Poll for completion (up to 4 minutes)
    deadline = time.time() + 240

    while time.time() < deadline:
        try:
            req = urllib.request.Request(
                f'http://{comfy_url}/history/{prompt_id}',
                headers=headers,
                method='GET'
            )
            with urllib.request.urlopen(req, timeout=30) as response:
                history = json.loads(response.read().decode())

            if prompt_id in history:
                outputs = history[prompt_id]['outputs']

                for node_output in outputs.values():
                    for img_info in node_output.get('images', []):
                        # Encode the query so file names or subfolders with
                        # spaces or '&' do not corrupt the URL.
                        query = urllib.parse.urlencode({
                            'filename': img_info['filename'],
                            'subfolder': img_info.get('subfolder', ''),
                            'type': 'output',
                        })
                        download_req = urllib.request.Request(
                            f'http://{comfy_url}/view?{query}',
                            headers=headers,
                            method='GET'
                        )
                        with urllib.request.urlopen(download_req, timeout=30) as img_response:
                            img_data = img_response.read()

                        # Save to temp directory
                        timestamp = str(int(time.time()))
                        mask_path = TEMP_DIR / f"mask_{timestamp}.png"
                        with open(mask_path, 'wb') as f:
                            f.write(img_data)

                        encoded_path = urllib.parse.quote(str(mask_path), safe='/')
                        return jsonify({
                            'success': True,
                            'mask_path': str(mask_path),
                            'mask_url': f'/api/file/mask?path={encoded_path}'
                        })
        except urllib.error.HTTPError as e:
            # 404 is treated as "not ready yet"; any other HTTP error stops
            # polling and falls through to the timeout response.
            if e.code != 404:
                break

        time.sleep(2)

    return jsonify({'success': False, 'error': 'Mask extraction timed out'}), 500
@app.route('/api/krita/open', methods=['POST'])
def api_krita_open():
    """Export one layer to ``TEMP_DIR`` as PNG so it can be opened in Krita.

    Expects a JSON object with ``ora_path`` and ``layer_name``. The layer is
    looked up by name in the groups returned by ``load_ora``, converted to
    RGBA and written to ``<layer_name>_<unix time>.png``.

    Returns JSON with ``file_url`` (a ``file://`` URL), ``temp_path`` and
    ``base_mtime`` (the modification time of the new file). Responds 400 for
    missing fields or a layer name that cannot be used as a file name, 404
    when the layer is not found, and 500 on any other failure.
    """
    data = request.get_json()

    for field in ('ora_path', 'layer_name'):
        # A non-object JSON body (``null``, a list, ...) is treated as missing
        # every field rather than crashing on the membership test.
        if not isinstance(data, dict) or field not in data:
            return jsonify({'success': False, 'error': f'Missing {field} parameter'}), 400

    ora_path = data['ora_path']
    layer_name = data['layer_name']

    try:
        # Return value unused; the call is kept because any exception it
        # raises is reported as a 500 below.
        parse_stack_xml(ora_path)

        groups = load_ora(ora_path)['groups']
        layer_info = next(
            (
                layer
                for group in groups
                for layer in group['layers']
                if layer['name'] == layer_name
            ),
            None,
        )
        if layer_info is None:
            return jsonify({'success': False, 'error': f'Layer {layer_name} not found'}), 404

        # The layer name becomes part of a file name inside TEMP_DIR. A name
        # containing a path separator would be written somewhere else (or
        # fail on a missing directory), so refuse it explicitly.
        if Path(layer_name).name != layer_name:
            return jsonify({
                'success': False,
                'error': f'Layer name cannot be used as a file name: {layer_name}'
            }), 400

        with zipfile.ZipFile(ora_path, 'r') as zf:
            img = Image.open(zf.open(layer_info['src'])).convert('RGBA')

        # Save to temp directory
        timestamp = str(int(time.time()))
        temp_file = TEMP_DIR / f"{layer_name}_{timestamp}.png"
        img.save(temp_file)

        return jsonify({
            'success': True,
            'file_url': f'file://{temp_file}',
            'temp_path': str(temp_file),
            'base_mtime': os.path.getmtime(temp_file)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/krita/status/<layer_name>')
def api_krita_status(layer_name):
    """Report whether the newest temp export of a layer has been modified.

    Finds files named ``<layer_name>_*.png`` in ``TEMP_DIR``, picks the one
    with the latest modification time and compares that time with the
    optional ``stored_mtime`` query parameter. Without ``stored_mtime`` the
    file is always reported as modified.

    Responds 404 when no temp file exists and 400 when ``stored_mtime`` is
    not a number.
    """
    import glob

    # Escape glob metacharacters so a layer called e.g. "tree [1]" or "a*b"
    # matches its own export instead of being read as a pattern.
    pattern = f"{glob.escape(layer_name)}_*.png"
    temp_files = list(TEMP_DIR.glob(pattern))

    if not temp_files:
        return jsonify({'success': False, 'error': 'No temp file found'}), 404

    # Get most recent file
    temp_file = max(temp_files, key=lambda f: f.stat().st_mtime)
    mtime = temp_file.stat().st_mtime

    stored_mtime = request.args.get('stored_mtime')
    if stored_mtime:
        try:
            previous_mtime = float(stored_mtime)
        except ValueError:
            return jsonify({'success': False, 'error': 'Invalid stored_mtime'}), 400
        modified = previous_mtime != mtime
    else:
        modified = True

    return jsonify({
        'success': True,
        'modified': modified,
        'mtime': mtime,
        'file_url': f'file://{temp_file}'
    })
@app.route('/api/file/mask')
def api_file_mask():
    """Serve a mask file from the temp directory.

    The ``path`` query parameter must point at a file inside ``TEMP_DIR``.
    Responds 400 when it is missing and 404 when the path lies outside the
    temp directory or is not an existing file.
    """
    path = request.args.get('path')
    if not path:
        return jsonify({'error': 'Missing path'}), 400

    # The path is caller-supplied. Without this check the endpoint would
    # return any file the server process can read. Resolve first so ".."
    # segments and symlinks cannot step outside TEMP_DIR.
    try:
        full_path = Path(path).resolve()
        inside_temp = full_path.is_relative_to(TEMP_DIR.resolve())
    except (OSError, RuntimeError, ValueError):
        inside_temp = False

    if not inside_temp or not full_path.is_file():
        return jsonify({'error': 'File not found'}), 404

    return send_file(full_path, mimetype='image/png')
@app.route('/api/image/layer/<layer_name>')
def api_image_layer(layer_name):
    """Serve the image of the layer called ``layer_name``.

    Reads ``stack.xml`` from the ORA named by the ``ora_path`` query
    parameter, finds the first ``<layer>`` (in document order, at any group
    depth) whose ``name`` matches, and returns the file its ``src`` refers to.

    Responds 400 when ``ora_path`` is missing, 404 when no layer matches, and
    500 when the archive or the layer image cannot be read.
    """
    ora_path = request.args.get('ora_path')
    if not ora_path:
        return jsonify({'error': 'Missing ora_path'}), 400

    try:
        # Return value unused; the call is kept because any exception it
        # raises is reported as a 500 below.
        parse_stack_xml(ora_path)

        with zipfile.ZipFile(ora_path, 'r') as zf:
            image_root = ET.fromstring(zf.read('stack.xml'))
            root_stack = image_root.find('stack')

            # iter() walks nested <stack> groups at any depth, so layers
            # inside groups of groups are found too. A document without a
            # root <stack> simply has no layers.
            layers = root_stack.iter('layer') if root_stack is not None else ()
            for layer in layers:
                if layer.get('name') == layer_name:
                    img_data = zf.read(layer.get('src'))
                    return send_file(io.BytesIO(img_data), mimetype='image/png')

        return jsonify({'error': 'Layer not found'}), 404
    except Exception as e:
        import traceback
        traceback.print_exc()
        return Response(f"Error loading layer: {e}", status=500)
if __name__ == '__main__':
    # Script entry point: run Flask's built-in server with the debugger on,
    # bound to the loopback interface only.
    run_options = {'debug': True, 'port': 5000, 'host': '127.0.0.1'}
    app.run(**run_options)