from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging

import requests

from app.models import db, Folder, User, ProcessedEmail
from app.imap_service import IMAPService
from app.processed_emails_service import ProcessedEmailsService
from app.prompt_templates import build_destination_prompt


class EmailProcessor:
    """
    Service class for processing emails in the background according to user-defined rules.
    Handles automated organization of emails based on folder configurations.
    """

    # Number of pending emails handled per IMAP session / AI round.
    # Override on a subclass or instance to tune resource usage.
    batch_size = 10

    def __init__(self, user: User):
        self.user = user
        self.imap_service = IMAPService(user)
        self.processed_emails_service = ProcessedEmailsService(user)
        self.logger = logging.getLogger(__name__)

    def process_user_emails(self) -> Dict[str, Any]:
        """
        Process emails for a user according to their folder rules.

        Folders with ``organize_enabled=True`` are processed in descending
        ``priority`` order. A failure in one folder is logged and counted but
        does not stop the remaining folders.

        Returns:
            Dictionary with keys ``success_count``, ``error_count``,
            ``processed_folders`` (one summary dict per folder),
            ``start_time``, ``end_time`` (naive UTC datetimes) and
            ``duration`` (seconds, float).
        """
        result: Dict[str, Any] = {
            'success_count': 0,
            'error_count': 0,
            'processed_folders': [],
            'start_time': datetime.utcnow(),
            'end_time': None,
            'duration': None,
        }

        try:
            # Get all folders with organize_enabled = True, ordered by priority (highest first)
            folders = Folder.query.filter_by(
                user_id=self.user.id,
                organize_enabled=True,
            ).order_by(Folder.priority.desc()).all()

            if not folders:
                self.logger.info(f"No folders to process for user {self.user.email}")
                return result  # timing is filled in by the finally clause

            self.logger.info(f"Processing {len(folders)} folders for user {self.user.email}")

            for folder in folders:
                try:
                    folder_result = self.process_folder_emails(folder)
                    result['success_count'] += folder_result['processed_count']
                    result['error_count'] += folder_result['error_count']
                    result['processed_folders'].append({
                        'folder_id': folder.id,
                        'folder_name': folder.name,
                        'processed_count': folder_result['processed_count'],
                        'error_count': folder_result['error_count'],
                    })
                    self.logger.info(
                        f"Processed {folder_result['processed_count']} emails for folder {folder.name}"
                    )
                except Exception as e:
                    self.logger.error(f"Error processing folder {folder.name}: {str(e)}")
                    result['error_count'] += 1
        except Exception as e:
            self.logger.error(f"Error in process_user_emails for user {self.user.email}: {str(e)}")
            result['error_count'] += 1
        finally:
            result['end_time'] = datetime.utcnow()
            result['duration'] = (result['end_time'] - result['start_time']).total_seconds()

        return result

    def process_folder_emails(self, folder: Folder) -> Dict[str, Any]:
        """
        Process the pending emails of a single folder according to its rules.

        Pending UIDs are handled in chunks of ``self.batch_size``. Each chunk
        uses its own IMAP session. Folder counts are refreshed afterwards.

        Args:
            folder: The folder to process

        Returns:
            Dictionary with ``processed_count``, ``error_count``,
            ``folder_id`` and ``folder_name`` for this folder.
        """
        result: Dict[str, Any] = {
            'processed_count': 0,
            'error_count': 0,
            'folder_id': folder.id,
            'folder_name': folder.name,
        }

        try:
            pending_email_uids = self.processed_emails_service.get_pending_emails(folder.name)

            if not pending_email_uids:
                self.logger.info(f"No pending emails to process for folder {folder.name}")
                return result

            self.logger.info(f"Processing {len(pending_email_uids)} pending emails for folder {folder.name}")

            size = self.batch_size
            for batch_number, start in enumerate(range(0, len(pending_email_uids), size), start=1):
                batch = pending_email_uids[start:start + size]
                batch_result = self._process_email_batch(folder, batch)
                result['processed_count'] += batch_result['processed_count']
                result['error_count'] += batch_result['error_count']
                self.logger.info(
                    f"Processed batch {batch_number} for folder {folder.name}: "
                    f"{batch_result['processed_count']} success, {batch_result['error_count']} errors"
                )

            # Update folder pending count after processing
            self._update_folder_counts(folder)
        except Exception as e:
            self.logger.error(f"Error in process_folder_emails for folder {folder.name}: {str(e)}")
            result['error_count'] += 1

        return result

    def _process_email_batch(self, folder: Folder, email_uids: List[str]) -> Dict[str, Any]:
        """
        Process a batch of emails for a folder in one IMAP session.

        Steps: fetch headers, ask the AI for destinations, move emails, and mark
        successfully handled UIDs as processed. The IMAP connection is always
        closed and logged out before returning, on every path.

        Args:
            folder: The folder whose emails are being organized
            email_uids: List of email UIDs to process

        Returns:
            Dictionary with ``processed_count`` and ``error_count`` for this batch.
        """
        result = {
            'processed_count': 0,
            'error_count': 0,
        }

        try:
            self._open_folder(folder.name)
            rules = self._get_folder_rules()

            emails, header_errors = self._fetch_email_headers(folder.name, email_uids)
            result['error_count'] += header_errors

            # Skip AI call if no valid emails
            if not emails:
                return result

            destinations = self.get_email_destinations(emails, rules)

            processed_uids: List[str] = []
            for message in emails:
                email_uid = message['uid']
                try:
                    destination_folder = destinations.get(email_uid)

                    if not destination_folder:
                        self.logger.warning(f"No destination determined for email {email_uid}")
                        result['error_count'] += 1
                        continue

                    # Skip if destination is same as current folder or INBOX (no move needed)
                    if destination_folder.lower() == 'inbox' or destination_folder == folder.name:
                        processed_uids.append(email_uid)
                        result['processed_count'] += 1
                        continue

                    if self._move_email(email_uid, folder.name, destination_folder):
                        processed_uids.append(email_uid)
                        result['processed_count'] += 1
                    else:
                        self.logger.error(f"Failed to move email {email_uid} from {folder.name} to {destination_folder}")
                        result['error_count'] += 1
                except Exception as e:
                    self.logger.error(f"Error processing email {email_uid}: {str(e)}")
                    result['error_count'] += 1

            # Mark processed emails in the database
            if processed_uids:
                count = self.processed_emails_service.mark_emails_processed(folder.name, processed_uids)
                if count != len(processed_uids):
                    self.logger.warning(f"Marked {count} emails as processed, but expected {len(processed_uids)}")
        except Exception as e:
            self.logger.error(f"Error in _process_email_batch: {str(e)}")
            result['error_count'] += 1
        finally:
            # Runs on every exit path (including the early return above),
            # so the IMAP session is never leaked.
            self._close_connection()

        return result

    def _open_folder(self, folder_name: str) -> None:
        """Connect and log in to the user's IMAP server, then select ``folder_name``.

        Raises:
            Exception: if SELECT does not return OK. Connection/login errors
                propagate from the IMAP layer unchanged.
        """
        self.imap_service._connect()

        username = self.user.imap_config.get('username', '')
        password = self.user.imap_config.get('password', '')
        self.imap_service.connection.login(username, password)

        resp_code, content = self.imap_service.connection.select(folder_name)
        if resp_code != 'OK':
            raise Exception(f"Failed to select folder {folder_name}: {content}")

    def _get_folder_rules(self) -> List[Dict[str, Any]]:
        """Return name/rule_text/priority of every enabled folder (context for the AI)."""
        all_folders = Folder.query.filter_by(user_id=self.user.id, organize_enabled=True).all()
        return [
            {
                'name': f.name,
                'rule_text': f.rule_text,
                'priority': f.priority,
            }
            for f in all_folders
        ]

    def _fetch_email_headers(self, folder_name: str, email_uids: List[str]) -> Tuple[List[Dict[str, Any]], int]:
        """Fetch headers for each UID.

        Returns:
            ``(emails, error_count)``. ``emails`` holds ``{'uid', 'headers'}``
            dicts for UIDs whose headers were retrieved. ``error_count``
            counts the UIDs that failed or returned no headers.
        """
        emails: List[Dict[str, Any]] = []
        errors = 0
        for email_uid in email_uids:
            try:
                headers = self.imap_service.get_email_headers(folder_name, email_uid)
                if headers:
                    emails.append({
                        'uid': email_uid,
                        'headers': headers,
                    })
                else:
                    self.logger.warning(f"Could not get headers for email {email_uid} in folder {folder_name}")
                    errors += 1
            except Exception as e:
                self.logger.error(f"Error getting headers for email {email_uid}: {str(e)}")
                errors += 1
        return emails, errors

    def _close_connection(self) -> None:
        """Best-effort CLOSE + LOGOUT of the current IMAP connection, then drop it.

        The two steps are attempted independently. CLOSE is only legal when a
        mailbox is selected (e.g. it fails after a failed SELECT), and that
        failure must not prevent LOGOUT from releasing the socket.
        """
        connection = self.imap_service.connection
        if not connection:
            return
        try:
            connection.close()
        except Exception as e:
            self.logger.debug(f"IMAP close failed: {str(e)}")
        try:
            connection.logout()
        except Exception as e:
            self.logger.debug(f"IMAP logout failed: {str(e)}")
        self.imap_service.connection = None

    def get_email_destinations(self, emails: List[Dict], rules: List[Dict]) -> Dict[str, str]:
        """
        Get destination folders for a batch of emails using AI completion API.

        One request is sent per email. The generated text is matched
        case-insensitively against the rule folder names and INBOX. Only a
        matching canonical name is returned, so model output can never
        redirect mail into an arbitrary mailbox (e.g. Trash).

        Args:
            emails: List of email dictionaries containing uid and headers
            rules: List of folder rules with name, rule_text, and priority

        Returns:
            Dictionary mapping email UID to destination folder name. UIDs
            whose request failed or whose answer was not a known folder are
            omitted; the caller counts them as errors.
        """
        destinations: Dict[str, str] = {}

        # Get API configuration from environment or config
        api_url = self.user.get_setting('OPENAI_BASE_URL', 'http://localhost:1234/v1/completions')
        api_key = self.user.get_setting('OPENAI_API_KEY', 'dummy-key')

        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {api_key}'
        }

        # Canonical folder names the model may choose, keyed case-insensitively.
        allowed = {rule['name'].casefold(): rule['name'] for rule in rules if rule.get('name')}
        allowed.setdefault('inbox', 'INBOX')

        for message in emails:
            email_uid = message['uid']
            try:
                prompt = build_destination_prompt(message['headers'], rules)

                payload = {
                    'prompt': prompt,
                    'temperature': 0.7,
                    'max_tokens': 50
                }

                response = requests.post(api_url, json=payload, headers=headers, timeout=30)

                if response.status_code != 200:
                    self.logger.error(f"AI API request failed: {response.status_code} - {response.text}")
                    # On error, don't assign a destination (will be handled by caller)
                    continue

                answer = self._extract_completion_text(response)
                destination = allowed.get(answer.casefold())
                if destination is None:
                    self.logger.warning(f"AI returned unknown destination {answer!r} for email {email_uid}")
                    continue
                destinations[email_uid] = destination
            except Exception as e:
                self.logger.error(f"Error calling AI API for email {email_uid}: {str(e)}")
                # Continue with other emails

        return destinations

    @staticmethod
    def _extract_completion_text(response) -> str:
        """Return the model's answer from an AI API response, as a single cleaned line.

        Handles OpenAI-compatible JSON bodies (``choices[0].text`` for
        completions, ``choices[0].message.content`` for chat). Falls back to
        the raw response body when it is not JSON in either shape. Only the
        first non-empty line is kept, with surrounding quotes/backticks removed.
        """
        text = None
        try:
            data = response.json()
        except ValueError:
            data = None

        if isinstance(data, dict):
            choices = data.get('choices')
            if isinstance(choices, list) and choices and isinstance(choices[0], dict):
                first = choices[0]
                text = first.get('text')
                if text is None and isinstance(first.get('message'), dict):
                    text = first['message'].get('content')

        if not isinstance(text, str):
            text = response.text

        for line in text.splitlines():
            cleaned = line.strip().strip('"\'`').strip()
            if cleaned:
                return cleaned
        return ''

    def _move_email(self, email_uid: str, source_folder: str, destination_folder: str) -> bool:
        """
        Move an email from source folder to destination folder.

        Uses UID COPY / UID STORE rather than the sequence-number commands.
        Sequence numbers shift after every EXPUNGE, so later moves in the same
        batch would otherwise target the wrong messages.

        NOTE: EXPUNGE permanently removes *every* message flagged \\Deleted in
        the source folder, not just this one.

        Args:
            email_uid: The UID of the email to move
            source_folder: The current folder name
            destination_folder: The destination folder name

        Returns:
            True if successful, False otherwise
        """
        try:
            connection = self.imap_service.connection

            # Copy email to destination folder
            result = connection.uid('COPY', email_uid, destination_folder)
            if result[0] != 'OK':
                self.logger.error(f"Copy failed for email {email_uid}: {result[1]}")
                return False

            # Re-select the source folder before flagging. At this point the
            # copy already exists, so a failure leaves a duplicate, not a loss.
            result = connection.select(source_folder)
            if result[0] != 'OK':
                self.logger.error(f"Reselect of {source_folder} failed for email {email_uid}: {result[1]}")
                return False

            # Delete from source folder
            result = connection.uid('STORE', email_uid, '+FLAGS', r'\Deleted')
            if result[0] != 'OK':
                self.logger.error(f"Mark as deleted failed for email {email_uid}: {result[1]}")
                return False

            # Expunge to permanently remove
            result = connection.expunge()
            if result[0] != 'OK':
                self.logger.error(f"Expunge failed for email {email_uid}: {result[1]}")
                return False

            return True
        except Exception as e:
            self.logger.error(f"Error moving email {email_uid} from {source_folder} to {destination_folder}: {str(e)}")
            return False

    def _update_folder_counts(self, folder: Folder) -> None:
        """
        Refresh ``pending_count`` and ``total_count`` on the folder and commit.

        On any error the change is rolled back and the error is logged.

        Args:
            folder: The folder to update
        """
        try:
            # Update pending count
            folder.pending_count = self.processed_emails_service.get_pending_count(folder.name)

            # Update total count (get from IMAP)
            folder.total_count = self.imap_service.get_folder_email_count(folder.name)

            db.session.commit()
        except Exception as e:
            self.logger.error(f"Error updating folder counts for {folder.name}: {str(e)}")
            db.session.rollback()