merged
This commit is contained in:
@@ -25,6 +25,13 @@ login_manager.login_message = 'Please log in to access this page.'
|
||||
login_manager.login_message_category = 'warning'
|
||||
|
||||
|
||||
# Import scheduler (import here to avoid circular imports)
# If app/scheduler.py is missing or fails to import, Scheduler is set to None
# so create_app() can skip scheduler setup instead of crashing at import time.
# NOTE(review): this also silently disables scheduling if scheduler.py itself
# has a broken transitive import -- confirm that is the intended behavior.
try:
    from app.scheduler import Scheduler
except ImportError:
    Scheduler = None
||||
def create_app(config_name='default'):
|
||||
app = Flask(__name__, static_folder='static', static_url_path='/static')
|
||||
app.config.from_object(config[config_name])
|
||||
@@ -43,6 +50,11 @@ def create_app(config_name='default'):
|
||||
migrate = Migrate(app, db)
|
||||
login_manager.init_app(app)
|
||||
|
||||
# Initialize and register scheduler if available
|
||||
if Scheduler:
|
||||
scheduler = Scheduler(app)
|
||||
app.scheduler = scheduler
|
||||
|
||||
# Register blueprints
|
||||
from app.routes import main
|
||||
from app.auth import auth
|
||||
|
||||
@@ -32,3 +32,33 @@ def setup_dev():
|
||||
print("Docker Compose not found. Please install Docker and Docker Compose.")
|
||||
sys.exit(1)
|
||||
|
||||
import time  # used by the scheduler CLI loop below (hoisted from a trailing "add this import" note)


@app.cli.command("start-scheduler")
@click.option('--interval', default=5, help='Interval in minutes between processing runs (default: 5)')
@with_appcontext
def start_scheduler(interval):
    """Start the background email processing scheduler.

    Blocks the CLI process, keeping the main thread alive while the scheduler
    runs in its daemon thread, until interrupted with Ctrl+C.

    Args:
        interval: Minutes between processing runs (applied to the scheduler).
    """
    if not hasattr(app, 'scheduler'):
        print("Scheduler not available. Make sure app/scheduler.py exists and is properly imported.")
        sys.exit(1)

    # FIX: previously --interval was accepted but never used; the scheduler
    # stores its interval in seconds (see Scheduler.__init__), so apply it
    # before starting.
    app.scheduler.interval = interval * 60

    print(f"Starting email processing scheduler with {interval} minute interval...")
    print("Press Ctrl+C to stop")

    try:
        # Start the scheduler's background thread.
        app.scheduler.start()

        # Keep the main thread alive; the worker is a daemon thread and would
        # die with the process otherwise.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutting down scheduler...")
            app.scheduler.stop()

    except Exception as e:
        print(f"Error starting scheduler: {e}")
        sys.exit(1)
