import email
import imaplib
import logging
import re
import ssl
from email.errors import HeaderParseError
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Any, Dict, List, Optional, Tuple

from app.models import db, Folder, User
from app import create_app

logger = logging.getLogger(__name__)


class IMAPService:
    """Thin wrapper around ``imaplib`` driven by a user's stored IMAP settings.

    Every public method opens its own connection, performs one task and
    disconnects again, so an instance never holds a connection between calls.
    Configuration keys read from ``user.imap_config``: ``server``, ``port``,
    ``use_ssl``, ``username`` and ``password``.
    """

    # Folder names (compared case-insensitively) that sync_folders never
    # mirrors into the local database.
    _SKIPPED_FOLDERS = frozenset({'inbox', 'sent', 'drafts', 'spam', 'trash'})

    # One untagged LIST response line: (flags) "delimiter"|NIL mailbox-name
    _LIST_RESPONSE_RE = re.compile(
        r'^\((?P<flags>[^)]*)\)\s+'
        r'(?P<delimiter>"(?:[^"\\]|\\.)*"|NIL)\s+'
        r'(?P<name>.+)$',
        re.IGNORECASE,
    )

    def __init__(self, user: User):
        self.user = user
        self.config = user.imap_config or {}
        self.connection = None

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def _connect(self):
        """Create an IMAP connection based on configuration."""
        server = self.config.get('server', 'imap.gmail.com')
        # Tolerate a port stored as a string or as None in the configuration.
        port = int(self.config.get('port') or 993)
        use_ssl = self.config.get('use_ssl', True)

        if use_ssl:
            # Pass the context explicitly so the default certificate and
            # hostname verification settings are actually applied.
            context = ssl.create_default_context()
            self.connection = imaplib.IMAP4_SSL(server, port, ssl_context=context)
        else:
            self.connection = imaplib.IMAP4(server, port)

    def _login(self):
        """Connect and authenticate with the configured credentials."""
        self._connect()
        self.connection.login(
            self.config.get('username', ''),
            self.config.get('password', ''),
        )

    def _disconnect(self):
        """Close any selected mailbox and log out; never raises.

        ``self.connection`` is reset to ``None`` first so that a failure
        during cleanup cannot leave a half-closed connection behind.
        """
        conn, self.connection = self.connection, None
        if conn is None:
            return
        try:
            if getattr(conn, 'state', None) == 'SELECTED':
                conn.close()
        except Exception:  # best-effort cleanup; the socket may already be gone
            logger.debug("Ignoring error while closing IMAP mailbox", exc_info=True)
        try:
            conn.logout()
        except Exception:  # best-effort cleanup; the socket may already be gone
            logger.debug("Ignoring error during IMAP logout", exc_info=True)

    @staticmethod
    def _quote_mailbox(folder_name: str) -> str:
        """Return *folder_name* as an IMAP quoted string.

        ``imaplib`` sends arguments verbatim, so a mailbox name containing
        spaces must be quoted by the caller. Names that already arrive quoted
        are passed through unchanged.
        """
        if len(folder_name) >= 2 and folder_name[0] == '"' and folder_name[-1] == '"':
            return folder_name
        escaped = folder_name.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    def _select_readonly(self, folder_name: str) -> Optional[int]:
        """Open *folder_name* read-only and return its message count.

        Read-only selection guarantees that the later CLOSE cannot expunge
        messages flagged ``\\Deleted``.

        Returns:
            The EXISTS count reported by the server, or ``None`` when the
            mailbox could not be selected.
        """
        resp_code, content = self.connection.select(
            self._quote_mailbox(folder_name), readonly=True
        )
        if resp_code != 'OK':
            return None
        try:
            return int(content[0])
        except (TypeError, ValueError, IndexError):
            return 0

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @classmethod
    def _parse_list_item(cls, item) -> Optional[str]:
        """Extract the mailbox name from one element of a LIST response.

        Handles quoted names, unquoted atoms and the literal form, which
        ``imaplib`` delivers as a ``(header, name)`` tuple. Returns ``None``
        when no name can be extracted.
        """
        if isinstance(item, tuple):
            # Literal form: the name is the second element of the tuple.
            raw_name = item[1] if len(item) > 1 else None
            if isinstance(raw_name, bytes):
                raw_name = raw_name.decode('utf-8', errors='replace')
            return raw_name or None

        if item is None:
            return None
        if isinstance(item, bytes):
            item = item.decode('utf-8', errors='replace')

        match = cls._LIST_RESPONSE_RE.match(item.strip())
        if match:
            name = match.group('name').strip()
        else:
            # Unknown server format: fall back to the last whitespace token.
            tokens = item.split()
            name = tokens[-1] if tokens else ''

        if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
            # Strip the quotes and undo backslash escaping inside them.
            name = re.sub(r'\\(.)', r'\1', name[1:-1])
        return name or None

    @staticmethod
    def _decode_subject(subject) -> str:
        """Decode an RFC 2047 encoded Subject header into plain text.

        Falls back to the raw header text when it cannot be decoded.
        """
        try:
            return ''.join(
                part.decode(encoding or 'utf-8', errors='replace')
                if isinstance(part, bytes) else part
                for part, encoding in decode_header(subject)
            )
        except (LookupError, ValueError, HeaderParseError):
            return str(subject)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def test_connection(self) -> Tuple[bool, str]:
        """Test IMAP connection with current configuration.

        Returns:
            ``(True, message)`` when login and selecting INBOX both succeed,
            otherwise ``(False, reason)``.
        """
        if not self.config:
            return False, "No IMAP configuration found"

        try:
            self._login()
            # Selecting INBOX proves the session is usable, not merely
            # authenticated; select() reports failure via the status code.
            if self._select_readonly('INBOX') is None:
                return False, "IMAP connection error: unable to select INBOX"
            return True, "Connection successful"
        except imaplib.IMAP4.error as e:
            logger.exception("IMAP connection test failed")
            return False, f"IMAP connection error: {str(e)}"
        except Exception as e:
            logger.exception("Connection test failed")
            return False, f"Connection error: {str(e)}"
        finally:
            self._disconnect()

    def get_folders(self) -> List[Dict[str, str]]:
        """Get list of folders from IMAP server.

        Returns:
            One ``{'name': ..., 'full_path': ...}`` dict per mailbox, or an
            empty list when there is no configuration or the request fails.
        """
        if not self.config:
            return []

        try:
            self._login()
            status, folder_data = self.connection.list()
            if status != 'OK':
                return []

            folders = []
            for folder_item in folder_data:
                folder_name = self._parse_list_item(folder_item)
                if folder_name:
                    folders.append({
                        'name': folder_name,
                        'full_path': folder_name,
                    })
            return folders
        except Exception as e:
            logger.error(f"Error fetching IMAP folders: {str(e)}")
            return []
        finally:
            self._disconnect()

    def get_folder_email_count(self, folder_name: str) -> int:
        """Get the count of emails in a specific folder.

        Uses the EXISTS count returned by SELECT rather than ``SEARCH ALL``,
        whose response can exceed imaplib's line limit on large mailboxes.

        Returns:
            The number of messages, or ``0`` if the folder cannot be opened
            or any error occurs.
        """
        try:
            self._login()
            count = self._select_readonly(folder_name)
            return count or 0
        except Exception as e:
            logger.error(f"Error getting email count for folder {folder_name}: {str(e)}")
            return 0
        finally:
            self._disconnect()

    def get_recent_emails(self, folder_name: str, limit: int = 3) -> List[Dict[str, Any]]:
        """Get the most recent email subjects and dates from a specific folder.

        Args:
            folder_name: Mailbox to inspect.
            limit: Maximum number of messages to return; values below 1
                yield an empty list.

        Returns:
            Newest-first list of ``{'subject': str, 'date': ISO-8601 str or
            None}`` dicts; empty on any failure.
        """
        try:
            self._login()
            total = self._select_readonly(folder_name)
            if not total:
                return []

            recent_emails = []
            # Sequence numbers run 1..total in arrival order, so the newest
            # messages are the highest numbers.
            for seq_num in range(total, max(total - limit, 0), -1):
                resp_code, content = self.connection.fetch(str(seq_num), '(RFC822.HEADER)')
                if resp_code != 'OK':
                    continue
                if not content or not isinstance(content[0], tuple):
                    continue  # message vanished or the server sent no payload

                msg = email.message_from_bytes(content[0][1])
                subject = self._decode_subject(msg.get('Subject', 'No Subject'))
                date_str = msg.get('Date', '')

                try:
                    email_date = parsedate_to_datetime(date_str) if date_str else None
                except (TypeError, ValueError):
                    email_date = None

                recent_emails.append({
                    'subject': subject,
                    'date': email_date.isoformat() if email_date else None,
                })
            return recent_emails
        except Exception as e:
            logger.error(f"Error getting recent emails for folder {folder_name}: {str(e)}")
            return []
        finally:
            self._disconnect()

    def sync_folders(self) -> Tuple[bool, str]:
        """Sync IMAP folders with local database.

        New server folders get a ``Folder`` row; folders that already exist
        locally have their message count and recent-email preview refreshed.
        Folders named in ``_SKIPPED_FOLDERS`` are ignored.

        Returns:
            ``(True, message)`` with the number of newly created folders, or
            ``(False, reason)`` on failure (the DB session is rolled back).
        """
        try:
            if not self.config:
                return False, "No IMAP configuration found"

            imap_folders = self.get_folders()
            if not imap_folders:
                return False, "No folders found on IMAP server"

            synced_count = 0
            for imap_folder in imap_folders:
                folder_name = imap_folder['name']

                if folder_name.lower() in self._SKIPPED_FOLDERS:
                    continue

                # Nested folders keep their server path (delimiter included)
                # as the local display name.
                display_name = folder_name

                existing_folder = Folder.query.filter_by(
                    user_id=self.user.id,
                    name=display_name,
                ).first()

                if not existing_folder:
                    new_folder = Folder(
                        user_id=self.user.id,
                        name=display_name,
                        rule_text=f"Auto-synced from IMAP folder: {folder_name}",
                        priority=0,  # Default priority
                    )
                    db.session.add(new_folder)
                    synced_count += 1
                else:
                    existing_folder.total_count = self.get_folder_email_count(folder_name)
                    existing_folder.pending_count = 0  # Initially set to 0
                    existing_folder.recent_emails = self.get_recent_emails(folder_name, 3)

            db.session.commit()
            return True, f"Successfully synced {synced_count} folders"
        except Exception as e:
            logger.exception("IMAP folder sync failed")
            db.session.rollback()
            return False, f"Sync error: {str(e)}"