diff --git a/.env b/.env index fbf599a..aa114fb 100644 --- a/.env +++ b/.env @@ -6,8 +6,8 @@ DATABASE_URL=postgresql://postgres:password@localhost:5432/email_organizer_dev OPENAI_API_KEY=aaoeu OPENAI_BASE_URL=http://workstation:5082/v1 -OPENAI_MODEL=Qwen3-235B-A22B-Thinking-2507-GGUF +OPENAI_MODEL=Qwen3-235B-A22B-Instruct-2507-GGUF AI_SERVICE_URL=http://workstation:5082/v1 AI_SERVICE_API_KEY=aoue -AI_MODEL=Qwen3-Coder-30B-A3B-Instruct-GGUF-roo \ No newline at end of file +AI_MODEL=Qwen3-Coder-30B-A3B-Instruct-GGUF-roo diff --git a/.qwen/settings.json b/.qwen/settings.json new file mode 100644 index 0000000..ae1e5f9 --- /dev/null +++ b/.qwen/settings.json @@ -0,0 +1,18 @@ +{ + "mcpServers": { + "context7": { + "command": "npx", + "args": [ + "-y", + "@upstash/context7-mcp" + ], + "env": { + "DEFAULT_MINIMUM_TOKENS": "" + }, + "alwaysAllow": [ + "resolve-library-id", + "get-library-docs" + ] + } + } +} \ No newline at end of file diff --git a/.serena/project.yml b/.serena/project.yml new file mode 100644 index 0000000..888d339 --- /dev/null +++ b/.serena/project.yml @@ -0,0 +1,68 @@ +# language of the project (csharp, python, rust, java, typescript, go, cpp, or ruby) +# * For C, use cpp +# * For JavaScript, use typescript +# Special requirements: +# * csharp: Requires the presence of a .sln file in the project folder. +language: python + +# whether to use the project's gitignore file to ignore files +# Added on 2025-04-07 +ignore_all_files_in_gitignore: true +# list of additional paths to ignore +# same syntax as gitignore, so you can use * and ** +# Was previously called `ignored_dirs`, please update your config if you are using that. +# Added (renamed)on 2025-04-07 +ignored_paths: [] + +# whether the project is in read-only mode +# If set to true, all editing tools will be disabled and attempts to use them will result in an error +# Added on 2025-04-18 +read_only: false + + +# list of tool names to exclude. 
We recommend not excluding any tools, see the readme for more details. +# Below is the complete list of tools for convenience. +# To make sure you have the latest list of tools, and to view their descriptions, +# execute `uv run scripts/print_tool_overview.py`. +# +# * `activate_project`: Activates a project by name. +# * `check_onboarding_performed`: Checks whether project onboarding was already performed. +# * `create_text_file`: Creates/overwrites a file in the project directory. +# * `delete_lines`: Deletes a range of lines within a file. +# * `delete_memory`: Deletes a memory from Serena's project-specific memory store. +# * `execute_shell_command`: Executes a shell command. +# * `find_referencing_code_snippets`: Finds code snippets in which the symbol at the given location is referenced. +# * `find_referencing_symbols`: Finds symbols that reference the symbol at the given location (optionally filtered by type). +# * `find_symbol`: Performs a global (or local) search for symbols with/containing a given name/substring (optionally filtered by type). +# * `get_current_config`: Prints the current configuration of the agent, including the active and available projects, tools, contexts, and modes. +# * `get_symbols_overview`: Gets an overview of the top-level symbols defined in a given file. +# * `initial_instructions`: Gets the initial instructions for the current project. +# Should only be used in settings where the system prompt cannot be set, +# e.g. in clients you have no control over, like Claude Desktop. +# * `insert_after_symbol`: Inserts content after the end of the definition of a given symbol. +# * `insert_at_line`: Inserts content at a given line in a file. +# * `insert_before_symbol`: Inserts content before the beginning of the definition of a given symbol. +# * `list_dir`: Lists files and directories in the given directory (optionally with recursion). +# * `list_memories`: Lists memories in Serena's project-specific memory store. 
+# * `onboarding`: Performs onboarding (identifying the project structure and essential tasks, e.g. for testing or building). +# * `prepare_for_new_conversation`: Provides instructions for preparing for a new conversation (in order to continue with the necessary context). +# * `read_file`: Reads a file within the project directory. +# * `read_memory`: Reads the memory with the given name from Serena's project-specific memory store. +# * `remove_project`: Removes a project from the Serena configuration. +# * `replace_lines`: Replaces a range of lines within a file with new content. +# * `replace_symbol_body`: Replaces the full definition of a symbol. +# * `restart_language_server`: Restarts the language server, may be necessary when edits not through Serena happen. +# * `search_for_pattern`: Performs a search for a pattern in the project. +# * `summarize_changes`: Provides instructions for summarizing the changes made to the codebase. +# * `switch_modes`: Activates modes by providing a list of their names +# * `think_about_collected_information`: Thinking tool for pondering the completeness of collected information. +# * `think_about_task_adherence`: Thinking tool for determining whether the agent is still on track with the current task. +# * `think_about_whether_you_are_done`: Thinking tool for determining whether the task is truly completed. +# * `write_memory`: Writes a named memory (for future reference) to Serena's project-specific memory store. +excluded_tools: [] + +# initial prompt for the project. It will always be given to the LLM upon activating the project +# (contrary to the memories, which are loaded on demand). +initial_prompt: "" + +project_name: "email-organizer" diff --git a/app/__init__.py b/app/__init__.py index 3853f63..506747b 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -25,6 +25,13 @@ login_manager.login_message = 'Please log in to access this page.' 
@app.cli.command("start-scheduler")
@click.option('--interval', default=5,
              help='Interval in minutes between processing runs (default: 5)')
