# File viewer metadata: 235 lines, 8.7 KiB, Python
from typing import List, Dict, Optional
|
|
from datetime import datetime
|
|
from app.models import db, ProcessedEmail, User, Folder
|
|
from sqlalchemy import and_, or_
|
|
|
|
|
|
class ProcessedEmailsService:
    """Track which email UIDs in a user's folders have been processed.

    Every query is scoped to ``self.user.id`` and a folder name. Public
    methods never raise: on any exception they print a diagnostic and return
    a neutral value (``[]``, ``False`` or ``0``); methods that write also
    roll the session back first.
    """

    def __init__(self, user: User):
        """Bind the service to *user*; the methods read ``user.id``."""
        self.user = user

    # ------------------------------------------------------------------
    # Internal helpers (shared query fragments)
    # ------------------------------------------------------------------

    def _records(self, folder_name: str):
        """Return the query for this user's records in *folder_name*."""
        return ProcessedEmail.query.filter_by(
            user_id=self.user.id,
            folder_name=folder_name,
        )

    def _pending(self, folder_name: str):
        """Return the query for records in *folder_name* not yet processed."""
        return self._records(folder_name).filter_by(is_processed=False)

    def _find_folder(self, folder_name: str) -> Optional[Folder]:
        """Return this user's ``Folder`` named *folder_name*, or ``None``."""
        return Folder.query.filter_by(
            user_id=self.user.id,
            name=folder_name,
        ).first()

    def _refresh_counts(self, folder: Folder, folder_name: str, total: int) -> None:
        """Store the current pending count and *total* on *folder*.

        Does not commit; the caller owns the transaction.
        """
        folder.pending_count = self._pending(folder_name).count()
        folder.total_count = total

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_pending_emails(self, folder_name: str) -> List[str]:
        """Return the UIDs of emails still pending processing in a folder.

        Returns an empty list if the query fails.
        """
        try:
            # Only the UID column is needed, so avoid loading full rows.
            rows = (
                self._pending(folder_name)
                .with_entities(ProcessedEmail.email_uid)
                .all()
            )
            return [uid for (uid,) in rows]
        except Exception as e:
            print(f"Error getting pending emails: {str(e)}")
            return []

    def mark_email_processed(self, folder_name: str, email_uid: str) -> bool:
        """Mark one email as processed, creating its record if needed.

        Returns:
            ``True`` when the record was updated or created and committed;
            ``False`` when no record exists and the folder cannot be found,
            or when any error occurred (the session is rolled back).
        """
        try:
            # One timestamp so processed_at and updated_at agree exactly.
            now = datetime.utcnow()
            record = (
                self._records(folder_name)
                .filter_by(email_uid=email_uid)
                .first()
            )

            if record is not None:
                record.is_processed = True
                record.processed_at = now
                record.updated_at = now
            else:
                # A new record needs the folder's id; without the folder
                # there is nothing valid to insert.
                folder = self._find_folder(folder_name)
                if folder is None:
                    return False
                db.session.add(
                    ProcessedEmail(
                        user_id=self.user.id,
                        folder_id=folder.id,
                        folder_name=folder_name,
                        email_uid=email_uid,
                        is_processed=True,
                        processed_at=now,
                    )
                )

            db.session.commit()
            return True
        except Exception as e:
            print(f"Error marking email as processed: {str(e)}")
            db.session.rollback()
            return False

    def mark_emails_processed(self, folder_name: str, email_uids: List[str]) -> int:
        """Mark multiple emails as processed in bulk.

        Pending records are flipped to processed with a single UPDATE; UIDs
        without a record get a new, already-processed record, provided the
        folder exists.

        Returns:
            The number of distinct UIDs from *email_uids* that have a
            processed record after the call. UIDs that had no record and
            could not be created (folder not found) are not counted.
            ``0`` for empty input or on error (the session is rolled back).
        """
        # Nothing to do: skip the empty IN () queries and the commit.
        if not email_uids:
            return 0

        try:
            requested = set(email_uids)
            uid_filter = ProcessedEmail.email_uid.in_(list(requested))
            now = datetime.utcnow()

            # Flip the records that already exist but are still pending.
            self._pending(folder_name).filter(uid_filter).update(
                {
                    'is_processed': True,
                    'processed_at': now,
                    'updated_at': now,
                },
                synchronize_session=False,
            )

            # Of the requested UIDs, find those that have no record at all.
            known = {
                uid
                for (uid,) in self._records(folder_name)
                .filter(uid_filter)
                .with_entities(ProcessedEmail.email_uid)
            }
            missing = requested - known

            not_created = 0
            if missing:
                folder = self._find_folder(folder_name)
                if folder is not None:
                    db.session.bulk_save_objects([
                        ProcessedEmail(
                            user_id=self.user.id,
                            folder_id=folder.id,
                            folder_name=folder_name,
                            email_uid=uid,
                            is_processed=True,
                            processed_at=now,
                        )
                        for uid in missing
                    ])
                else:
                    # No folder row means no folder_id, so these UIDs stay
                    # unrecorded and must not be reported as processed.
                    not_created = len(missing)

            db.session.commit()
            return len(requested) - not_created
        except Exception as e:
            print(f"Error marking emails as processed: {str(e)}")
            db.session.rollback()
            return 0

    def sync_folder_emails(self, folder_name: str, email_uids: List[str]) -> int:
        """Sync email UIDs for a folder, adding unseen ones as pending.

        Also refreshes the folder's ``pending_count`` and sets its
        ``total_count`` to ``len(email_uids)``.

        Returns:
            The number of new pending records created. ``0`` when the folder
            does not exist (nothing is written in that case) or on error
            (the session is rolled back).
        """
        try:
            known = {
                uid
                for (uid,) in self._records(folder_name)
                .with_entities(ProcessedEmail.email_uid)
            }
            new_uids = set(email_uids) - known

            # Both the inserts and the counter refresh need the folder row,
            # so without it nothing is stored and nothing was added.
            folder = self._find_folder(folder_name)
            if folder is None:
                return 0

            if new_uids:
                db.session.bulk_save_objects([
                    ProcessedEmail(
                        user_id=self.user.id,
                        folder_id=folder.id,
                        folder_name=folder_name,
                        email_uid=uid,
                        is_processed=False,
                    )
                    for uid in new_uids
                ])

            self._refresh_counts(folder, folder_name, len(email_uids))
            db.session.commit()
            return len(new_uids)
        except Exception as e:
            print(f"Error syncing folder emails: {str(e)}")
            db.session.rollback()
            return 0

    def get_pending_count(self, folder_name: str) -> int:
        """Return the number of pending emails in a folder (``0`` on error)."""
        try:
            return self._pending(folder_name).count()
        except Exception as e:
            print(f"Error getting pending count: {str(e)}")
            return 0

    def cleanup_old_records(self, folder_name: str, current_uids: List[str]) -> int:
        """Remove records whose UID is no longer in *current_uids*.

        The deletes and the folder counter refresh are committed together,
        so a failure leaves the database unchanged and the returned ``0``
        is accurate.

        NOTE: an empty *current_uids* matches every record of the folder,
        so all of them are deleted.

        Returns:
            The number of records deleted, or ``0`` on error (the session
            is rolled back).
        """
        try:
            stale = (
                self._records(folder_name)
                .filter(~ProcessedEmail.email_uid.in_(current_uids))
                .all()
            )

            # Delete through the session, one object at a time, rather than
            # with a bulk DELETE statement.
            for record in stale:
                db.session.delete(record)

            # Make the deletes visible to the pending-count query below
            # even if the session has autoflush disabled.
            db.session.flush()

            folder = self._find_folder(folder_name)
            if folder is not None:
                self._refresh_counts(folder, folder_name, len(current_uids))

            db.session.commit()
            return len(stale)
        except Exception as e:
            print(f"Error cleaning up old records: {str(e)}")
            db.session.rollback()
            return 0