Files
email-organizer/app/imap_service.py

233 lines
7.9 KiB
Python

import imaplib
import ssl
import logging
from typing import List, Dict, Optional, Tuple
from email.header import decode_header
from app.models import db, Folder, User
from app import create_app
class IMAPService:
def __init__(self, user: User):
self.user = user
self.config = user.imap_config or {}
self.connection = None
def _connect(self):
"""Create an IMAP connection based on configuration."""
server = self.config.get('server', 'imap.gmail.com')
port = self.config.get('port', 993)
use_ssl = self.config.get('use_ssl', True)
if use_ssl:
# Create SSL context
context = ssl.create_default_context()
# Connect using SSL
self.connection = imaplib.IMAP4_SSL(server, port)
else:
# Connect without SSL
self.connection = imaplib.IMAP4(server, port)
def test_connection(self) -> Tuple[bool, str]:
"""Test IMAP connection with current configuration."""
try:
if not self.config:
return False, "No IMAP configuration found"
# Connect to IMAP server
self._connect()
# Login
self.connection.login(
self.config.get('username', ''),
self.config.get('password', '')
)
# Select inbox to verify connection
resp_code, content = self.connection.select('INBOX')
print(resp_code, content)
# Close the folder, not the connection
self.connection.close()
# Logout
self.connection.logout()
self.connection = None
return True, "Connection successful"
except imaplib.IMAP4.error as e:
print(e)
import traceback
traceback.print_exc()
return False, f"IMAP connection error: {str(e)}"
except Exception as e:
print(e)
import traceback
traceback.print_exc()
return False, f"Connection error: {str(e)}"
finally:
if self.connection:
try:
self.connection.logout()
except:
pass
def get_folders(self) -> List[Dict[str, str]]:
"""Get list of folders from IMAP server."""
try:
if not self.config:
return []
# Connect to IMAP server
self._connect()
# Login
self.connection.login(
self.config.get('username', ''),
self.config.get('password', '')
)
# List folders
status, folder_data = self.connection.list()
if status != 'OK':
return []
print(folder_data)
folders = []
for folder_item in folder_data:
if isinstance(folder_item, bytes):
folder_item = folder_item.decode('utf-8')
# Parse folder name (handle different IMAP server formats)
parts = folder_item.split('"')
if len(parts) >= 3:
folder_name = parts[-1] if parts[-1] else parts[-2]
else:
folder_name = folder_item.split()[-1]
# Handle nested folders (convert to slash notation)
if folder_name.startswith('"') and folder_name.endswith('"'):
folder_name = folder_name[1:-1]
folders.append({
'name': folder_name,
'full_path': folder_name
})
self.connection.logout()
return folders
except Exception as e:
logging.error(f"Error fetching IMAP folders: {str(e)}")
return []
finally:
if self.connection:
try:
self.connection.logout()
except:
pass
def get_folder_email_count(self, folder_name: str) -> int:
"""Get the count of emails in a specific folder."""
try:
# Connect to IMAP server
self._connect()
# Login
self.connection.login(
self.config.get('username', ''),
self.config.get('password', '')
)
# Select the folder
resp_code, content = self.connection.select(folder_name)
if resp_code != 'OK':
return 0
# Get email count
resp_code, content = self.connection.search(None, 'ALL')
if resp_code != 'OK':
return 0
# Count the emails
email_ids = content[0].split()
count = len(email_ids)
# Close folder and logout
self.connection.close()
self.connection.logout()
self.connection = None
return count
except Exception as e:
logging.error(f"Error getting email count for folder {folder_name}: {str(e)}")
if self.connection:
try:
self.connection.logout()
except:
pass
self.connection = None
return 0
def sync_folders(self) -> Tuple[bool, str]:
"""Sync IMAP folders with local database."""
try:
if not self.config:
return False, "No IMAP configuration found"
# Get folders from IMAP server
imap_folders = self.get_folders()
if not imap_folders:
return False, "No folders found on IMAP server"
# Process each folder
synced_count = 0
for imap_folder in imap_folders:
folder_name = imap_folder['name']
# Skip special folders that might not be needed
if folder_name.lower() in ['inbox', 'sent', 'drafts', 'spam', 'trash']:
continue
# Handle nested folder names (convert slashes to underscores or keep as-is)
# According to requirements, nested folders should be created with slashes in the name
display_name = folder_name
# Check if folder already exists
existing_folder = Folder.query.filter_by(
user_id=self.user.id,
name=display_name
).first()
if not existing_folder:
# Create new folder
new_folder = Folder(
user_id=self.user.id,
name=display_name,
rule_text=f"Auto-synced from IMAP folder: {folder_name}",
priority=0 # Default priority
)
db.session.add(new_folder)
synced_count += 1
else:
# Update existing folder with email counts
# Get the total count of emails in this folder
total_count = self.get_folder_email_count(folder_name)
existing_folder.total_count = total_count
existing_folder.pending_count = 0 # Initially set to 0
db.session.commit()
return True, f"Successfully synced {synced_count} folders"
except Exception as e:
import traceback
traceback.print_exc()
print(e)
db.session.rollback()
return False, f"Sync error: {str(e)}"