@with_appcontext
def start_scheduler(interval):
    """Start the background email processing scheduler and block until Ctrl+C.

    Args:
        interval: Minutes between processing runs; applied to the app's
            scheduler before starting it.
    """
    # Function-scope import: the original appended `import time` to the very
    # bottom of the module (with a comment saying it belongs at the top),
    # which is easy to miss and non-idiomatic.
    import time

    if not hasattr(app, 'scheduler'):
        print("Scheduler not available. Make sure app/scheduler.py exists and is properly imported.")
        sys.exit(1)

    # Honor --interval: the original parsed and printed the option but never
    # handed it to the scheduler, which silently kept its default interval.
    app.scheduler.interval = interval * 60

    print(f"Starting email processing scheduler with {interval} minute interval...")
    print("Press Ctrl+C to stop")

    try:
        app.scheduler.start()

        # Keep the main thread alive; the scheduler runs on a daemon thread.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutting down scheduler...")
            app.scheduler.stop()

    except Exception as e:
        print(f"Error starting scheduler: {e}")
        sys.exit(1)
# NOTE: this class is the body of app/email_processor.py; its module header
# imports (kept from the original, with `Any` added for the corrected
# annotations):
#   from typing import Any, Dict, List, Optional
#   from datetime import datetime, timedelta
#   import logging
#   from app.models import db, Folder, User, ProcessedEmail
#   from app.imap_service import IMAPService
#   from app.processed_emails_service import ProcessedEmailsService