|
||||
317
app/email_processor.py
Normal file
317
app/email_processor.py
Normal file
@@ -0,0 +1,317 @@
|
||||
from typing import List, Dict, Optional
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from app.models import db, Folder, User, ProcessedEmail
|
||||
from app.imap_service import IMAPService
|
||||
from app.processed_emails_service import ProcessedEmailsService
|
||||
|
||||
|
||||
class EmailProcessor:
    """
    Service class for processing emails in the background according to user-defined rules.
    Handles automated organization of emails based on folder configurations.

    Project-type annotations ("User", "Folder") are string forward references
    so the class can be defined without those names resolved at import time.
    """

    def __init__(self, user: "User"):
        """Create a processor bound to one user and their backing services.

        Args:
            user: The user whose folders and IMAP account will be processed.
        """
        self.user = user
        self.imap_service = IMAPService(user)
        self.processed_emails_service = ProcessedEmailsService(user)
        self.logger = logging.getLogger(__name__)

    def process_user_emails(self) -> Dict[str, Any]:
        """
        Process emails for a user according to their folder rules.

        Returns:
            Dictionary with processing results: 'success_count',
            'error_count', 'processed_folders' (per-folder summaries), and
            'start_time'/'end_time'/'duration' timing fields.
        """
        # FIX: the return annotation used the builtin `any` instead of
        # `typing.Any` -- `any` is a function, not a type.
        result = {
            'success_count': 0,
            'error_count': 0,
            'processed_folders': [],
            # NOTE(review): utcnow() is naive and deprecated on 3.12+;
            # switching to datetime.now(timezone.utc) would make these
            # timestamps tz-aware -- confirm with consumers before migrating.
            'start_time': datetime.utcnow(),
            'end_time': None,
            'duration': None
        }

        try:
            # Folders flagged for organizing, highest priority first.
            folders = Folder.query.filter_by(
                user_id=self.user.id,
                organize_enabled=True
            ).order_by(Folder.priority.desc()).all()

            if not folders:
                self.logger.info(f"No folders to process for user {self.user.email}")
                # end_time/duration are filled in by the finally block below
                # (the original duplicated that bookkeeping here).
                return result

            self.logger.info(f"Processing {len(folders)} folders for user {self.user.email}")

            # Process each folder; one failing folder must not abort the rest.
            for folder in folders:
                try:
                    folder_result = self.process_folder_emails(folder)
                    result['success_count'] += folder_result['processed_count']
                    result['error_count'] += folder_result['error_count']
                    result['processed_folders'].append({
                        'folder_id': folder.id,
                        'folder_name': folder.name,
                        'processed_count': folder_result['processed_count'],
                        'error_count': folder_result['error_count']
                    })

                    self.logger.info(f"Processed {folder_result['processed_count']} emails for folder {folder.name}")

                except Exception as e:
                    self.logger.error(f"Error processing folder {folder.name}: {str(e)}")
                    result['error_count'] += 1
                    continue

        except Exception as e:
            self.logger.error(f"Error in process_user_emails for user {self.user.email}: {str(e)}")
            result['error_count'] += 1

        finally:
            # Runs on every exit path, including the early return above.
            result['end_time'] = datetime.utcnow()
            result['duration'] = (result['end_time'] - result['start_time']).total_seconds()

        return result

    def process_folder_emails(self, folder: "Folder") -> Dict[str, Any]:
        """
        Process emails for a specific folder according to its rules.

        Args:
            folder: The folder to process

        Returns:
            Dictionary with 'processed_count', 'error_count', 'folder_id',
            and 'folder_name' for this folder.
        """
        result = {
            'processed_count': 0,
            'error_count': 0,
            'folder_id': folder.id,
            'folder_name': folder.name
        }

        try:
            # Pending (not-yet-processed) email UIDs for this folder.
            pending_email_uids = self.processed_emails_service.get_pending_emails(folder.name)

            if not pending_email_uids:
                self.logger.info(f"No pending emails to process for folder {folder.name}")
                return result

            self.logger.info(f"Processing {len(pending_email_uids)} pending emails for folder {folder.name}")

            # Process emails in batches to manage system resources.
            batch_size = 10  # Configurable batch size
            processed_batch_count = 0

            for i in range(0, len(pending_email_uids), batch_size):
                batch = pending_email_uids[i:i + batch_size]
                batch_result = self._process_email_batch(folder, batch)

                result['processed_count'] += batch_result['processed_count']
                result['error_count'] += batch_result['error_count']
                processed_batch_count += 1

                self.logger.info(f"Processed batch {i//batch_size + 1} for folder {folder.name}: {batch_result['processed_count']} success, {batch_result['error_count']} errors")

            # Refresh pending/total counts now that emails were moved.
            self._update_folder_counts(folder)

        except Exception as e:
            self.logger.error(f"Error in process_folder_emails for folder {folder.name}: {str(e)}")
            result['error_count'] += 1

        return result

    def _process_email_batch(self, folder: "Folder", email_uids: List[str]) -> Dict[str, Any]:
        """
        Process a batch of emails for a folder.

        Opens an IMAP session, evaluates each email against the folder's rule
        text, moves matching emails, and records processed UIDs.

        Args:
            folder: The folder whose rules to apply
            email_uids: List of email UIDs to process

        Returns:
            Dictionary with 'processed_count' and 'error_count' for this batch.
        """
        result = {
            'processed_count': 0,
            'error_count': 0
        }

        try:
            # NOTE(review): relies on the private IMAPService._connect();
            # consider exposing a public connect/login helper on IMAPService.
            self.imap_service._connect()

            # Login with the user's stored IMAP credentials.
            username = self.user.imap_config.get('username', '')
            password = self.user.imap_config.get('password', '')
            self.imap_service.connection.login(username, password)

            # Select the source folder.
            resp_code, content = self.imap_service.connection.select(folder.name)
            if resp_code != 'OK':
                raise Exception(f"Failed to select folder {folder.name}: {content}")

            # Process each email in the batch; per-email failures are counted
            # but do not abort the batch.
            processed_uids = []
            for email_uid in email_uids:
                try:
                    # Get email headers to evaluate rules against.
                    headers = self.imap_service.get_email_headers(folder.name, email_uid)

                    if not headers:
                        self.logger.warning(f"Could not get headers for email {email_uid} in folder {folder.name}")
                        result['error_count'] += 1
                        continue

                    # Evaluate rules to determine destination.
                    destination_folder = self._evaluate_rules(headers, folder.rule_text)

                    if destination_folder and destination_folder != folder.name:
                        if self._move_email(email_uid, folder.name, destination_folder):
                            processed_uids.append(email_uid)
                            result['processed_count'] += 1
                        else:
                            self.logger.error(f"Failed to move email {email_uid} from {folder.name} to {destination_folder}")
                            result['error_count'] += 1
                    else:
                        # No rule matched (or destination is the same folder):
                        # mark as processed without moving.
                        processed_uids.append(email_uid)
                        result['processed_count'] += 1

                except Exception as e:
                    self.logger.error(f"Error processing email {email_uid}: {str(e)}")
                    result['error_count'] += 1
                    continue

            # Record processed emails in the database.
            if processed_uids:
                count = self.processed_emails_service.mark_emails_processed(folder.name, processed_uids)
                if count != len(processed_uids):
                    self.logger.warning(f"Marked {count} emails as processed, but expected {len(processed_uids)}")

        except Exception as e:
            self.logger.error(f"Error in _process_email_batch: {str(e)}")
            result['error_count'] += 1

        finally:
            # FIX: connection teardown now runs on every exit path (it was
            # duplicated on the success and error paths, with a bare except
            # on the error path). Close errors are deliberately best-effort.
            if self.imap_service.connection:
                try:
                    self.imap_service.connection.close()
                    self.imap_service.connection.logout()
                except Exception:
                    pass
                self.imap_service.connection = None

        return result

    def _evaluate_rules(self, headers: Dict[str, str], rule_text: Optional[str]) -> Optional[str]:
        """
        Evaluate rules against email headers to determine destination folder.

        Args:
            headers: Email headers (subject, from, to, date)
            rule_text: Rule text defining processing criteria

        Returns:
            Destination folder name or None if no move is needed
        """
        # Simplified implementation: only the literal 'move to <folder>'
        # form is recognized (headers are currently unused); a proper rule
        # parser should replace this in production.
        if not rule_text:
            return None

        rule_lower = rule_text.lower()
        if 'move to' in rule_lower:
            try:
                # Index just past the (case-insensitive) 'move to' marker.
                start_idx = rule_lower.find('move to') + len('move to')
                # Destination is everything after 'move to', trimmed of
                # whitespace and trailing punctuation.
                destination = rule_text[start_idx:].strip()
                destination = destination.rstrip('.!,;:')
                return destination.strip()
            except Exception:  # was a bare except; keep best-effort behavior
                pass

        # Default: no move needed.
        return None

    def _move_email(self, email_uid: str, source_folder: str, destination_folder: str) -> bool:
        """
        Move an email from source folder to destination folder.

        Implemented as IMAP copy + mark \\Deleted + expunge, since imaplib
        has no atomic move.

        Args:
            email_uid: The UID of the email to move
            source_folder: The current folder name
            destination_folder: The destination folder name

        Returns:
            True if successful, False otherwise
        """
        # NOTE(review): copy()/store() are the message-sequence-number
        # variants; if email_uid is an IMAP UID these should go through
        # uid('COPY'...)/uid('STORE'...) -- confirm against IMAPService.
        try:
            # Copy email to destination folder.
            result = self.imap_service.connection.copy(email_uid, destination_folder)
            if result[0] != 'OK':
                self.logger.error(f"Copy failed for email {email_uid}: {result[1]}")
                return False

            # Re-select the source folder, then flag the original as deleted.
            self.imap_service.connection.select(source_folder)
            # FIX: raw string for the IMAP flag; '\Deleted' only worked
            # because \D is not an escape, and warns on Python 3.12+.
            result = self.imap_service.connection.store(email_uid, '+FLAGS', r'\Deleted')
            if result[0] != 'OK':
                self.logger.error(f"Mark as deleted failed for email {email_uid}: {result[1]}")
                return False

            # Expunge to permanently remove. NOTE(review): EXPUNGE removes
            # every \Deleted message in the folder, not just this one.
            result = self.imap_service.connection.expunge()
            if result[0] != 'OK':
                self.logger.error(f"Expunge failed for email {email_uid}: {result[1]}")
                return False

            return True

        except Exception as e:
            self.logger.error(f"Error moving email {email_uid} from {source_folder} to {destination_folder}: {str(e)}")
            return False

    def _update_folder_counts(self, folder: "Folder") -> None:
        """
        Update folder counts after processing.

        Args:
            folder: The folder to update
        """
        try:
            # Pending count comes from the processed-emails bookkeeping.
            pending_count = self.processed_emails_service.get_pending_count(folder.name)
            folder.pending_count = pending_count

            # Total count comes from the IMAP server.
            total_count = self.imap_service.get_folder_email_count(folder.name)
            folder.total_count = total_count

            db.session.commit()

        except Exception as e:
            self.logger.error(f"Error updating folder counts for {folder.name}: {str(e)}")
            db.session.rollback()
