Files
email-organizer/app/processed_emails_service.py
2025-08-06 15:38:49 -07:00

235 lines
8.7 KiB
Python

from typing import List, Dict, Optional
from datetime import datetime
from app.models import db, ProcessedEmail, User, Folder
from sqlalchemy import and_, or_
class ProcessedEmailsService:
def __init__(self, user: User):
self.user = user
def get_pending_emails(self, folder_name: str) -> List[str]:
"""Get list of email UIDs that are pending processing in a folder."""
try:
pending_emails = ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name,
is_processed=False
).all()
return [email.email_uid for email in pending_emails]
except Exception as e:
print(f"Error getting pending emails: {str(e)}")
return []
def mark_email_processed(self, folder_name: str, email_uid: str) -> bool:
"""Mark an email as processed."""
try:
processed_email = ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name,
email_uid=email_uid
).first()
if processed_email:
processed_email.is_processed = True
processed_email.processed_at = datetime.utcnow()
processed_email.updated_at = datetime.utcnow()
db.session.commit()
return True
else:
# Get the folder object to set the folder_id
folder = Folder.query.filter_by(
user_id=self.user.id,
name=folder_name
).first()
if folder:
# Create a new record if it doesn't exist
new_record = ProcessedEmail(
user_id=self.user.id,
folder_id=folder.id,
folder_name=folder_name,
email_uid=email_uid,
is_processed=True,
processed_at=datetime.utcnow()
)
db.session.add(new_record)
db.session.commit()
return True
else:
return False
except Exception as e:
print(f"Error marking email as processed: {str(e)}")
db.session.rollback()
return False
def mark_emails_processed(self, folder_name: str, email_uids: List[str]) -> int:
"""Mark multiple emails as processed in bulk."""
try:
updated_count = 0
# Update existing records
ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name,
is_processed=False
).filter(ProcessedEmail.email_uid.in_(email_uids)).update(
{
'is_processed': True,
'processed_at': datetime.utcnow(),
'updated_at': datetime.utcnow()
},
synchronize_session=False
)
# Check for any email UIDs that don't have records yet
existing_uids = {email.email_uid for email in ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name
).all()}
missing_uids = set(email_uids) - existing_uids
if missing_uids:
# Get the folder object to set the folder_id
folder = Folder.query.filter_by(
user_id=self.user.id,
name=folder_name
).first()
if folder:
# Create new records for missing UIDs
new_records = [
ProcessedEmail(
user_id=self.user.id,
folder_id=folder.id,
folder_name=folder_name,
email_uid=uid,
is_processed=True,
processed_at=datetime.utcnow()
)
for uid in missing_uids
]
db.session.bulk_save_objects(new_records)
updated_count = len(email_uids)
db.session.commit()
return updated_count
except Exception as e:
print(f"Error marking emails as processed: {str(e)}")
db.session.rollback()
return 0
def sync_folder_emails(self, folder_name: str, email_uids: List[str]) -> int:
"""Sync email UIDs for a folder, adding new ones as pending."""
try:
# Get existing UIDs for this folder
existing_uids = {email.email_uid for email in ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name
).all()}
# Find new UIDs that don't exist yet
new_uids = set(email_uids) - existing_uids
# Create new records for new UIDs
if new_uids:
# Get the folder object to set the folder_id
folder = Folder.query.filter_by(
user_id=self.user.id,
name=folder_name
).first()
if folder:
new_records = [
ProcessedEmail(
user_id=self.user.id,
folder_id=folder.id,
folder_name=folder_name,
email_uid=uid,
is_processed=False
)
for uid in new_uids
]
db.session.bulk_save_objects(new_records)
# Get the folder to update counts
folder = Folder.query.filter_by(
user_id=self.user.id,
name=folder_name
).first()
if folder:
# Update pending count
pending_count = ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name,
is_processed=False
).count()
folder.pending_count = pending_count
folder.total_count = len(email_uids)
db.session.commit()
return len(new_uids)
except Exception as e:
print(f"Error syncing folder emails: {str(e)}")
db.session.rollback()
return 0
def get_pending_count(self, folder_name: str) -> int:
"""Get count of pending emails for a folder."""
try:
count = ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name,
is_processed=False
).count()
return count
except Exception as e:
print(f"Error getting pending count: {str(e)}")
return 0
def cleanup_old_records(self, folder_name: str, current_uids: List[str]) -> int:
"""Remove records for emails that no longer exist in the folder."""
try:
# Find records that no longer exist in the current UIDs
records_to_delete = ProcessedEmail.query.filter(
and_(
ProcessedEmail.user_id == self.user.id,
ProcessedEmail.folder_name == folder_name,
~ProcessedEmail.email_uid.in_(current_uids)
)
).all()
# Delete the records
for record in records_to_delete:
db.session.delete(record)
deleted_count = len(records_to_delete)
db.session.commit()
# Update folder counts after cleanup
folder = Folder.query.filter_by(
user_id=self.user.id,
name=folder_name
).first()
if folder:
pending_count = ProcessedEmail.query.filter_by(
user_id=self.user.id,
folder_name=folder_name,
is_processed=False
).count()
folder.pending_count = pending_count
folder.total_count = len(current_uids)
db.session.commit()
return deleted_count
except Exception as e:
print(f"Error cleaning up old records: {str(e)}")
db.session.rollback()
return 0