class EmailProcessor:
    """
    Service class for processing emails in the background according to
    user-defined rules.  Handles automated organization of emails based on
    folder configurations.
    """

    def __init__(self, user: User):
        self.user = user
        self.imap_service = IMAPService(user)
        self.processed_emails_service = ProcessedEmailsService(user)
        self.logger = logging.getLogger(__name__)

    # Fixed: the original annotated these as Dict[str, any] -- `any` is the
    # builtin function, not a type; typing.Any is correct.
    def process_user_emails(self) -> Dict[str, Any]:
        """
        Process emails for this user according to their folder rules.

        Returns:
            Dict with 'success_count', 'error_count', 'processed_folders',
            'start_time', 'end_time' and 'duration' (seconds).
        """
        result = {
            'success_count': 0,
            'error_count': 0,
            'processed_folders': [],
            'start_time': datetime.utcnow(),
            'end_time': None,
            'duration': None,
        }

        try:
            # Only organize-enabled folders, highest priority first.
            folders = Folder.query.filter_by(
                user_id=self.user.id,
                organize_enabled=True,
            ).order_by(Folder.priority.desc()).all()

            if not folders:
                self.logger.info("No folders to process for user %s", self.user.email)
                # end_time/duration are filled in by the finally block below.
                return result

            self.logger.info("Processing %d folders for user %s",
                             len(folders), self.user.email)

            for folder in folders:
                try:
                    folder_result = self.process_folder_emails(folder)
                    result['success_count'] += folder_result['processed_count']
                    result['error_count'] += folder_result['error_count']
                    result['processed_folders'].append({
                        'folder_id': folder.id,
                        'folder_name': folder.name,
                        'processed_count': folder_result['processed_count'],
                        'error_count': folder_result['error_count'],
                    })
                    self.logger.info("Processed %d emails for folder %s",
                                     folder_result['processed_count'], folder.name)
                except Exception as e:
                    # One bad folder must not abort the rest of the run.
                    self.logger.error("Error processing folder %s: %s", folder.name, e)
                    result['error_count'] += 1
                    continue

        except Exception as e:
            self.logger.error("Error in process_user_emails for user %s: %s",
                              self.user.email, e)
            result['error_count'] += 1

        finally:
            # Timing is recorded on every exit path, including the early return.
            result['end_time'] = datetime.utcnow()
            result['duration'] = (result['end_time'] - result['start_time']).total_seconds()

        return result

    def process_folder_emails(self, folder: Folder) -> Dict[str, Any]:
        """
        Process pending emails for one folder according to its rules.

        Args:
            folder: The folder to process.

        Returns:
            Dict with 'processed_count', 'error_count', 'folder_id',
            'folder_name'.
        """
        result = {
            'processed_count': 0,
            'error_count': 0,
            'folder_id': folder.id,
            'folder_name': folder.name,
        }

        try:
            pending_email_uids = self.processed_emails_service.get_pending_emails(folder.name)

            if not pending_email_uids:
                self.logger.info("No pending emails to process for folder %s", folder.name)
                return result

            self.logger.info("Processing %d pending emails for folder %s",
                             len(pending_email_uids), folder.name)

            # Batch to bound per-connection work and memory use.
            batch_size = 10
            for i in range(0, len(pending_email_uids), batch_size):
                batch = pending_email_uids[i:i + batch_size]
                batch_result = self._process_email_batch(folder, batch)

                result['processed_count'] += batch_result['processed_count']
                result['error_count'] += batch_result['error_count']

                self.logger.info("Processed batch %d for folder %s: %d success, %d errors",
                                 i // batch_size + 1, folder.name,
                                 batch_result['processed_count'],
                                 batch_result['error_count'])

            # Refresh the folder's pending/total counts after processing.
            self._update_folder_counts(folder)

        except Exception as e:
            self.logger.error("Error in process_folder_emails for folder %s: %s",
                              folder.name, e)
            result['error_count'] += 1

        return result

    def _process_email_batch(self, folder: Folder, email_uids: List[str]) -> Dict[str, Any]:
        """
        Connect, authenticate, and process one batch of emails for *folder*:
        evaluate each email's headers against the folder rules, move matches,
        and record the processed UIDs.

        Args:
            folder: The folder whose rules to apply.
            email_uids: Email UIDs to process.

        Returns:
            Dict with 'processed_count' and 'error_count'.
        """
        result = {'processed_count': 0, 'error_count': 0}

        try:
            self.imap_service._connect()

            # NOTE(review): credentials are re-read and login repeated per
            # batch; presumably IMAPService could own authentication -- confirm.
            username = self.user.imap_config.get('username', '')
            password = self.user.imap_config.get('password', '')
            self.imap_service.connection.login(username, password)

            resp_code, content = self.imap_service.connection.select(folder.name)
            if resp_code != 'OK':
                raise Exception(f"Failed to select folder {folder.name}: {content}")

            processed_uids = []
            for email_uid in email_uids:
                try:
                    headers = self.imap_service.get_email_headers(folder.name, email_uid)

                    if not headers:
                        self.logger.warning("Could not get headers for email %s in folder %s",
                                            email_uid, folder.name)
                        result['error_count'] += 1
                        continue

                    destination_folder = self._evaluate_rules(headers, folder.rule_text)

                    if destination_folder and destination_folder != folder.name:
                        if self._move_email(email_uid, folder.name, destination_folder):
                            processed_uids.append(email_uid)
                            result['processed_count'] += 1
                        else:
                            self.logger.error("Failed to move email %s from %s to %s",
                                              email_uid, folder.name, destination_folder)
                            result['error_count'] += 1
                    else:
                        # No rule matched (or it targets the current folder):
                        # mark as processed so it is not revisited.
                        processed_uids.append(email_uid)
                        result['processed_count'] += 1

                except Exception as e:
                    self.logger.error("Error processing email %s: %s", email_uid, e)
                    result['error_count'] += 1
                    continue

            if processed_uids:
                count = self.processed_emails_service.mark_emails_processed(
                    folder.name, processed_uids)
                if count != len(processed_uids):
                    self.logger.warning("Marked %d emails as processed, but expected %d",
                                        count, len(processed_uids))

            self.imap_service.connection.close()
            self.imap_service.connection.logout()
            self.imap_service.connection = None

        except Exception as e:
            self.logger.error("Error in _process_email_batch: %s", e)
            result['error_count'] += 1

            # Best-effort cleanup; the connection is discarded either way.
            if self.imap_service.connection:
                try:
                    self.imap_service.connection.close()
                    self.imap_service.connection.logout()
                except Exception:
                    pass
                self.imap_service.connection = None

        return result

    def _evaluate_rules(self, headers: Dict[str, str], rule_text: Optional[str]) -> Optional[str]:
        """
        Evaluate *rule_text* against email *headers* and return the
        destination folder name, or None when the email should stay put.

        Simplified placeholder: recognizes only 'move to <folder>'; proper
        rule parsing should replace this in production.
        """
        if not rule_text:
            return None

        rule_lower = rule_text.lower()
        if 'move to' in rule_lower:
            try:
                start_idx = rule_lower.find('move to') + len('move to')
                # Everything after 'move to', minus trailing punctuation.
                destination = rule_text[start_idx:].strip()
                destination = destination.rstrip('.!,;:')
                return destination.strip()
            except Exception:
                pass

        # Default: no move needed.
        return None

    def _move_email(self, email_uid: str, source_folder: str, destination_folder: str) -> bool:
        """
        Move an email via IMAP copy + \\Deleted flag + expunge.

        Args:
            email_uid: UID of the email to move.
            source_folder: Current folder name.
            destination_folder: Destination folder name.

        Returns:
            True on success, False otherwise.
        """
        try:
            # NOTE(review): imaplib's copy()/store() take message *sequence*
            # numbers; if these identifiers are UIDs,
            # connection.uid('COPY'/'STORE', ...) is required -- confirm what
            # get_pending_emails returns.
            result = self.imap_service.connection.copy(email_uid, destination_folder)
            if result[0] != 'OK':
                self.logger.error("Copy failed for email %s: %s", email_uid, result[1])
                return False

            self.imap_service.connection.select(source_folder)
            # Raw string: the original '\Deleted' only worked because \D is
            # not a recognized escape; it raises a SyntaxWarning on 3.12+.
            result = self.imap_service.connection.store(email_uid, '+FLAGS', r'\Deleted')
            if result[0] != 'OK':
                self.logger.error("Mark as deleted failed for email %s: %s",
                                  email_uid, result[1])
                return False

            # NOTE(review): expunge() removes *every* \Deleted message in the
            # folder and renumbers sequence numbers, which can invalidate the
            # remainder of the batch -- confirm this is intended.
            result = self.imap_service.connection.expunge()
            if result[0] != 'OK':
                self.logger.error("Expunge failed for email %s: %s", email_uid, result[1])
                return False

            return True

        except Exception as e:
            self.logger.error("Error moving email %s from %s to %s: %s",
                              email_uid, source_folder, destination_folder, e)
            return False

    def _update_folder_counts(self, folder: Folder) -> None:
        """
        Refresh the folder's pending and total counts after processing.

        Args:
            folder: The folder to update.
        """
        try:
            folder.pending_count = self.processed_emails_service.get_pending_count(folder.name)
            # Total count comes from the IMAP server.
            folder.total_count = self.imap_service.get_folder_email_count(folder.name)
            db.session.commit()
        except Exception as e:
            self.logger.error("Error updating folder counts for %s: %s", folder.name, e)
            db.session.rollback()