|
||||
@@ -7,6 +7,7 @@ from app.models import Folder
|
||||
from .routes.folders import folders_bp
|
||||
from .routes.imap import imap_bp
|
||||
from .routes.emails import emails_bp
|
||||
from .routes.background_processing import bp as background_processing_bp
|
||||
|
||||
# Create the main blueprint
|
||||
main = Blueprint('main', __name__)
|
||||
@@ -15,6 +16,7 @@ main = Blueprint('main', __name__)
|
||||
main.register_blueprint(folders_bp)
|
||||
main.register_blueprint(imap_bp)
|
||||
main.register_blueprint(emails_bp)
|
||||
main.register_blueprint(background_processing_bp)
|
||||
|
||||
# Root route that redirects to the main index page
|
||||
@main.route('/')
|
||||
|
||||
59
app/routes/background_processing.py
Normal file
59
app/routes/background_processing.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from flask import Blueprint, jsonify, request
from flask_login import login_required, current_user
from app.email_processor import EmailProcessor

# SECURITY FIX: a module-level `decorators = [login_required]` list has no
# effect on a Flask Blueprint (that attribute belongs to class-based views),
# so both endpoints below were previously reachable without authentication.
# @login_required is now applied to each view explicitly.

bp = Blueprint('background_processing', __name__, url_prefix='/api/background')


