diff --git a/app/imap_service.py b/app/imap_service.py
index 60b3347..c84266e 100644
--- a/app/imap_service.py
+++ b/app/imap_service.py
@@ -4,7 +4,8 @@ import logging
 from typing import List, Dict, Optional, Tuple
 from email.header import decode_header
 from email.utils import parsedate_to_datetime
-from app.models import db, Folder, User
+from app.models import db, Folder, User, ProcessedEmail
+from app.processed_emails_service import ProcessedEmailsService
 from app import create_app
 
 class IMAPService:
@@ -12,6 +13,7 @@ class IMAPService:
         self.user = user
         self.config = user.imap_config or {}
         self.connection = None
+        self.processed_emails_service = ProcessedEmailsService(user)
 
     def _connect(self):
         """Create an IMAP connection based on configuration."""
@@ -294,6 +296,19 @@ class IMAPService:
                 self.connection = None
             return []
 
+    def get_pending_emails(self, folder_name: str) -> List[str]:
+        """Get list of email UIDs that are pending processing in a folder.
+
+        Delegates to the processed-emails service. A failure is logged with
+        its traceback and reported as an empty list, which callers cannot
+        distinguish from a folder that has nothing pending.
+        """
+        try:
+            return self.processed_emails_service.get_pending_emails(folder_name)
+        except Exception as e:
+            logging.exception("Error getting pending emails for folder %s: %s", folder_name, e)
+            return []
+
     def get_email_headers(self, folder_name: str, email_uid: str) -> Dict[str, str]:
         """Get email headers for a specific email UID."""
         try:
@@ -348,7 +363,7 @@ class IMAPService:
             return {}
 
     def sync_folders(self) -> Tuple[bool, str]:
-        """Sync IMAP folders with local database."""
+        """Sync IMAP folders with local database and track email processing status."""
         try:
             if not self.config:
                 return False, "No IMAP configuration found"
@@ -401,15 +416,37 @@ class IMAPService:
                     db.session.add(new_folder)
                     synced_count += 1
                 else:
-                    # Update existing folder with email counts and recent emails
-                    # Get the total count of emails in this folder
-                    total_count = self.get_folder_email_count(folder_name)
-                    existing_folder.total_count = total_count
-                    existing_folder.pending_count = 0  # Initially set to 0
+                    # Get the list of email UIDs in this folder
+                    email_uids = self.get_folder_email_uids(folder_name)
+
+                    # Sync with processed emails service to track which emails are pending
+                    self.processed_emails_service.sync_folder_emails(
+                        folder_name=folder_name,
+                        email_uids=email_uids
+                    )
+
+                    # Clean up records for emails that no longer exist. Done on every
+                    # sync (not only when new UIDs arrived) and before the pending
+                    # count is read, so deleted emails never linger in that count.
+                    # An empty listing is skipped: it would presumably drop every record.
+                    if email_uids:
+                        deleted_count = self.processed_emails_service.cleanup_old_records(
+                            folder_name=folder_name,
+                            current_uids=email_uids
+                        )
+                        if deleted_count > 0:
+                            print(f" - Cleaned up {deleted_count} old email records")
+
+                    # Update folder with email counts
+                    existing_folder.total_count = len(email_uids)
+
+                    # Update pending count based on processed emails service
+                    pending_count = self.processed_emails_service.get_pending_count(folder_name)
+                    existing_folder.pending_count = pending_count
 
                     # Get the most recent emails for this folder
                     recent_emails = self.get_recent_emails(folder_name, 3)
                     existing_folder.recent_emails = recent_emails
 
             db.session.commit()
             return True, f"Successfully synced {synced_count} folders"
diff --git a/manage.py b/manage.py
index 643ec54..d2860c0 100644
--- a/manage.py
+++ b/manage.py
@@ -2,7 +2,9 @@ import sys
 import os
 import subprocess
 from app import create_app, db
-from app.models import Folder, User
+from app.models import Folder, User, ProcessedEmail
+from app.imap_service import IMAPService
+from app.processed_emails_service import ProcessedEmailsService
 from flask.cli import with_appcontext
 import click
 
