scaffolding for processing
This commit is contained in:
@@ -4,7 +4,8 @@ import logging
|
|||||||
from typing import List, Dict, Optional, Tuple
|
from typing import List, Dict, Optional, Tuple
|
||||||
from email.header import decode_header
|
from email.header import decode_header
|
||||||
from email.utils import parsedate_to_datetime
|
from email.utils import parsedate_to_datetime
|
||||||
from app.models import db, Folder, User
|
from app.models import db, Folder, User, ProcessedEmail
|
||||||
|
from app.processed_emails_service import ProcessedEmailsService
|
||||||
from app import create_app
|
from app import create_app
|
||||||
|
|
||||||
class IMAPService:
|
class IMAPService:
|
||||||
@@ -12,6 +13,7 @@ class IMAPService:
|
|||||||
self.user = user
|
self.user = user
|
||||||
self.config = user.imap_config or {}
|
self.config = user.imap_config or {}
|
||||||
self.connection = None
|
self.connection = None
|
||||||
|
self.processed_emails_service = ProcessedEmailsService(user)
|
||||||
|
|
||||||
def _connect(self):
|
def _connect(self):
|
||||||
"""Create an IMAP connection based on configuration."""
|
"""Create an IMAP connection based on configuration."""
|
||||||
@@ -294,6 +296,14 @@ class IMAPService:
|
|||||||
self.connection = None
|
self.connection = None
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
def get_pending_emails(self, folder_name: str) -> List[str]:
|
||||||
|
"""Get list of email UIDs that are pending processing in a folder."""
|
||||||
|
try:
|
||||||
|
return self.processed_emails_service.get_pending_emails(folder_name)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error getting pending emails for folder {folder_name}: {str(e)}")
|
||||||
|
return []
|
||||||
|
|
||||||
def get_email_headers(self, folder_name: str, email_uid: str) -> Dict[str, str]:
|
def get_email_headers(self, folder_name: str, email_uid: str) -> Dict[str, str]:
|
||||||
"""Get email headers for a specific email UID."""
|
"""Get email headers for a specific email UID."""
|
||||||
try:
|
try:
|
||||||
@@ -348,7 +358,7 @@ class IMAPService:
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
def sync_folders(self) -> Tuple[bool, str]:
|
def sync_folders(self) -> Tuple[bool, str]:
|
||||||
"""Sync IMAP folders with local database."""
|
"""Sync IMAP folders with local database and track email processing status."""
|
||||||
try:
|
try:
|
||||||
if not self.config:
|
if not self.config:
|
||||||
return False, "No IMAP configuration found"
|
return False, "No IMAP configuration found"
|
||||||
@@ -401,15 +411,34 @@ class IMAPService:
|
|||||||
db.session.add(new_folder)
|
db.session.add(new_folder)
|
||||||
synced_count += 1
|
synced_count += 1
|
||||||
else:
|
else:
|
||||||
# Update existing folder with email counts and recent emails
|
# Get the list of email UIDs in this folder
|
||||||
# Get the total count of emails in this folder
|
email_uids = self.get_folder_email_uids(folder_name)
|
||||||
total_count = self.get_folder_email_count(folder_name)
|
|
||||||
existing_folder.total_count = total_count
|
# Sync with processed emails service to track which emails are pending
|
||||||
existing_folder.pending_count = 0 # Initially set to 0
|
new_uids_count = self.processed_emails_service.sync_folder_emails(
|
||||||
|
folder_name=folder_name,
|
||||||
|
email_uids=email_uids
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update folder with email counts
|
||||||
|
existing_folder.total_count = len(email_uids)
|
||||||
|
|
||||||
|
# Update pending count based on processed emails service
|
||||||
|
pending_count = self.processed_emails_service.get_pending_count(folder_name)
|
||||||
|
existing_folder.pending_count = pending_count
|
||||||
|
|
||||||
# Get the most recent emails for this folder
|
# Get the most recent emails for this folder
|
||||||
recent_emails = self.get_recent_emails(folder_name, 3)
|
recent_emails = self.get_recent_emails(folder_name, 3)
|
||||||
existing_folder.recent_emails = recent_emails
|
existing_folder.recent_emails = recent_emails
|
||||||
|
|
||||||
|
# Clean up records for emails that no longer exist
|
||||||
|
if new_uids_count > 0:
|
||||||
|
deleted_count = self.processed_emails_service.cleanup_old_records(
|
||||||
|
folder_name=folder_name,
|
||||||
|
current_uids=email_uids
|
||||||
|
)
|
||||||
|
if deleted_count > 0:
|
||||||
|
print(f" - Cleaned up {deleted_count} old email records")
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return True, f"Successfully synced {synced_count} folders"
|
return True, f"Successfully synced {synced_count} folders"
|
||||||
|
|||||||
109
manage.py
109
manage.py
@@ -2,7 +2,9 @@ import sys
|
|||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from app import create_app, db
|
from app import create_app, db
|
||||||
from app.models import Folder, User
|
from app.models import Folder, User, ProcessedEmail
|
||||||
|
from app.imap_service import IMAPService
|
||||||
|
from app.processed_emails_service import ProcessedEmailsService
|
||||||
from flask.cli import with_appcontext
|
from flask.cli import with_appcontext
|
||||||
import click
|
import click
|
||||||
|
|
||||||
@@ -65,6 +67,111 @@ def mock_process_emails():
|
|||||||
if not matched:
|
if not matched:
|
||||||
print(" -> No matching rule found.")
|
print(" -> No matching rule found.")
|
||||||
|
|
||||||
|
|
||||||
|
@app.cli.command("process-pending-emails")
|
||||||
|
@click.option("--user-id", help="Process emails for a specific user ID only")
|
||||||
|
@click.option("--folder-name", help="Process emails for a specific folder only")
|
||||||
|
@click.option("--batch-size", default=100, help="Number of emails to process in each batch")
|
||||||
|
@click.option("--dry-run", is_flag=True, help="Show what would be processed without actually processing")
|
||||||
|
@with_appcontext
|
||||||
|
def process_pending_emails(user_id, folder_name, batch_size, dry_run):
|
||||||
|
"""Process pending emails for all users or specific user/folder."""
|
||||||
|
from app.processed_emails_service import ProcessedEmailsService
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
# Build query for users
|
||||||
|
user_query = User.query
|
||||||
|
if user_id:
|
||||||
|
user_query = user_query.filter_by(id=int(user_id))
|
||||||
|
|
||||||
|
users = user_query.all()
|
||||||
|
if not users:
|
||||||
|
print("No users found to process")
|
||||||
|
return
|
||||||
|
|
||||||
|
total_processed = 0
|
||||||
|
total_users = len(users)
|
||||||
|
|
||||||
|
print(f"Starting email processing for {total_users} user(s)...")
|
||||||
|
|
||||||
|
for i, user in enumerate(users, 1):
|
||||||
|
print(f"\nProcessing user {i}/{total_users}: {user.email}")
|
||||||
|
|
||||||
|
# Skip users without IMAP configuration
|
||||||
|
if not user.imap_config:
|
||||||
|
print(" - Skipping: No IMAP configuration")
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Initialize services
|
||||||
|
imap_service = IMAPService(user)
|
||||||
|
processed_emails_service = ProcessedEmailsService(user)
|
||||||
|
|
||||||
|
# Get folders for this user
|
||||||
|
folders = Folder.query.filter_by(user_id=user.id).all()
|
||||||
|
|
||||||
|
if not folders:
|
||||||
|
print(" - No folders found for this user")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Process each folder
|
||||||
|
for folder in folders:
|
||||||
|
# Skip if specific folder name is specified and doesn't match
|
||||||
|
if folder_name and folder.name != folder_name:
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f" - Processing folder: {folder.name}")
|
||||||
|
|
||||||
|
# Get all pending emails for this folder
|
||||||
|
pending_emails = ProcessedEmail.query.filter_by(
|
||||||
|
user_id=user.id,
|
||||||
|
folder_name=folder.name,
|
||||||
|
is_processed=False
|
||||||
|
).all()
|
||||||
|
|
||||||
|
if not pending_emails:
|
||||||
|
print(f" - No pending emails in this folder")
|
||||||
|
continue
|
||||||
|
|
||||||
|
total_pending = len(pending_emails)
|
||||||
|
print(f" - Found {total_pending} pending emails")
|
||||||
|
|
||||||
|
# Process in batches
|
||||||
|
processed_in_folder = 0
|
||||||
|
for j in range(0, total_pending, batch_size):
|
||||||
|
batch = pending_emails[j:j + batch_size]
|
||||||
|
batch_uids = [email.email_uid for email in batch]
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
print(f" - Dry run: Would process {len(batch_uids)} emails")
|
||||||
|
processed_in_folder += len(batch_uids)
|
||||||
|
else:
|
||||||
|
# Mark emails as processed
|
||||||
|
result = processed_emails_service.mark_emails_processed(
|
||||||
|
folder_name=folder.name,
|
||||||
|
email_uids=batch_uids
|
||||||
|
)
|
||||||
|
processed_in_folder += result
|
||||||
|
print(f" - Processed {result} emails in this batch")
|
||||||
|
|
||||||
|
# Commit after each batch to avoid large transactions
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
print(f" - Total processed in folder: {processed_in_folder}/{total_pending}")
|
||||||
|
total_processed += processed_in_folder
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" - Error processing user {user.email}: {str(e)}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
db.session.rollback()
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not dry_run:
|
||||||
|
print(f"\nCompleted! Total emails processed: {total_processed}")
|
||||||
|
else:
|
||||||
|
print(f"\nDry run completed! Total emails would be processed: {total_processed}")
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
if len(sys.argv) > 1 and sys.argv[1] == 'mock-process':
|
if len(sys.argv) > 1 and sys.argv[1] == 'mock-process':
|
||||||
mock_process_emails()
|
mock_process_emails()
|
||||||
|
|||||||
Reference in New Issue
Block a user