@bp.route('/process-emails', methods=['POST'])
@login_required
def trigger_email_processing():
    """
    Trigger immediate email processing for the current user.

    Returns:
        200 with a JSON success payload (message + processor details), or
        500 with the error message if processing fails.
    """
    try:
        processor = EmailProcessor(current_user)
        result = processor.process_user_emails()

        return jsonify({
            'success': True,
            'message': f"Processed {result['success_count']} emails successfully",
            'details': result
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


@bp.route('/process-folder/<int:folder_id>', methods=['POST'])
@login_required
def trigger_folder_processing(folder_id):
    """
    Trigger email processing for a specific folder.

    Args:
        folder_id: Primary key of the folder to process.

    Returns:
        200 with processing details, 404 if the folder is missing or owned
        by another user, 500 on processing errors.
    """
    try:
        # Verify the folder belongs to the current user before processing.
        from app.models import Folder
        folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first()

        if not folder:
            return jsonify({
                'success': False,
                'error': 'Folder not found or access denied'
            }), 404

        processor = EmailProcessor(current_user)
        result = processor.process_folder_emails(folder)

        return jsonify({
            'success': True,
            'message': f"Processed {result['processed_count']} emails for folder {folder.name}",
            'details': result
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
||||
107
app/scheduler.py
Normal file
107
app/scheduler.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from threading import Thread
|
||||
from app.models import User
|
||||
from app.email_processor import EmailProcessor
|
||||
from app import create_app
|
||||
|
||||
class Scheduler:
    """
    Background scheduler for email processing tasks.
    Runs as a separate daemon thread to process emails at regular intervals.
    """

    def __init__(self, app=None, interval_minutes=5):
        """Create the scheduler, optionally binding it to a Flask app.

        Args:
            app: Optional Flask application; if given, init_app() is called.
            interval_minutes: Minutes between processing runs.
        """
        self.app = app
        self.interval = interval_minutes * 60  # stored in seconds
        self.thread = None
        self.running = False
        self.logger = logging.getLogger(__name__)

        if app:
            self.init_app(app)

    def init_app(self, app):
        """Initialize the scheduler with a Flask application."""
        self.app = app

        # Register under app.extensions, following the Flask extension
        # convention, so other code can find the scheduler instance.
        if not hasattr(app, 'extensions'):
            app.extensions = {}
        app.extensions['scheduler'] = self

    def start(self):
        """Start the background scheduler thread (no-op if already running)."""
        if self.running:
            self.logger.warning("Scheduler is already running")
            return

        self.running = True
        # Daemon thread: dies with the process instead of blocking shutdown.
        self.thread = Thread(target=self._run, daemon=True)
        self.thread.start()
        self.logger.info(f"Scheduler started with {self.interval//60} minute interval")

    def stop(self):
        """Stop the background scheduler and wait for the worker to exit."""
        self.running = False
        if self.thread:
            # The worker checks self.running at least every 10 seconds, so
            # this join completes within roughly that bound.
            self.thread.join()
            # FIX: log after the thread has actually finished, not before.
            self.logger.info("Scheduler stopped")

    def _run(self):
        """Main loop for the scheduler thread."""
        # Wait a moment before first run to allow the app to start.
        time.sleep(5)

        while self.running:
            try:
                self.logger.info("Starting scheduled email processing")
                self.process_all_users()

                # Announce when the next run is due.
                next_run = datetime.now() + timedelta(seconds=self.interval)
                self.logger.info(f"Next run at {next_run.strftime('%Y-%m-%d %H:%M:%S')}")

            except Exception as e:
                self.logger.error(f"Error in scheduler: {str(e)}")
                # FIX: the error path previously `continue`d straight back
                # into the loop, skipping the interval wait and spinning in a
                # tight loop while the error persisted. Fall through to the
                # wait below instead.

            self._wait_interval()

    def _wait_interval(self):
        """Sleep for the configured interval, polling every 10 s for stop requests."""
        slept = 0
        while self.running and slept < self.interval:
            sleep_time = min(10, self.interval - slept)
            time.sleep(sleep_time)
            slept += sleep_time

    def process_all_users(self):
        """Process emails for all users inside an application context."""
        with self.app.app_context():
            try:
                users = User.query.all()

                if not users:
                    self.logger.info("No users found for processing")
                    return

                self.logger.info(f"Processing emails for {len(users)} users")

                # Process each user's emails; one failing user must not abort
                # the whole run.
                for user in users:
                    try:
                        processor = EmailProcessor(user)
                        result = processor.process_user_emails()

                        self.logger.info(f"Completed processing for user {user.email}: "
                                         f"{result['success_count']} success, "
                                         f"{result['error_count']} errors, "
                                         f"{len(result['processed_folders'])} folders processed")

                    except Exception as e:
                        self.logger.error(f"Error processing emails for user {user.email}: {str(e)}")
                        continue

            except Exception as e:
                self.logger.error(f"Error in process_all_users: {str(e)}")
|
||||
Reference in New Issue
Block a user