369 lines
15 KiB
Python
369 lines
15 KiB
Python
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import requests

from app.models import db, Folder, User, ProcessedEmail
from app.imap_service import IMAPService
from app.processed_emails_service import ProcessedEmailsService
from app.prompt_templates import build_destination_prompt
|
|
|
|
|
|
class EmailProcessor:
    """
    Service class for processing emails in the background according to user-defined rules.

    Handles automated organization of emails based on folder configurations.
    """

    def __init__(self, user: User):
        """Set up per-user services used during processing.

        Args:
            user: The user whose emails this processor will organize.
        """
        self.user = user
        # IMAP access for reading headers and moving messages on the server.
        self.imap_service = IMAPService(user)
        # Tracks which email UIDs are pending/processed per folder.
        self.processed_emails_service = ProcessedEmailsService(user)
        self.logger = logging.getLogger(__name__)
|
|
|
|
def process_user_emails(self) -> Dict[str, any]:
|
|
"""
|
|
Process emails for a user according to their folder rules.
|
|
|
|
Returns:
|
|
Dictionary with processing results including success count, error count, and timing
|
|
"""
|
|
result = {
|
|
'success_count': 0,
|
|
'error_count': 0,
|
|
'processed_folders': [],
|
|
'start_time': datetime.utcnow(),
|
|
'end_time': None,
|
|
'duration': None
|
|
}
|
|
|
|
try:
|
|
# Get all folders with organize_enabled = True, ordered by priority (highest first)
|
|
folders = Folder.query.filter_by(
|
|
user_id=self.user.id,
|
|
organize_enabled=True
|
|
).order_by(Folder.priority.desc()).all()
|
|
|
|
if not folders:
|
|
self.logger.info(f"No folders to process for user {self.user.email}")
|
|
result['end_time'] = datetime.utcnow()
|
|
result['duration'] = (result['end_time'] - result['start_time']).total_seconds()
|
|
return result
|
|
|
|
self.logger.info(f"Processing {len(folders)} folders for user {self.user.email}")
|
|
|
|
# Process each folder according to priority
|
|
for folder in folders:
|
|
try:
|
|
folder_result = self.process_folder_emails(folder)
|
|
result['success_count'] += folder_result['processed_count']
|
|
result['error_count'] += folder_result['error_count']
|
|
result['processed_folders'].append({
|
|
'folder_id': folder.id,
|
|
'folder_name': folder.name,
|
|
'processed_count': folder_result['processed_count'],
|
|
'error_count': folder_result['error_count']
|
|
})
|
|
|
|
self.logger.info(f"Processed {folder_result['processed_count']} emails for folder {folder.name}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error processing folder {folder.name}: {str(e)}")
|
|
result['error_count'] += 1
|
|
continue
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error in process_user_emails for user {self.user.email}: {str(e)}")
|
|
result['error_count'] += 1
|
|
|
|
finally:
|
|
result['end_time'] = datetime.utcnow()
|
|
result['duration'] = (result['end_time'] - result['start_time']).total_seconds()
|
|
|
|
return result
|
|
|
|
def process_folder_emails(self, folder: Folder) -> Dict[str, any]:
|
|
"""
|
|
Process emails for a specific folder according to its rules.
|
|
|
|
Args:
|
|
folder: The folder to process
|
|
|
|
Returns:
|
|
Dictionary with processing results for this folder
|
|
"""
|
|
result = {
|
|
'processed_count': 0,
|
|
'error_count': 0,
|
|
'folder_id': folder.id,
|
|
'folder_name': folder.name
|
|
}
|
|
|
|
try:
|
|
# Get pending emails for this folder
|
|
pending_email_uids = self.processed_emails_service.get_pending_emails(folder.name)
|
|
|
|
if not pending_email_uids:
|
|
self.logger.info(f"No pending emails to process for folder {folder.name}")
|
|
return result
|
|
|
|
self.logger.info(f"Processing {len(pending_email_uids)} pending emails for folder {folder.name}")
|
|
|
|
# Process emails in batches to manage system resources
|
|
batch_size = 10 # Configurable batch size
|
|
processed_batch_count = 0
|
|
|
|
for i in range(0, len(pending_email_uids), batch_size):
|
|
batch = pending_email_uids[i:i + batch_size]
|
|
batch_result = self._process_email_batch(folder, batch)
|
|
|
|
result['processed_count'] += batch_result['processed_count']
|
|
result['error_count'] += batch_result['error_count']
|
|
processed_batch_count += 1
|
|
|
|
self.logger.info(f"Processed batch {i//batch_size + 1} for folder {folder.name}: {batch_result['processed_count']} success, {batch_result['error_count']} errors")
|
|
|
|
# Update folder pending count after processing
|
|
self._update_folder_counts(folder)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error in process_folder_emails for folder {folder.name}: {str(e)}")
|
|
result['error_count'] += 1
|
|
|
|
return result
|
|
|
|
def _process_email_batch(self, folder: Folder, email_uids: List[str]) -> Dict[str, any]:
|
|
"""
|
|
Process a batch of emails for a folder.
|
|
|
|
Args:
|
|
folder: The folder whose rules to apply
|
|
email_uids: List of email UIDs to process
|
|
|
|
Returns:
|
|
Dictionary with processing results for this batch
|
|
"""
|
|
result = {
|
|
'processed_count': 0,
|
|
'error_count': 0
|
|
}
|
|
|
|
try:
|
|
# Connect to IMAP server
|
|
self.imap_service._connect()
|
|
|
|
# Login
|
|
username = self.user.imap_config.get('username', '')
|
|
password = self.user.imap_config.get('password', '')
|
|
self.imap_service.connection.login(username, password)
|
|
|
|
# Select the source folder
|
|
resp_code, content = self.imap_service.connection.select(folder.name)
|
|
if resp_code != 'OK':
|
|
raise Exception(f"Failed to select folder {folder.name}: {content}")
|
|
|
|
# Get all enabled folders for this user (for AI context)
|
|
all_folders = Folder.query.filter_by(user_id=self.user.id, organize_enabled=True).all()
|
|
rules = [
|
|
{
|
|
'name': f.name,
|
|
'rule_text': f.rule_text,
|
|
'priority': f.priority
|
|
}
|
|
for f in all_folders
|
|
]
|
|
|
|
# Get email headers for all emails in batch
|
|
emails = []
|
|
valid_uids = [] # Track which UIDs we successfully fetched headers for
|
|
for email_uid in email_uids:
|
|
try:
|
|
headers = self.imap_service.get_email_headers(folder.name, email_uid)
|
|
if headers:
|
|
emails.append({
|
|
'uid': email_uid,
|
|
'headers': headers
|
|
})
|
|
valid_uids.append(email_uid)
|
|
else:
|
|
self.logger.warning(f"Could not get headers for email {email_uid} in folder {folder.name}")
|
|
result['error_count'] += 1
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting headers for email {email_uid}: {str(e)}")
|
|
result['error_count'] += 1
|
|
|
|
# Skip AI call if no valid emails
|
|
if not emails:
|
|
return result
|
|
|
|
# Get destinations from AI
|
|
destinations = self.get_email_destinations(emails, rules)
|
|
|
|
# Process each email based on AI decision
|
|
processed_uids = []
|
|
for email in emails:
|
|
try:
|
|
email_uid = email['uid']
|
|
destination_folder = destinations.get(email_uid)
|
|
|
|
if not destination_folder:
|
|
self.logger.warning(f"No destination determined for email {email_uid}")
|
|
result['error_count'] += 1
|
|
continue
|
|
|
|
# Skip if destination is same as current folder or INBOX (no move needed)
|
|
if destination_folder.lower() == 'inbox' or destination_folder == folder.name:
|
|
processed_uids.append(email_uid)
|
|
result['processed_count'] += 1
|
|
continue
|
|
|
|
# Move email to destination folder
|
|
if self._move_email(email_uid, folder.name, destination_folder):
|
|
processed_uids.append(email_uid)
|
|
result['processed_count'] += 1
|
|
else:
|
|
self.logger.error(f"Failed to move email {email_uid} from {folder.name} to {destination_folder}")
|
|
result['error_count'] += 1
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error processing email {email_uid}: {str(e)}")
|
|
result['error_count'] += 1
|
|
continue
|
|
|
|
# Mark processed emails in the database
|
|
if processed_uids:
|
|
count = self.processed_emails_service.mark_emails_processed(folder.name, processed_uids)
|
|
if count != len(processed_uids):
|
|
self.logger.warning(f"Marked {count} emails as processed, but expected {len(processed_uids)}")
|
|
|
|
# Close folder and logout
|
|
self.imap_service.connection.close()
|
|
self.imap_service.connection.logout()
|
|
self.imap_service.connection = None
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error in _process_email_batch: {str(e)}")
|
|
result['error_count'] += 1
|
|
|
|
# Clean up connection if needed
|
|
if self.imap_service.connection:
|
|
try:
|
|
self.imap_service.connection.close()
|
|
self.imap_service.connection.logout()
|
|
except:
|
|
pass
|
|
self.imap_service.connection = None
|
|
|
|
return result
|
|
|
|
def get_email_destinations(self, emails: List[Dict], rules: List[Dict]) -> Dict[str, str]:
|
|
"""
|
|
Get destination folders for a batch of emails using AI completion API.
|
|
|
|
Args:
|
|
emails: List of email dictionaries containing uid and headers
|
|
rules: List of folder rules with name, rule_text, and priority
|
|
|
|
Returns:
|
|
Dictionary mapping email UID to destination folder name
|
|
"""
|
|
destinations = {}
|
|
|
|
# Get API configuration from environment or config
|
|
api_url = self.user.get_setting('OPENAI_BASE_URL', 'http://localhost:1234/v1/completions')
|
|
api_key = self.user.get_setting('OPENAI_API_KEY', 'dummy-key')
|
|
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': f'Bearer {api_key}'
|
|
}
|
|
|
|
for email in emails:
|
|
try:
|
|
# Build prompt using template
|
|
prompt = build_destination_prompt(email['headers'], rules)
|
|
|
|
payload = {
|
|
'prompt': prompt,
|
|
'temperature': 0.7,
|
|
'max_tokens': 50
|
|
}
|
|
|
|
response = requests.post(f'{api_url}', json=payload, headers=headers, timeout=30)
|
|
|
|
if response.status_code == 200:
|
|
# Extract folder name from response (should be just the folder name)
|
|
destination = response.text.strip()
|
|
destinations[email['uid']] = destination
|
|
else:
|
|
self.logger.error(f"AI API request failed: {response.status_code} - {response.text}")
|
|
# On error, don't assign a destination (will be handled by caller)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error calling AI API for email {email['uid']}: {str(e)}")
|
|
# Continue with other emails
|
|
continue
|
|
|
|
return destinations
|
|
|
|
def _move_email(self, email_uid: str, source_folder: str, destination_folder: str) -> bool:
|
|
"""
|
|
Move an email from source folder to destination folder.
|
|
|
|
Args:
|
|
email_uid: The UID of the email to move
|
|
source_folder: The current folder name
|
|
destination_folder: The destination folder name
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
# Copy email to destination folder
|
|
result = self.imap_service.connection.copy(email_uid, destination_folder)
|
|
if result[0] != 'OK':
|
|
self.logger.error(f"Copy failed for email {email_uid}: {result[1]}")
|
|
return False
|
|
|
|
# Store the message ID to ensure it was copied
|
|
# (In a real implementation, you might want to verify the copy)
|
|
|
|
# Delete from source folder
|
|
self.imap_service.connection.select(source_folder)
|
|
result = self.imap_service.connection.store(email_uid, '+FLAGS', '\Deleted')
|
|
if result[0] != 'OK':
|
|
self.logger.error(f"Mark as deleted failed for email {email_uid}: {result[1]}")
|
|
return False
|
|
|
|
# Expunge to permanently remove
|
|
result = self.imap_service.connection.expunge()
|
|
if result[0] != 'OK':
|
|
self.logger.error(f"Expunge failed for email {email_uid}: {result[1]}")
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error moving email {email_uid} from {source_folder} to {destination_folder}: {str(e)}")
|
|
return False
|
|
|
|
def _update_folder_counts(self, folder: Folder) -> None:
|
|
"""
|
|
Update folder counts after processing.
|
|
|
|
Args:
|
|
folder: The folder to update
|
|
"""
|
|
try:
|
|
# Update pending count
|
|
pending_count = self.processed_emails_service.get_pending_count(folder.name)
|
|
folder.pending_count = pending_count
|
|
|
|
# Update total count (get from IMAP)
|
|
total_count = self.imap_service.get_folder_email_count(folder.name)
|
|
folder.total_count = total_count
|
|
|
|
db.session.commit()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error updating folder counts for {folder.name}: {str(e)}")
|
|
db.session.rollback() |