+ """ + try: + processor = EmailProcessor(current_user) + result = processor.process_user_emails() + + return jsonify({ + 'success': True, + 'message': f"Processed {result['success_count']} emails successfully", + 'details': result + }), 200 + + except Exception as e: + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 + +@bp.route('/process-folder/', methods=['POST']) +def trigger_folder_processing(folder_id): + """ + Trigger email processing for a specific folder. + """ + try: + # Verify the folder belongs to the current user + from app.models import Folder + folder = Folder.query.filter_by(id=folder_id, user_id=current_user.id).first() + + if not folder: + return jsonify({ + 'success': False, + 'error': 'Folder not found or access denied' + }), 404 + + processor = EmailProcessor(current_user) + result = processor.process_folder_emails(folder) + + return jsonify({ + 'success': True, + 'message': f"Processed {result['processed_count']} emails for folder {folder.name}", + 'details': result + }), 200 + + except Exception as e: + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 \ No newline at end of file diff --git a/app/scheduler.py b/app/scheduler.py new file mode 100644 index 0000000..00057ed --- /dev/null +++ b/app/scheduler.py @@ -0,0 +1,107 @@ +import logging +import time +from datetime import datetime, timedelta +from threading import Thread +from app.models import User +from app.email_processor import EmailProcessor +from app import create_app + +class Scheduler: + """ + Background scheduler for email processing tasks. + Runs as a separate thread to process emails at regular intervals. 
+ """ + + def __init__(self, app=None, interval_minutes=5): + self.app = app + self.interval = interval_minutes * 60 # Convert to seconds + self.thread = None + self.running = False + self.logger = logging.getLogger(__name__) + + if app: + self.init_app(app) + + def init_app(self, app): + """Initialize the scheduler with a Flask application.""" + self.app = app + + # Store scheduler in app extensions + if not hasattr(app, 'extensions'): + app.extensions = {} + app.extensions['scheduler'] = self + + def start(self): + """Start the background scheduler thread.""" + if self.running: + self.logger.warning("Scheduler is already running") + return + + self.running = True + self.thread = Thread(target=self._run, daemon=True) + self.thread.start() + self.logger.info(f"Scheduler started with {self.interval//60} minute interval") + + def stop(self): + """Stop the background scheduler.""" + self.running = False + if self.thread: + self.logger.info("Scheduler stopped") + self.thread.join() + + def _run(self): + """Main loop for the scheduler.""" + # Wait a moment before first run to allow app to start + time.sleep(5) + + while self.running: + try: + self.logger.info("Starting scheduled email processing") + self.process_all_users() + + # Calculate next run time + next_run = datetime.now() + timedelta(seconds=self.interval) + self.logger.info(f"Next run at {next_run.strftime('%Y-%m-%d %H:%M:%S')}") + + # Sleep for the interval, checking every 10 seconds if we should stop + slept = 0 + while self.running and slept < self.interval: + sleep_time = min(10, self.interval - slept) + time.sleep(sleep_time) + slept += sleep_time + + except Exception as e: + self.logger.error(f"Error in scheduler: {str(e)}") + # Continue running even if there's an error + continue + + def process_all_users(self): + """Process emails for all users.""" + with self.app.app_context(): + try: + # Get all users + users = User.query.all() + + if not users: + self.logger.info("No users found for processing") + 
return + + self.logger.info(f"Processing emails for {len(users)} users") + + # Process each user's emails + for user in users: + try: + processor = EmailProcessor(user) + result = processor.process_user_emails() + + self.logger.info(f"Completed processing for user {user.email}: " + f"{result['success_count']} success, " + f"{result['error_count']} errors, " + f"{len(result['processed_folders'])} folders processed") + + except Exception as e: + self.logger.error(f"Error processing emails for user {user.email}: {str(e)}") + continue + + except Exception as e: + self.logger.error(f"Error in process_all_users: {str(e)}") \ No newline at end of file diff --git a/docs/user-stories/background-email-processing.md b/docs/user-stories/background-email-processing.md new file mode 100644 index 0000000..0f9b166 --- /dev/null +++ b/docs/user-stories/background-email-processing.md @@ -0,0 +1,132 @@ +# Background Email Processing User Stories & Acceptance Criteria + +## User Stories + +### Primary User Stories + +#### 1. Automated Email Processing +**As a** user with configured email folders +**I want** my emails to be automatically processed in the background +**So that** I don't have to manually trigger email organization + +**Acceptance Criteria:** +- [ ] Background task runs periodically to process emails for all users +- [ ] Only processes emails in folders with `organize_enabled = true` +- [ ] Processes emails according to all folder rules, moving emails as needed to the appropriate folder +- [ ] Respects priority field for folders +- [ ] Marks processed emails to avoid reprocessing +- [ ] Updates folder pending counts after processing +- [ ] Handles processing errors gracefully without stopping other folders +- [ ] Respects folder priority when processing (high priority first) + +#### 2. 
Email Processing Status Tracking +**As a** user +**I want** to see the status of email processing +**So that** I can understand how many emails are pending organization + +**Acceptance Criteria:** +- [ ] Pending email count is displayed for each folder +- [ ] Count updates in real-time after processing +- [ ] Processed email tracking prevents duplicate processing +- [ ] Email processing history is maintained + +#### 3. Batch Email Processing +**As a** user with many emails +**I want** emails to be processed in batches +**So that** the system doesn't become unresponsive with large mailboxes + +**Acceptance Criteria:** +- [ ] Emails are processed in configurable batch sizes +- [ ] Processing progress is tracked per batch +- [ ] Failed batches don't affect other batches +- [ ] System resources are managed efficiently during processing + +#### 4. Error Handling & Recovery +**As a** user +**I want** email processing errors to be handled gracefully +**So that** temporary issues don't stop all email organization + +**Acceptance Criteria:** +- [ ] Individual email processing failures don't stop folder processing +- [ ] Connection errors to IMAP servers are retried +- [ ] Transient errors are logged but don't halt processing +- [ ] Permanent failures are flagged for user attention +- [ ] Error notifications are sent for critical failures + +#### 5. Processing Controls +**As a** user +**I want** control over when emails are processed +**So that** I can manage processing during preferred times + +**Acceptance Criteria:** +- [ ] Users can enable/disable automatic processing per folder +- [ ] Users can trigger manual processing for specific folders +- [ ] Processing can be paused and resumed +- [ ] Processing schedule can be configured + +### Secondary User Stories + +#### 6. 
Performance Monitoring +**As a** system administrator +**I want** to monitor email processing performance +**So that** I can identify and resolve bottlenecks + +**Acceptance Criteria:** +- [ ] Processing time metrics are collected +- [ ] Success/failure rates are tracked +- [ ] Resource usage during processing is monitored +- [ ] Performance alerts are sent for degraded performance + +#### 7. Accessibility +**As a** user with disabilities +**I want** email processing status to be accessible +**So that** I can understand processing through assistive technologies + +**Acceptance Criteria:** +- [ ] Processing status updates are announced by screen readers +- [ ] Visual indicators have proper contrast ratios +- [ ] Processing notifications are keyboard navigable +- [ ] Error messages are clear and descriptive + +## Technical Requirements + +### Backend Requirements +- [ ] Background task scheduler implementation +- [ ] IMAP connection management for processing +- [ ] Email processing logic with rule evaluation +- [ ] Processed email tracking system +- [ ] Error handling and logging mechanisms +- [ ] Performance monitoring and metrics collection + +### Frontend Requirements +- [ ] Pending email count display +- [ ] Processing status indicators +- [ ] Manual processing controls +- [ ] Error notification UI +- [ ] Real-time updates for processing status + +### Integration Requirements +- [ ] Integration with IMAP service for email access +- [ ] Integration with folder management system +- [ ] Integration with processed emails tracking +- [ ] API endpoints for manual processing controls + +## Non-Functional Requirements + +### Performance +- Process emails at a rate of at least 10 emails per second +- Background task execution interval configurable (default: 5 minutes) +- Memory usage capped to prevent system resource exhaustion +- Processing of large mailboxes (100k+ emails) completes without timeouts + +### Reliability +- 99.5% uptime for background processing +- 
Automatic recovery from transient errors +- Graceful degradation when external services are unavailable +- Data consistency maintained during processing failures + +### Scalability +- Support for 1000+ concurrent users +- Horizontal scaling of processing workers +- Efficient database queries for email tracking +- Load balancing of processing tasks across workers diff --git a/tests/test_background_processing_routes.py b/tests/test_background_processing_routes.py new file mode 100644 index 0000000..2541097 --- /dev/null +++ b/tests/test_background_processing_routes.py @@ -0,0 +1,113 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +from flask_login import FlaskLoginClient + +def test_trigger_email_processing_success(client, auth): + """Test successful email processing trigger.""" + # Login as a user + auth.login() + + # Mock the EmailProcessor + with patch('app.routes.background_processing.EmailProcessor') as mock_processor: + mock_processor_instance = mock_processor.return_value + mock_processor_instance.process_user_emails.return_value = { + 'success_count': 5, + 'error_count': 0, + 'processed_folders': [] + } + + # Make the request + response = client.post('/api/background/process-emails') + + # Verify response + assert response.status_code == 200 + json_data = response.get_json() + assert json_data['success'] is True + assert 'Processed 5 emails successfully' in json_data['message'] + +def test_trigger_email_processing_unauthorized(client): + """Test email processing trigger without authentication.""" + # Make the request without logging in + response = client.post('/api/background/process-emails') + + # Verify response (should redirect to login) + assert response.status_code == 302 # Redirect to login + +def test_trigger_folder_processing_success(client, auth, app): + """Test successful folder processing trigger.""" + # Login as a user + auth.login() + + # Create a mock folder for the current user + with app.app_context(): + from app.models 
import User, Folder + from app import db + + # Get or create test user + user = User.query.filter_by(email='test@example.com').first() + if not user: + user = User( + first_name='Test', + last_name='User', + email='test@example.com', + password_hash='hashed_password' + ) + db.session.add(user) + + # Create test folder + folder = Folder( + user_id=user.id, + name='Test Folder', + rule_text='move to Archive', + priority=1 + ) + db.session.add(folder) + db.session.commit() + folder_id = folder.id + + # Mock the EmailProcessor + with patch('app.routes.background_processing.EmailProcessor') as mock_processor: + mock_processor_instance = mock_processor.return_value + mock_processor_instance.process_folder_emails.return_value = { + 'processed_count': 3, + 'error_count': 0 + } + + # Make the request + response = client.post(f'/api/background/process-folder/{folder_id}') + + # Verify response + assert response.status_code == 200 + json_data = response.get_json() + assert json_data['success'] is True + assert 'Processed 3 emails for folder Test Folder' in json_data['message'] + + # Cleanup + with app.app_context(): + from app.models import db, Folder + folder = Folder.query.get(folder_id) + if folder: + db.session.delete(folder) + db.session.commit() + +def test_trigger_folder_processing_not_found(client, auth): + """Test folder processing trigger with non-existent folder.""" + # Login as a user + auth.login() + + # Make the request with non-existent folder ID + response = client.post('/api/background/process-folder/999') + + # Verify response + assert response.status_code == 404 + json_data = response.get_json() + assert json_data['success'] is False + assert 'Folder not found or access denied' in json_data['error'] + +def test_trigger_folder_processing_unauthorized(client): + """Test folder processing trigger without authentication.""" + # Make the request without logging in + response = client.post('/api/background/process-folder/1') + + # Verify response (should redirect 
to login) + assert response.status_code == 302 # Redirect to login \ No newline at end of file diff --git a/tests/test_email_processor.py b/tests/test_email_processor.py new file mode 100644 index 0000000..9d4ba2e --- /dev/null +++ b/tests/test_email_processor.py @@ -0,0 +1,239 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +from datetime import datetime, timedelta +from app.email_processor import EmailProcessor +from app.models import User, Folder, ProcessedEmail + +def test_email_processor_initialization(): + """Test that EmailProcessor initializes correctly.""" + # Create a mock user + mock_user = Mock(spec=User) + mock_user.id = 1 + mock_user.email = 'test@example.com' + mock_user.imap_config = {'username': 'user', 'password': 'pass'} + + # Initialize processor + processor = EmailProcessor(mock_user) + + # Verify initialization + assert processor.user == mock_user + assert processor.logger is not None + +@patch('app.email_processor.EmailProcessor._process_email_batch') +def test_process_folder_emails_no_pending(mock_batch): + """Test processing a folder with no pending emails.""" + # Create mocks + mock_user = Mock(spec=User) + mock_user.id = 1 + + mock_folder = Mock(spec=Folder) + mock_folder.id = 1 + mock_folder.name = 'Test Folder' + mock_folder.rule_text = 'move to Archive' + + # Mock the processed emails service + with patch('app.email_processor.ProcessedEmailsService') as mock_service: + mock_service_instance = mock_service.return_value + mock_service_instance.get_pending_emails.return_value = [] + + # Initialize processor + processor = EmailProcessor(mock_user) + result = processor.process_folder_emails(mock_folder) + + # Verify results + assert result['processed_count'] == 0 + assert result['error_count'] == 0 + assert mock_batch.called is False + +@patch('app.email_processor.EmailProcessor._process_email_batch') +def test_process_folder_emails_with_pending(mock_batch): + """Test processing a folder with pending emails.""" + # 
Create mocks + mock_user = Mock(spec=User) + mock_user.id = 1 + + mock_folder = Mock(spec=Folder) + mock_folder.id = 1 + mock_folder.name = 'Test Folder' + mock_folder.rule_text = 'move to Archive' + + # Mock the processed emails service + with patch('app.email_processor.ProcessedEmailsService') as mock_service: + mock_service_instance = mock_service.return_value + mock_service_instance.get_pending_emails.return_value = ['1', '2', '3'] + + # Setup batch processing mock + mock_batch.return_value = {'processed_count': 3, 'error_count': 0} + + # Initialize processor + processor = EmailProcessor(mock_user) + result = processor.process_folder_emails(mock_folder) + + # Verify results + assert result['processed_count'] == 3 + assert result['error_count'] == 0 + mock_batch.assert_called_once() + +@patch('app.email_processor.EmailProcessor._update_folder_counts') +def test_process_user_emails_no_folders(mock_update): + """Test processing user emails with no folders to process.""" + # Create mock user + mock_user = Mock(spec=User) + mock_user.id = 1 + mock_user.email = 'test@example.com' + + # Mock the database query + with patch('app.email_processor.Folder') as mock_folder: + mock_folder.query.filter_by.return_value.order_by.return_value.all.return_value = [] + + # Initialize processor + processor = EmailProcessor(mock_user) + result = processor.process_user_emails() + + # Verify results + assert result['success_count'] == 0 + assert result['error_count'] == 0 + assert len(result['processed_folders']) == 0 + mock_update.assert_not_called() + +@patch('app.email_processor.EmailProcessor._update_folder_counts') +def test_process_user_emails_with_folders(mock_update): + """Test processing user emails with folders to process.""" + # Create mock user + mock_user = Mock(spec=User) + mock_user.id = 1 + mock_user.email = 'test@example.com' + + # Create mock folder + mock_folder = Mock(spec=Folder) + mock_folder.id = 1 + mock_folder.name = 'Test Folder' + mock_folder.rule_text = 
'move to Archive' + mock_folder.priority = 1 + + # Mock the database query + with patch('app.email_processor.Folder') as mock_folder_class: + mock_folder_class.query.filter_by.return_value.order_by.return_value.all.return_value = [mock_folder] + + # Mock the process_folder_emails method + with patch('app.email_processor.EmailProcessor.process_folder_emails') as mock_process: + mock_process.return_value = { + 'processed_count': 5, + 'error_count': 0 + } + + # Initialize processor + processor = EmailProcessor(mock_user) + result = processor.process_user_emails() + + # Verify results + assert result['success_count'] == 5 + assert result['error_count'] == 0 + assert len(result['processed_folders']) == 1 + mock_process.assert_called_once() + +@patch('app.email_processor.EmailProcessor._move_email') +def test_process_email_batch_success(mock_move): + """Test processing an email batch successfully.""" + # Create mocks + mock_user = Mock(spec=User) + mock_user.id = 1 + mock_user.imap_config = {'username': 'user', 'password': 'pass'} + + mock_folder = Mock(spec=Folder) + mock_folder.id = 1 + mock_folder.name = 'Source' + mock_folder.rule_text = 'move to Archive' + + # Mock IMAP service + with patch('app.email_processor.IMAPService') as mock_imap: + mock_imap_instance = mock_imap.return_value + mock_imap_instance._connect.return_value = None + mock_imap_instance.connection.login.return_value = ('OK', []) + mock_imap_instance.connection.select.return_value = ('OK', []) + mock_imap_instance.get_email_headers.return_value = { + 'subject': 'Test Email', + 'from': 'sender@example.com' + } + + # Mock rule evaluation + with patch('app.email_processor.EmailProcessor._evaluate_rules') as mock_evaluate: + mock_evaluate.return_value = 'Archive' + mock_move.return_value = True + + # Mock processed emails service + with patch('app.email_processor.ProcessedEmailsService') as mock_service: + mock_service_instance = mock_service.return_value + 
mock_service_instance.mark_emails_processed.return_value = 1 + + # Initialize processor + processor = EmailProcessor(mock_user) + result = processor._process_email_batch(mock_folder, ['1']) + + # Verify results + assert result['processed_count'] == 1 + assert result['error_count'] == 0 + mock_move.assert_called_once() + +def test_evaluate_rules_no_rule_text(): + """Test rule evaluation with no rule text.""" + # Create mocks + mock_user = Mock(spec=User) + mock_folder = Mock(spec=Folder) + + # Initialize processor + processor = EmailProcessor(mock_user) + + # Test with None rule text + result = processor._evaluate_rules({'subject': 'Test'}, None) + assert result is None + + # Test with empty rule text + result = processor._evaluate_rules({'subject': 'Test'}, '') + assert result is None + +def test_evaluate_rules_with_move_to(): + """Test rule evaluation with 'move to' directive.""" + # Create mocks + mock_user = Mock(spec=User) + mock_folder = Mock(spec=Folder) + + # Initialize processor + processor = EmailProcessor(mock_user) + + # Test with simple move to + result = processor._evaluate_rules({'subject': 'Test'}, 'move to Archive') + assert result == 'Archive' + + # Test with punctuation + result = processor._evaluate_rules({'subject': 'Test'}, 'move to Archive.') + assert result == 'Archive' + + # Test with extra spaces + result = processor._evaluate_rules({'subject': 'Test'}, 'move to Archive ') + assert result == 'Archive' + +@patch('app.email_processor.ProcessedEmailsService') +def test_update_folder_counts(mock_service): + """Test updating folder counts after processing.""" + # Create mocks + mock_user = Mock(spec=User) + mock_folder = Mock(spec=Folder) + mock_folder.name = 'Test Folder' + mock_folder.pending_count = 5 + mock_folder.total_count = 10 + + # Mock service methods + mock_service_instance = mock_service.return_value + mock_service_instance.get_pending_count.return_value = 3 + + with patch('app.email_processor.EmailProcessor._get_imap_connection') as 
mock_imap: + mock_imap.return_value.get_folder_email_count.return_value = 12 + + # Initialize processor + processor = EmailProcessor(mock_user) + processor._update_folder_counts(mock_folder) + + # Verify counts were updated + assert mock_folder.pending_count == 3 + assert mock_folder.total_count == 12 \ No newline at end of file diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..c7a6e48 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,127 @@ +import pytest +from unittest.mock import Mock, patch, MagicMock +from datetime import datetime, timedelta +from threading import Thread +from app.scheduler import Scheduler + +def test_scheduler_initialization(): + """Test that Scheduler initializes correctly.""" + # Create a mock app + mock_app = Mock() + + # Initialize scheduler + scheduler = Scheduler(mock_app, interval_minutes=10) + + # Verify initialization + assert scheduler.app == mock_app + assert scheduler.interval == 600 # 10 minutes in seconds + assert scheduler.thread is None + assert scheduler.running is False + +def test_scheduler_init_app(): + """Test that init_app method works correctly.""" + # Create a mock app + mock_app = Mock() + mock_app.extensions = {} + + # Initialize scheduler + scheduler = Scheduler() + scheduler.init_app(mock_app) + + # Verify scheduler is stored in app extensions + assert 'scheduler' in mock_app.extensions + assert mock_app.extensions['scheduler'] == scheduler + +def test_scheduler_start_stop(): + """Test that scheduler can be started and stopped.""" + # Create a mock app + mock_app = Mock() + mock_app.app_context.return_value.__enter__.return_value = None + mock_app.app_context.return_value.__exit__.return_value = None + + # Initialize scheduler + scheduler = Scheduler(mock_app) + + # Start the scheduler + with patch('app.scheduler.Scheduler._run') as mock_run: + mock_run.side_effect = lambda: setattr(scheduler, 'running', False) # Stop after one iteration + + scheduler.start() + 
+ # Give it a moment to start + import time + time.sleep(0.1) + + # Verify thread was created and started + assert scheduler.thread is not None + assert scheduler.running is True + + # Wait for the run method to complete + if scheduler.thread: + scheduler.thread.join(timeout=1) + + # Stop should be called automatically when running becomes False + assert scheduler.running is False + +def test_scheduler_process_all_users_no_users(): + """Test process_all_users with no users in database.""" + # Create a mock app + mock_app = Mock() + mock_app.app_context.return_value.__enter__.return_value = None + mock_app.app_context.return_value.__exit__.return_value = None + + # Initialize scheduler + scheduler = Scheduler(mock_app) + + # Mock the User query + with patch('app.scheduler.User') as mock_user: + mock_user.query.all.return_value = [] + + # Call process_all_users + with patch('app.scheduler.Scheduler.logger') as mock_logger: + scheduler.process_all_users() + + # Verify logger was called + mock_logger.info.assert_any_call("No users found for processing") + +def test_scheduler_process_all_users_with_users(): + """Test process_all_users with users in database.""" + # Create a mock app + mock_app = Mock() + mock_app.app_context.return_value.__enter__.return_value = None + mock_app.app_context.return_value.__exit__.return_value = None + + # Initialize scheduler + scheduler = Scheduler(mock_app) + + # Create mock users + mock_user1 = Mock() + mock_user1.id = 1 + mock_user1.email = 'user1@example.com' + + # Mock the User query + with patch('app.scheduler.User') as mock_user_class: + mock_user_class.query.all.return_value = [mock_user1] + + # Mock the EmailProcessor + with patch('app.scheduler.EmailProcessor') as mock_processor: + mock_processor_instance = mock_processor.return_value + mock_processor_instance.process_user_emails.return_value = { + 'success_count': 5, + 'error_count': 0, + 'processed_folders': [] + } + + # Call process_all_users + with 
patch('app.scheduler.Scheduler.logger') as mock_logger: + scheduler.process_all_users() + + # Verify processor was called + mock_processor.assert_called_once_with(mock_user1) + mock_processor_instance.process_user_emails.assert_called_once() + + # Verify logging + mock_logger.info.assert_any_call("Processing emails for 1 users") + mock_logger.info.assert_any_call( + f"Completed processing for user {mock_user1.email}: 5 success, 0 errors, 0 folders processed" + ) \ No newline at end of file