@@ -65,6 +67,126 @@ def mock_process_emails():
         if not matched:
             print(" -> No matching rule found.")
 
+
+def _process_folder_pending(service, user, folder, batch_size, dry_run):
+    """Mark one folder's pending emails as processed, in batches.
+
+    Returns the number of emails marked processed, or the number that
+    would be marked when ``dry_run`` is set.
+    """
+    pending_emails = ProcessedEmail.query.filter_by(
+        user_id=user.id,
+        folder_name=folder.name,
+        is_processed=False
+    ).all()
+
+    if not pending_emails:
+        print(" - No pending emails in this folder")
+        return 0
+
+    # Read every UID before the first commit: a commit expires loaded ORM
+    # objects by default, so later attribute reads would re-query each row.
+    pending_uids = [email.email_uid for email in pending_emails]
+    total_pending = len(pending_uids)
+    print(f" - Found {total_pending} pending emails")
+
+    processed_in_folder = 0
+    for start in range(0, total_pending, batch_size):
+        batch_uids = pending_uids[start:start + batch_size]
+
+        if dry_run:
+            print(f" - Dry run: Would process {len(batch_uids)} emails")
+            processed_in_folder += len(batch_uids)
+            continue
+
+        result = service.mark_emails_processed(
+            folder_name=folder.name,
+            email_uids=batch_uids
+        )
+        processed_in_folder += result
+        print(f" - Processed {result} emails in this batch")
+
+        # Commit after each batch to avoid large transactions
+        db.session.commit()
+
+    print(f" - Total processed in folder: {processed_in_folder}/{total_pending}")
+    return processed_in_folder
+
+
+@app.cli.command("process-pending-emails")
+@click.option("--user-id", type=int, help="Process emails for a specific user ID only")
+@click.option("--folder-name", help="Process emails for a specific folder only")
+@click.option(
+    "--batch-size",
+    default=100,
+    type=click.IntRange(min=1),
+    help="Number of emails to process in each batch",
+)
+@click.option("--dry-run", is_flag=True, help="Show what would be processed without actually processing")
+@with_appcontext
+def process_pending_emails(user_id, folder_name, batch_size, dry_run):
+    """Process pending emails for all users or specific user/folder.
+
+    "Processing" here means marking each pending ``ProcessedEmail`` record
+    as processed via ``ProcessedEmailsService.mark_emails_processed``.
+    With ``--dry-run`` nothing is modified; only the counts are reported.
+    """
+    with app.app_context():
+        # Build query for users
+        user_query = User.query
+        if user_id is not None:
+            user_query = user_query.filter_by(id=user_id)
+
+        users = user_query.all()
+        if not users:
+            print("No users found to process")
+            return
+
+        total_processed = 0
+        total_users = len(users)
+
+        print(f"Starting email processing for {total_users} user(s)...")
+
+        for i, user in enumerate(users, 1):
+            print(f"\nProcessing user {i}/{total_users}: {user.email}")
+
+            # Skip users without IMAP configuration
+            if not user.imap_config:
+                print(" - Skipping: No IMAP configuration")
+                continue
+
+            try:
+                processed_emails_service = ProcessedEmailsService(user)
+
+                # Get folders for this user
+                folders = Folder.query.filter_by(user_id=user.id).all()
+
+                if not folders:
+                    print(" - No folders found for this user")
+                    continue
+
+                for folder in folders:
+                    # Skip if specific folder name is specified and doesn't match
+                    if folder_name and folder.name != folder_name:
+                        continue
+
+                    print(f" - Processing folder: {folder.name}")
+                    total_processed += _process_folder_pending(
+                        processed_emails_service, user, folder, batch_size, dry_run
+                    )
+
+            except Exception as e:
+                print(f" - Error processing user {user.email}: {str(e)}")
+                import traceback
+                traceback.print_exc()
+                db.session.rollback()
+                continue
+
+        if not dry_run:
+            print(f"\nCompleted! Total emails processed: {total_processed}")
+        else:
+            print(f"\nDry run completed! Total emails would be processed: {total_processed}")
+
 if __name__ == '__main__':
     if len(sys.argv) > 1 and sys.argv[1] == 'mock-process':
         mock_process_emails()