Files
email-organizer/app/routes.py
2025-08-06 15:38:49 -07:00

716 lines
31 KiB
Python

from flask import Blueprint, render_template, request, jsonify, make_response, flash, redirect, url_for
from flask_login import login_required, current_user
from app import db
from app.models import Folder, User, ProcessedEmail
from app.imap_service import IMAPService
from app.processed_emails_service import ProcessedEmailsService
import uuid
import logging
main = Blueprint('main', __name__)
@main.route('/')
@login_required
def index():
# Get folders for the current authenticated user
folders = Folder.query.filter_by(user_id=current_user.id).all()
# Separate folders by type
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
return render_template('index.html',
folders=folders,
tidy_folders=tidy_folders,
destination_folders=destination_folders)
@main.route('/api/folders/new', methods=['GET'])
@login_required
def new_folder_modal():
# Return the add folder modal
response = make_response(render_template('partials/folder_modal.html'))
response.headers['HX-Trigger'] = 'open-modal'
return response
@main.route('/api/folders', methods=['POST'])
@login_required
def add_folder():
try:
# Get form data instead of JSON
name = request.form.get('name')
rule_text = request.form.get('rule_text')
priority = request.form.get('priority')
# Server-side validation
errors = {}
if not name or not name.strip():
errors['name'] = 'Folder name is required'
elif len(name.strip()) < 3:
errors['name'] = 'Folder name must be at least 3 characters'
elif len(name.strip()) > 50:
errors['name'] = 'Folder name must be less than 50 characters'
if not rule_text or not rule_text.strip():
errors['rule_text'] = 'Rule text is required'
elif len(rule_text.strip()) < 10:
errors['rule_text'] = 'Rule text must be at least 10 characters'
elif len(rule_text.strip()) > 200:
errors['rule_text'] = 'Rule text must be less than 200 characters'
# If there are validation errors, return the modal with errors
if errors:
response = make_response(render_template('partials/folder_modal.html', errors=errors, name=name, rule_text=rule_text, priority=priority))
response.headers['HX-Retarget'] = '#folder-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
# Create new folder for the current user
# Default to 'destination' type for manually created folders
folder_type = 'tidy' if name.strip().lower() == 'inbox' else 'destination'
folder = Folder(
user_id=current_user.id,
name=name.strip(),
rule_text=rule_text.strip(),
priority=int(priority) if priority else 0,
folder_type=folder_type
)
db.session.add(folder)
db.session.commit()
# Get updated list of folders for the current user
folders = Folder.query.filter_by(user_id=current_user.id).all()
# Get updated lists of folders by type
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
response = make_response(tidy_section + destination_section)
response.headers['HX-Trigger'] = 'close-modal'
response.status_code = 201
return response
except Exception as e:
# Print unhandled exceptions to the console as required
logging.exception("Error adding folder: %s", e)
db.session.rollback()
# Return error in modal
errors = {'general': 'An unexpected error occurred. Please try again.'}
# Get updated lists of folders by type for error fallback
folders = Folder.query.filter_by(user_id=current_user.id).all()
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections as fallback
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
response = make_response(render_template('partials/folder_modal.html', errors=errors, name=name, rule_text=rule_text, priority=priority))
response.headers['HX-Retarget'] = '#folder-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
@main.route('/api/folders/<folder_id>', methods=['DELETE'])
@login_required
def delete_folder(folder_id):
try:
# Find the folder by ID and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
# Folder not found
folders = Folder.query.filter_by(user_id=current_user.id).all()
return render_template('partials/folders_list.html', folders=folders)
# Delete all associated processed emails first
ProcessedEmail.query.filter_by(folder_id=folder.id).delete()
# Delete the folder
db.session.delete(folder)
db.session.commit()
# Get updated list of folders for the current user
folders = Folder.query.filter_by(user_id=current_user.id).all()
# Get updated lists of folders by type
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
return tidy_section + destination_section
except Exception as e:
# Print unhandled exceptions to the console as required
logging.exception("Error deleting folder: %s", e)
db.session.rollback()
# Return the folders list unchanged
folders = Folder.query.filter_by(user_id=current_user.id).all()
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
return tidy_section + destination_section
@main.route('/api/folders/<folder_id>/toggle', methods=['PUT'])
@login_required
def toggle_folder_organize(folder_id):
try:
# Find the folder by ID and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
return jsonify({'error': 'Folder not found'}), 404
# Toggle the organize_enabled flag
folder.organize_enabled = not folder.organize_enabled
db.session.commit()
# Return just the updated folder card HTML for this specific folder
# Use the appropriate template based on folder type
if folder.folder_type == 'tidy':
return render_template('partials/folder_card_tidy.html', folder=folder)
else:
return render_template('partials/folder_card_destination.html', folder=folder)
except Exception as e:
# Print unhandled exceptions to the console as required
logging.exception("Error toggling folder organize flag: %s", e)
db.session.rollback()
return jsonify({'error': 'An unexpected error occurred'}), 500
@main.route('/api/folders/<folder_id>/edit', methods=['GET'])
@login_required
def edit_folder_modal(folder_id):
try:
# Find the folder by ID and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
return jsonify({'error': 'Folder not found'}), 404
# Return the edit folder modal with folder data
response = make_response(render_template('partials/folder_modal.html', folder=folder))
response.headers['HX-Trigger'] = 'open-modal'
return response
except Exception as e:
# Print unhandled exceptions to the console as required
logging.exception("Error getting folder for edit: %s", e)
return jsonify({'error': 'Error retrieving folder'}), 500
@main.route('/api/folders/<folder_id>', methods=['PUT'])
@login_required
def update_folder(folder_id):
try:
# Find the folder by ID and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
# Folder not found
folders = Folder.query.filter_by(user_id=current_user.id).all()
return render_template('partials/folders_list.html', folders=folders)
# Get form data
name = request.form.get('name')
rule_text = request.form.get('rule_text')
priority = request.form.get('priority')
# Server-side validation
errors = {}
if not name or not name.strip():
errors['name'] = 'Folder name is required'
elif len(name.strip()) < 3:
errors['name'] = 'Folder name must be at least 3 characters'
elif len(name.strip()) > 50:
errors['name'] = 'Folder name must be less than 50 characters'
if not rule_text or not rule_text.strip():
errors['rule_text'] = 'Rule text is required'
elif len(rule_text.strip()) < 10:
errors['rule_text'] = 'Rule text must be at least 10 characters'
elif len(rule_text.strip()) > 200:
errors['rule_text'] = 'Rule text must be less than 200 characters'
# If there are validation errors, return the modal with errors
if errors:
# Get updated lists of folders by type for error fallback
folders = Folder.query.filter_by(user_id=current_user.id).all()
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections as fallback
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
response = make_response(render_template('partials/folder_modal.html', folder=folder, errors=errors, name=name, rule_text=rule_text, priority=priority))
response.headers['HX-Retarget'] = '#folder-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
# Update folder
folder.name = name.strip()
folder.rule_text = rule_text.strip()
folder.priority = int(priority) if priority else 0
# Update folder type if the name changed to/from 'inbox'
if name.strip().lower() == 'inbox' and folder.folder_type != 'tidy':
folder.folder_type = 'tidy'
elif name.strip().lower() != 'inbox' and folder.folder_type != 'destination':
folder.folder_type = 'destination'
db.session.commit()
# Get updated list of folders for the current user
folders = Folder.query.filter_by(user_id=current_user.id).all()
# Get updated lists of folders by type
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
response = make_response(tidy_section + destination_section)
response.headers['HX-Trigger'] = 'close-modal'
return response
except Exception as e:
# Print unhandled exceptions to the console as required
logging.exception("Error updating folder: %s", e)
db.session.rollback()
# Return error in modal
errors = {'general': 'An unexpected error occurred. Please try again.'}
response = make_response(render_template('partials/folder_modal.html', folder=folder, errors=errors, name=name, rule_text=rule_text, priority=priority))
response.headers['HX-Retarget'] = '#folder-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
@main.route('/api/imap/config', methods=['GET'])
@login_required
def imap_config_modal():
"""Return the IMAP configuration modal."""
# Pass existing IMAP config to the template if it exists
response = make_response(render_template('partials/imap_config_modal.html',
server=current_user.imap_config.get('server') if current_user.imap_config else None,
port=current_user.imap_config.get('port') if current_user.imap_config else None,
username=current_user.imap_config.get('username') if current_user.imap_config else None,
password=current_user.imap_config.get('password') if current_user.imap_config else None,
use_ssl=current_user.imap_config.get('use_ssl', True) if current_user.imap_config else True))
response.headers['HX-Trigger'] = 'open-modal'
return response
@main.route('/api/imap/test', methods=['POST'])
@login_required
def test_imap_connection():
"""Test IMAP connection with provided configuration."""
try:
# Get form data
server = request.form.get('server')
port = request.form.get('port')
username = request.form.get('username')
password = request.form.get('password')
use_ssl = request.form.get('use_ssl') == 'on'
# Validate required fields
errors = {}
if not server:
errors['server'] = 'Server is required'
if not port:
errors['port'] = 'Port is required'
elif not port.isdigit():
errors['port'] = 'Port must be a number'
if not username:
errors['username'] = 'Username is required'
if not password:
errors['password'] = 'Password is required'
if errors:
response = make_response(render_template('partials/imap_config_modal.html', errors=errors, server=server, port=port, username=username, use_ssl=use_ssl))
response.headers['HX-Retarget'] = '#imap-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
# Store configuration temporarily for testing
test_config = {
'server': server,
'port': int(port),
'username': username,
'password': password,
'use_ssl': False,
'use_tls': False,
'connection_timeout': 30
}
# Test connection
temp_user = type('User', (), {'imap_config': test_config})()
imap_service = IMAPService(temp_user)
print(temp_user, test_config)
success, message = imap_service.test_connection()
if success:
# Save configuration to user's profile
current_user.imap_config = test_config
db.session.commit()
response = make_response(render_template('partials/imap_config_modal.html',
success=True, message=message))
response.headers['HX-Retarget'] = '#imap-modal'
response.headers['HX-Reswap'] = 'outerHTML'
else:
print(message)
response = make_response(render_template('partials/imap_config_modal.html',
errors={'general': message}, server=server, port=port, username=username, use_ssl=use_ssl))
response.headers['HX-Retarget'] = '#imap-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
except Exception as e:
logging.exception("Error testing IMAP connection: %s", e)
print(e)
errors = {'general': 'An unexpected error occurred. Please try again.'}
response = make_response(render_template('partials/imap_config_modal.html', errors=errors, server=server, port=port, username=username, use_ssl=use_ssl))
response.headers['HX-Retarget'] = '#imap-modal'
response.headers['HX-Reswap'] = 'outerHTML'
return response
@main.route('/api/imap/sync', methods=['POST'])
@login_required
def sync_imap_folders():
"""Sync folders from IMAP server with processed email tracking."""
try:
if not current_user.imap_config:
return jsonify({'error': 'No IMAP configuration found. Please configure IMAP first.'}), 400
# Test connection first
imap_service = IMAPService(current_user)
# Get folders from IMAP server
imap_folders = imap_service.get_folders()
if not imap_folders:
return jsonify({'error': 'No folders found on IMAP server'}), 400
# Deduplicate folders by name to prevent creating multiple entries for the same folder
unique_folders = []
seen_names = set()
for imap_folder in imap_folders:
folder_name = imap_folder['name']
# Skip special folders that might not be needed
if folder_name.lower() in ['sent', 'drafts', 'spam', 'trash']:
continue
# Use case-insensitive comparison for deduplication
folder_name_lower = folder_name.lower()
if folder_name_lower not in seen_names:
unique_folders.append(imap_folder)
seen_names.add(folder_name_lower)
# Process each unique folder
synced_count = 0
print("HELLOOO")
processed_emails_service = ProcessedEmailsService(current_user)
for imap_folder in unique_folders:
folder_name = imap_folder['name'].strip()
# Handle nested folder names (convert slashes to underscores or keep as-is)
# According to requirements, nested folders should be created with slashes in the name
display_name = folder_name
# Check if folder already exists
existing_folder = Folder.query.filter_by(
user_id=current_user.id,
name=display_name
).first()
if not existing_folder:
# Create new folder
# Determine folder type - inbox should be 'tidy', others 'destination'
folder_type = 'tidy' if folder_name.lower().strip() == 'inbox' else 'destination'
new_folder = Folder(
user_id=current_user.id,
name=display_name,
rule_text=f"Auto-synced from IMAP folder: {folder_name}",
priority=0, # Default priority
folder_type=folder_type
)
db.session.add(new_folder)
synced_count += 1
else:
# Update existing folder with email counts and recent emails
# Get all email UIDs in this folder
email_uids = imap_service.get_folder_email_uids(folder_name)
# Sync with processed emails service
new_emails_count = processed_emails_service.sync_folder_emails(display_name, email_uids)
print("NEW", new_emails_count)
# Update counts
pending_count = processed_emails_service.get_pending_count(display_name)
existing_folder.pending_count = pending_count
existing_folder.total_count = len(email_uids)
# Get the most recent emails for this folder
recent_emails = imap_service.get_recent_emails(folder_name, 3)
existing_folder.recent_emails = recent_emails
db.session.commit()
# Get updated list of folders
folders = Folder.query.filter_by(user_id=current_user.id).all()
tidy_folders = [folder for folder in folders if folder.folder_type == 'tidy']
destination_folders = [folder for folder in folders if folder.folder_type == 'destination']
# Return both sections
tidy_section = render_template('partials/folders_to_tidy_section.html', tidy_folders=tidy_folders)
destination_section = render_template('partials/destination_folders_section.html', destination_folders=destination_folders)
return tidy_section + destination_section
except Exception as e:
logging.exception("Error syncing IMAP folders: %s", e)
print(e)
db.session.rollback()
return jsonify({'error': 'An unexpected error occurred'}), 500
@main.route('/api/folders', methods=['GET'])
@login_required
def get_folders():
"""Get folders with optional filtering."""
# Get filter parameter from query string
filter_type = request.args.get('filter', 'all')
folder_type = request.args.get('type', None)
# Get folders for the current authenticated user
if folder_type:
# Filter by folder type
folders = Folder.query.filter_by(user_id=current_user.id, folder_type=folder_type).all()
elif filter_type == 'high':
# Get high priority folders (priority = 1)
folders = Folder.query.filter_by(user_id=current_user.id, priority=1).all()
elif filter_type == 'normal':
# Get normal priority folders (priority = 0 or not set)
folders = Folder.query.filter_by(user_id=current_user.id).filter(Folder.priority != 1).all()
else:
# Get all folders
folders = Folder.query.filter_by(user_id=current_user.id).all()
# Check if we need to return a specific section
if folder_type == 'tidy':
return render_template('partials/folders_to_tidy_section.html', tidy_folders=folders)
elif folder_type == 'destination':
return render_template('partials/destination_folders_section.html', destination_folders=folders)
else:
return render_template('partials/folders_list.html', folders=folders)
# Processed Emails API Endpoints
@main.route('/api/folders/<int:folder_id>/pending-emails', methods=['GET'])
@login_required
def get_pending_emails(folder_id):
"""Get pending emails for a folder with email metadata."""
try:
# Find the folder and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
return jsonify({'error': 'Folder not found'}), 404
# Get IMAP service to fetch email data
imap_service = IMAPService(current_user)
# Get all email UIDs in the folder
all_uids = imap_service.get_folder_email_uids(folder.name)
# Get processed emails service
processed_emails_service = ProcessedEmailsService(current_user)
# Get pending email UIDs
pending_uids = processed_emails_service.get_pending_emails(folder.name)
# Get email headers for pending emails
pending_emails = []
for uid in pending_uids:
headers = imap_service.get_email_headers(folder.name, uid)
if headers:
# Add UID to the response
email_data = {
'uid': uid,
'subject': headers.get('subject', 'No Subject'),
'date': headers.get('date', ''),
'from': headers.get('from', ''),
'to': headers.get('to', ''),
'message_id': headers.get('message_id', '')
}
pending_emails.append(email_data)
# Return the pending emails in a dialog format
response = make_response(render_template('partials/pending_emails_dialog.html',
folder=folder,
pending_emails=pending_emails))
response.headers['HX-Trigger'] = 'open-modal'
return response
except Exception as e:
logging.exception("Error getting pending emails: %s", e)
return jsonify({'error': 'An unexpected error occurred'}), 500
@main.route('/api/folders/<int:folder_id>/emails/<email_uid>/process', methods=['POST'])
@login_required
def mark_email_processed(folder_id, email_uid):
"""Mark a specific email as processed."""
try:
# Find the folder and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
return jsonify({'error': 'Folder not found'}), 404
# Get processed emails service
processed_emails_service = ProcessedEmailsService(current_user)
# Mark email as processed
success = processed_emails_service.mark_email_processed(folder.name, email_uid)
if success:
# Get updated counts
pending_count = processed_emails_service.get_pending_count(folder.name)
# Update the folder's count based on folder type
if folder.folder_type == 'tidy':
folder.pending_count = pending_count
elif folder.folder_type == 'destination':
# For destination folders, update emails_count
folder.emails_count = pending_count
db.session.commit()
# Return updated dialog body
response = make_response(render_template('partials/pending_emails_updated.html',
folder=folder,
uid=email_uid))
response.headers['HX-Trigger'] = 'close-modal'
return response
else:
return jsonify({'error': 'Failed to mark email as processed'}), 500
except Exception as e:
logging.exception("Error marking email as processed: %s", e)
return jsonify({'error': 'An unexpected error occurred'}), 500
@main.route('/api/folders/<int:folder_id>/sync-emails', methods=['POST'])
@login_required
def sync_folder_emails(folder_id):
"""Sync emails for a specific folder with processed email tracking."""
try:
# Find the folder and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
return jsonify({'error': 'Folder not found'}), 404
# Get IMAP service to fetch email UIDs
imap_service = IMAPService(current_user)
# Get all email UIDs in the folder
email_uids = imap_service.get_folder_email_uids(folder.name)
if not email_uids:
return jsonify({'error': 'No emails found in folder'}), 404
# Get processed emails service
processed_emails_service = ProcessedEmailsService(current_user)
# Sync emails with the processed emails service
new_emails_count = processed_emails_service.sync_folder_emails(folder.name, email_uids)
# Get updated counts
pending_count = processed_emails_service.get_pending_count(folder.name)
# Update the folder's counts based on folder type
if folder.folder_type == 'tidy':
folder.pending_count = pending_count
folder.total_count = len(email_uids)
elif folder.folder_type == 'destination':
# For destination folders, update emails_count
folder.emails_count = pending_count # Using pending_count as emails_count for destination folders
db.session.commit()
# Return success response with updated counts
return jsonify({
'success': True,
'message': f'Synced {new_emails_count} new emails',
'pending_count': pending_count,
'total_count': len(email_uids)
})
except Exception as e:
logging.exception("Error syncing folder emails: %s", e)
return jsonify({'error': 'An unexpected error occurred'}), 500
@main.route('/api/folders/<int:folder_id>/process-emails', methods=['POST'])
@login_required
def process_folder_emails(folder_id):
"""Process multiple emails in a folder (mark as processed)."""
try:
# Find the folder and ensure it belongs to the current user
folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()
if not folder:
return jsonify({'error': 'Folder not found'}), 404
# Get email UIDs from form data
email_uids = request.form.getlist('email_uids')
if not email_uids:
return jsonify({'error': 'No email UIDs provided'}), 400
# Get processed emails service
processed_emails_service = ProcessedEmailsService(current_user)
# Mark emails as processed
processed_count = processed_emails_service.mark_emails_processed(folder.name, email_uids)
if processed_count > 0:
# Get updated counts
pending_count = processed_emails_service.get_pending_count(folder.name)
# Update the folder's count based on folder type
if folder.folder_type == 'tidy':
folder.pending_count = pending_count
elif folder.folder_type == 'destination':
# For destination folders, update emails_count
folder.emails_count = pending_count
db.session.commit()
return jsonify({
'success': True,
'message': f'Processed {processed_count} emails',
'pending_count': pending_count
})
else:
return jsonify({'error': 'No emails were processed'}), 400
except Exception as e:
logging.exception("Error processing folder emails: %s", e)
return jsonify({'error': 'An unexpected error occurred'}), 500