from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta
import logging
import re

from app.models import db, Folder, User, ProcessedEmail
from app.imap_service import IMAPService
from app.processed_emails_service import ProcessedEmailsService


class EmailProcessor:
    """
    Service class for processing emails in the background according to user-defined rules.
    Handles automated organization of emails based on folder configurations.

    One instance is bound to a single user; it owns an ``IMAPService`` and a
    ``ProcessedEmailsService`` created for that user.
    """

    # Number of emails handled per IMAP session when the caller does not
    # specify a batch size.
    DEFAULT_BATCH_SIZE = 10

    # Locates the "move to" directive in a rule, ignoring case. Matching on the
    # original text (instead of on a lowercased copy) keeps the match offsets
    # valid even when lowercasing would change the string's length.
    _MOVE_TO_PATTERN = re.compile(r'move to', re.IGNORECASE)

    def __init__(self, user: User):
        """
        Create a processor for one user.

        Args:
            user: The user whose folders and mailbox will be processed
        """
        self.user = user
        self.imap_service = IMAPService(user)
        self.processed_emails_service = ProcessedEmailsService(user)
        self.logger = logging.getLogger(__name__)

    def process_user_emails(self) -> Dict[str, Any]:
        """
        Process emails for a user according to their folder rules.

        Folders with ``organize_enabled`` set are processed in descending
        priority order. A failure in one folder is logged and counted, and
        processing continues with the next folder.

        Returns:
            Dictionary with processing results including success count, error count,
            per-folder details and timing (``start_time``, ``end_time``, ``duration``
            in seconds)
        """
        result = {
            'success_count': 0,
            'error_count': 0,
            'processed_folders': [],
            'start_time': datetime.utcnow(),
            'end_time': None,
            'duration': None
        }

        try:
            # Get all folders with organize_enabled = True, ordered by priority (highest first)
            folders = Folder.query.filter_by(
                user_id=self.user.id,
                organize_enabled=True
            ).order_by(Folder.priority.desc()).all()

            if not folders:
                self.logger.info(f"No folders to process for user {self.user.email}")
                # Timing fields are filled in by the ``finally`` clause below.
                return result

            self.logger.info(f"Processing {len(folders)} folders for user {self.user.email}")

            # Process each folder according to priority
            for folder in folders:
                try:
                    folder_result = self.process_folder_emails(folder)
                    result['success_count'] += folder_result['processed_count']
                    result['error_count'] += folder_result['error_count']
                    result['processed_folders'].append({
                        'folder_id': folder.id,
                        'folder_name': folder.name,
                        'processed_count': folder_result['processed_count'],
                        'error_count': folder_result['error_count']
                    })
                    self.logger.info(f"Processed {folder_result['processed_count']} emails for folder {folder.name}")
                except Exception as e:
                    self.logger.error(f"Error processing folder {folder.name}: {str(e)}")
                    result['error_count'] += 1
        except Exception as e:
            self.logger.error(f"Error in process_user_emails for user {self.user.email}: {str(e)}")
            result['error_count'] += 1
        finally:
            # Only record timing here. Returning from ``finally`` would silently
            # discard KeyboardInterrupt/SystemExit, so the return lives below.
            result['end_time'] = datetime.utcnow()
            result['duration'] = (result['end_time'] - result['start_time']).total_seconds()

        return result

    def process_folder_emails(self, folder: Folder, batch_size: int = DEFAULT_BATCH_SIZE) -> Dict[str, Any]:
        """
        Process emails for a specific folder according to its rules.

        Args:
            folder: The folder to process
            batch_size: Maximum number of emails handled per batch (each batch
                uses its own IMAP session); must be at least 1

        Returns:
            Dictionary with processing results for this folder

        Raises:
            ValueError: If ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        result = {
            'processed_count': 0,
            'error_count': 0,
            'folder_id': folder.id,
            'folder_name': folder.name
        }

        try:
            # Get pending emails for this folder
            pending_email_uids = self.processed_emails_service.get_pending_emails(folder.name)

            if not pending_email_uids:
                self.logger.info(f"No pending emails to process for folder {folder.name}")
                return result

            self.logger.info(f"Processing {len(pending_email_uids)} pending emails for folder {folder.name}")

            # Process emails in batches to manage system resources
            batch_starts = range(0, len(pending_email_uids), batch_size)
            for batch_number, start in enumerate(batch_starts, start=1):
                batch = pending_email_uids[start:start + batch_size]
                batch_result = self._process_email_batch(folder, batch)

                result['processed_count'] += batch_result['processed_count']
                result['error_count'] += batch_result['error_count']

                self.logger.info(f"Processed batch {batch_number} for folder {folder.name}: {batch_result['processed_count']} success, {batch_result['error_count']} errors")

            # Update folder pending count after processing
            self._update_folder_counts(folder)
        except Exception as e:
            self.logger.error(f"Error in process_folder_emails for folder {folder.name}: {str(e)}")
            result['error_count'] += 1

        return result

    def _process_email_batch(self, folder: Folder, email_uids: List[str]) -> Dict[str, Any]:
        """
        Process a batch of emails for a folder.

        Opens an IMAP session, selects the folder, applies the folder's rule to
        every email in the batch, records the successfully handled emails as
        processed, and always tears the session down afterwards.

        Args:
            folder: The folder whose rules to apply
            email_uids: List of email UIDs to process

        Returns:
            Dictionary with processing results for this batch
        """
        result = {
            'processed_count': 0,
            'error_count': 0
        }

        try:
            # Connect to IMAP server
            self.imap_service._connect()

            # Login
            username = self.user.imap_config.get('username', '')
            password = self.user.imap_config.get('password', '')
            self.imap_service.connection.login(username, password)

            # Select the source folder
            resp_code, content = self.imap_service.connection.select(folder.name)
            if resp_code != 'OK':
                raise RuntimeError(f"Failed to select folder {folder.name}: {content}")

            # Process each email in the batch
            processed_uids = []
            for email_uid in email_uids:
                try:
                    handled = self._process_single_email(folder, email_uid)
                except Exception as e:
                    self.logger.error(f"Error processing email {email_uid}: {str(e)}")
                    handled = False

                if handled:
                    processed_uids.append(email_uid)
                    result['processed_count'] += 1
                else:
                    result['error_count'] += 1

            # Mark processed emails in the database
            if processed_uids:
                count = self.processed_emails_service.mark_emails_processed(folder.name, processed_uids)
                if count != len(processed_uids):
                    self.logger.warning(f"Marked {count} emails as processed, but expected {len(processed_uids)}")
        except Exception as e:
            self.logger.error(f"Error in _process_email_batch: {str(e)}")
            result['error_count'] += 1
        finally:
            # Close folder and logout on both the success and the error path.
            self._close_imap_connection()

        return result

    def _process_single_email(self, folder: Folder, email_uid: str) -> bool:
        """
        Apply the folder's rule to one email.

        Args:
            folder: The folder the email currently lives in
            email_uid: The UID of the email to handle

        Returns:
            True if the email was moved or needed no move, False if its headers
            could not be read or the move failed
        """
        # Get email headers to evaluate rules
        headers = self.imap_service.get_email_headers(folder.name, email_uid)
        if not headers:
            self.logger.warning(f"Could not get headers for email {email_uid} in folder {folder.name}")
            return False

        # Evaluate rules to determine destination
        destination_folder = self._evaluate_rules(headers, folder.rule_text)
        if not destination_folder or destination_folder == folder.name:
            # No move needed; the email still counts as processed.
            return True

        # Move email to destination folder
        if self._move_email(email_uid, folder.name, destination_folder):
            return True

        self.logger.error(f"Failed to move email {email_uid} from {folder.name} to {destination_folder}")
        return False

    def _close_imap_connection(self) -> None:
        """
        Best-effort teardown of the current IMAP session.

        ``close()`` and ``logout()`` are attempted independently so that a
        failing ``close()`` (for example when no folder is selected) does not
        prevent the logout. Failures are logged at debug level and otherwise
        ignored; the connection reference is always cleared.
        """
        connection = self.imap_service.connection
        if not connection:
            return

        for step_name, step in (('close', connection.close), ('logout', connection.logout)):
            try:
                step()
            except Exception as e:
                self.logger.debug(f"Ignoring error during IMAP {step_name}: {str(e)}")

        self.imap_service.connection = None

    def _evaluate_rules(self, headers: Dict[str, str], rule_text: Optional[str]) -> Optional[str]:
        """
        Evaluate rules against email headers to determine destination folder.

        This is a simplified implementation: the headers are currently not
        inspected. If the rule contains the phrase "move to" (case-insensitive),
        everything after its first occurrence is taken as the destination, with
        surrounding whitespace and trailing ``.!,;:`` punctuation removed. This
        should be replaced with proper rule parsing in production.

        Args:
            headers: Email headers (subject, from, to, date)
            rule_text: Rule text defining processing criteria

        Returns:
            Destination folder name or None if no move is needed
        """
        if not rule_text:
            return None

        match = self._MOVE_TO_PATTERN.search(rule_text)
        if match is None:
            # Default: no move needed
            return None

        # Extract folder name (everything after 'move to' until end), then
        # remove any trailing punctuation.
        destination = rule_text[match.end():].strip().rstrip('.!,;:').strip()

        # An empty destination means the rule named no folder: no move.
        return destination or None

    def _move_email(self, email_uid: str, source_folder: str, destination_folder: str) -> bool:
        """
        Move an email from source folder to destination folder.

        The move is performed as copy + flag ``\\Deleted`` + expunge, using UID
        commands because the identifier is a UID rather than a message sequence
        number. Note that the expunge removes every message in the source folder
        that is flagged ``\\Deleted``, not only this one. If a step after the
        copy fails, the email may exist in both folders.

        Args:
            email_uid: The UID of the email to move
            source_folder: The current folder name
            destination_folder: The destination folder name

        Returns:
            True if successful, False otherwise
        """
        try:
            connection = self.imap_service.connection

            # Make sure the source folder is the selected mailbox before any
            # command that addresses the message.
            result = connection.select(source_folder)
            if result[0] != 'OK':
                self.logger.error(f"Failed to select folder {source_folder}: {result[1]}")
                return False

            # Copy email to destination folder
            result = connection.uid('COPY', email_uid, destination_folder)
            if result[0] != 'OK':
                self.logger.error(f"Copy failed for email {email_uid}: {result[1]}")
                return False

            # Delete from source folder
            result = connection.uid('STORE', email_uid, '+FLAGS', r'(\Deleted)')
            if result[0] != 'OK':
                self.logger.error(f"Mark as deleted failed for email {email_uid}: {result[1]}")
                return False

            # Expunge to permanently remove
            result = connection.expunge()
            if result[0] != 'OK':
                self.logger.error(f"Expunge failed for email {email_uid}: {result[1]}")
                return False

            return True
        except Exception as e:
            self.logger.error(f"Error moving email {email_uid} from {source_folder} to {destination_folder}: {str(e)}")
            return False

    def _update_folder_counts(self, folder: Folder) -> None:
        """
        Update folder counts after processing.

        Refreshes ``pending_count`` from the processed-emails service and
        ``total_count`` from IMAP, then commits. On any error the session is
        rolled back and the error is logged rather than raised.

        Args:
            folder: The folder to update
        """
        try:
            # Update pending count
            folder.pending_count = self.processed_emails_service.get_pending_count(folder.name)

            # Update total count (get from IMAP)
            folder.total_count = self.imap_service.get_folder_email_count(folder.name)

            db.session.commit()
        except Exception as e:
            self.logger.error(f"Error updating folder counts for {folder.name}: {str(e)}")
            db.session.rollback()