# Listing metadata: 321 lines, 11 KiB, Python.
import pytest
|
|
from app.processed_emails_service import ProcessedEmailsService
|
|
from app.models import User, Folder, ProcessedEmail, db
|
|
from app import create_app
|
|
from datetime import datetime
|
|
|
|
|
|
class TestProcessedEmailsService:
    """Database-backed tests for ``ProcessedEmailsService``.

    Each test receives the ``app`` and ``mock_user`` pytest fixtures, which
    are defined outside this file, and works against the ``db`` session
    imported from ``app.models``. Helpers whose names start with an
    underscore are shared setup code and are not collected as tests.
    """

    # Folder name used both when creating the folder and when calling the
    # service, so the two can never drift apart.
    FOLDER_NAME = 'Test Folder'

    def _create_folder(self, user):
        """Persist and return the folder every test operates on."""
        folder = Folder(
            user_id=user.id,
            name=self.FOLDER_NAME,
            rule_text='Test rule',
        )
        db.session.add(folder)
        db.session.commit()
        return folder

    def _build_email(self, user, folder, uid, *, is_processed=False):
        """Build (without persisting) an email record for ``folder``."""
        return ProcessedEmail(
            user_id=user.id,
            folder_id=folder.id,
            folder_name=self.FOLDER_NAME,
            email_uid=uid,
            is_processed=is_processed,
        )

    def _bulk_save_emails(self, user, folder, states):
        """Bulk-insert one email per ``uid -> is_processed`` entry and commit."""
        db.session.bulk_save_objects([
            self._build_email(user, folder, uid, is_processed=flag)
            for uid, flag in states.items()
        ])
        db.session.commit()

    def _email_query(self, user, **filters):
        """Return a query for the user's records in the test folder."""
        return ProcessedEmail.query.filter_by(
            user_id=user.id,
            folder_name=self.FOLDER_NAME,
            **filters,
        )

    def test_init_with_user(self, app, mock_user):
        """Test ProcessedEmailsService initialization with a user."""
        service = ProcessedEmailsService(mock_user)
        assert service.user == mock_user

    def test_get_pending_emails_empty(self, app, mock_user):
        """Test get_pending_emails when no emails are pending."""
        # Run inside an application context, like every other test in this
        # class that touches the database session.
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            self._create_folder(mock_user)

            # The folder exists but has no email records at all.
            pending_uids = service.get_pending_emails(self.FOLDER_NAME)
            assert len(pending_uids) == 0

    def test_get_pending_emails_with_data(self, app, mock_user):
        """Test get_pending_emails when there are pending emails."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            folder = self._create_folder(mock_user)
            self._bulk_save_emails(
                mock_user, folder, {'123': False, '456': False}
            )

            pending_uids = service.get_pending_emails(self.FOLDER_NAME)
            assert len(pending_uids) == 2
            assert '123' in pending_uids
            assert '456' in pending_uids

    def test_mark_email_processed_existing(self, app, mock_user):
        """Test mark_email_processed for an existing pending email."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            folder = self._create_folder(mock_user)

            # Start from a single record that is still pending.
            db.session.add(self._build_email(mock_user, folder, '123'))
            db.session.commit()

            success = service.mark_email_processed(self.FOLDER_NAME, '123')
            assert success is True

            updated_email = self._email_query(
                mock_user, email_uid='123'
            ).first()
            # Fail with a clear assertion (not an AttributeError) if the
            # record cannot be found.
            assert updated_email is not None
            assert updated_email.is_processed is True
            assert updated_email.processed_at is not None

    def test_mark_email_processed_new(self, app, mock_user):
        """Test mark_email_processed for a new email record."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            self._create_folder(mock_user)

            # No record exists for this UID beforehand.
            success = service.mark_email_processed(self.FOLDER_NAME, '789')
            assert success is True

            new_email = self._email_query(mock_user, email_uid='789').first()
            assert new_email is not None
            assert new_email.is_processed is True
            assert new_email.processed_at is not None

    def test_mark_emails_processed(self, app, mock_user):
        """Test mark_emails_processed for multiple emails."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            folder = self._create_folder(mock_user)
            self._bulk_save_emails(
                mock_user, folder, {'123': False, '456': False}
            )

            # '789' has no pre-existing record, the other two are pending.
            processed_count = service.mark_emails_processed(
                self.FOLDER_NAME, ['123', '456', '789']
            )
            assert processed_count == 3

            processed_uids = set()
            for email in self._email_query(mock_user).all():
                processed_uids.add(email.email_uid)
                assert email.is_processed is True

            assert '123' in processed_uids
            assert '456' in processed_uids
            assert '789' in processed_uids

    def test_sync_folder_emails(self, app, mock_user):
        """Test sync_folder_emails for a folder."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            folder = self._create_folder(mock_user)

            email_uids = ['123', '456', '789']
            new_emails_count = service.sync_folder_emails(
                self.FOLDER_NAME, email_uids
            )
            assert new_emails_count == 3

            # One record per synced UID.
            records = self._email_query(mock_user).all()
            assert len(records) == 3

            # The folder's counters are expected to reflect the sync.
            updated_folder = Folder.query.filter_by(id=folder.id).first()
            assert updated_folder.total_count == 3
            assert updated_folder.pending_count == 3

    def test_get_pending_count(self, app, mock_user):
        """Test get_pending_count for a folder."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            folder = self._create_folder(mock_user)

            # Two pending records and one that is already processed.
            self._bulk_save_emails(
                mock_user, folder, {'1': False, '2': False, '3': True}
            )

            pending_count = service.get_pending_count(self.FOLDER_NAME)
            assert pending_count == 2

    def test_cleanup_old_records(self, app, mock_user):
        """Test cleanup_old_records for a folder."""
        with app.app_context():
            service = ProcessedEmailsService(mock_user)
            folder = self._create_folder(mock_user)
            self._bulk_save_emails(
                mock_user, folder, {'1': False, '2': False, '3': True}
            )

            # UID '3' is absent from the current UIDs, so it should go.
            current_uids = ['1', '2']
            deleted_count = service.cleanup_old_records(
                self.FOLDER_NAME, current_uids
            )
            assert deleted_count == 1

            remaining_records = self._email_query(mock_user).all()
            assert len(remaining_records) == 2
            assert all(
                email.email_uid in current_uids for email in remaining_records
            )

            # The folder's counters are expected to reflect the cleanup.
            updated_folder = Folder.query.filter_by(id=folder.id).first()
            assert updated_folder.total_count == 2
            assert updated_folder.pending_count == 2