423 lines
15 KiB
Python
423 lines
15 KiB
Python
import imaplib
|
|
import ssl
|
|
import logging
|
|
from typing import List, Dict, Optional, Tuple
|
|
from email.header import decode_header
|
|
from email.utils import parsedate_to_datetime
|
|
from app.models import db, Folder, User
|
|
from app import create_app
|
|
|
|
class IMAPService:
|
|
def __init__(self, user: User):
|
|
self.user = user
|
|
self.config = user.imap_config or {}
|
|
self.connection = None
|
|
|
|
def _connect(self):
|
|
"""Create an IMAP connection based on configuration."""
|
|
server = self.config.get('server', 'imap.gmail.com')
|
|
port = self.config.get('port', 993)
|
|
use_ssl = self.config.get('use_ssl', True)
|
|
|
|
if use_ssl:
|
|
# Create SSL context
|
|
context = ssl.create_default_context()
|
|
# Connect using SSL
|
|
self.connection = imaplib.IMAP4_SSL(server, port)
|
|
else:
|
|
# Connect without SSL
|
|
self.connection = imaplib.IMAP4(server, port)
|
|
|
|
def test_connection(self) -> Tuple[bool, str]:
|
|
"""Test IMAP connection with current configuration."""
|
|
try:
|
|
if not self.config:
|
|
return False, "No IMAP configuration found"
|
|
|
|
# Connect to IMAP server
|
|
self._connect()
|
|
|
|
# Login
|
|
self.connection.login(
|
|
self.config.get('username', ''),
|
|
self.config.get('password', '')
|
|
)
|
|
|
|
# Select inbox to verify connection
|
|
resp_code, content = self.connection.select('INBOX')
|
|
print(resp_code, content)
|
|
|
|
# Close the folder, not the connection
|
|
self.connection.close()
|
|
|
|
# Logout
|
|
self.connection.logout()
|
|
self.connection = None
|
|
|
|
return True, "Connection successful"
|
|
|
|
except imaplib.IMAP4.error as e:
|
|
print(e)
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False, f"IMAP connection error: {str(e)}"
|
|
except Exception as e:
|
|
print(e)
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False, f"Connection error: {str(e)}"
|
|
finally:
|
|
if self.connection:
|
|
try:
|
|
self.connection.logout()
|
|
except:
|
|
pass
|
|
|
|
def get_folders(self) -> List[Dict[str, str]]:
|
|
"""Get list of folders from IMAP server."""
|
|
try:
|
|
if not self.config:
|
|
return []
|
|
|
|
# Connect to IMAP server
|
|
self._connect()
|
|
|
|
# Login
|
|
self.connection.login(
|
|
self.config.get('username', ''),
|
|
self.config.get('password', '')
|
|
)
|
|
|
|
# List folders
|
|
status, folder_data = self.connection.list()
|
|
|
|
if status != 'OK':
|
|
return []
|
|
|
|
print(folder_data)
|
|
|
|
folders = []
|
|
for folder_item in folder_data:
|
|
if isinstance(folder_item, bytes):
|
|
folder_item = folder_item.decode('utf-8')
|
|
|
|
# Parse folder name (handle different IMAP server formats)
|
|
parts = folder_item.split('"')
|
|
if len(parts) >= 3:
|
|
folder_name = parts[-1] if parts[-1] else parts[-2]
|
|
else:
|
|
folder_name = folder_item.split()[-1]
|
|
|
|
# Handle nested folders (convert to slash notation)
|
|
if folder_name.startswith('"') and folder_name.endswith('"'):
|
|
folder_name = folder_name[1:-1]
|
|
|
|
folders.append({
|
|
'name': folder_name,
|
|
'full_path': folder_name
|
|
})
|
|
|
|
self.connection.logout()
|
|
|
|
return folders
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error fetching IMAP folders: {str(e)}")
|
|
return []
|
|
finally:
|
|
if self.connection:
|
|
try:
|
|
self.connection.logout()
|
|
except:
|
|
pass
|
|
|
|
def get_folder_email_count(self, folder_name: str) -> int:
|
|
"""Get the count of emails in a specific folder."""
|
|
try:
|
|
# Connect to IMAP server
|
|
self._connect()
|
|
|
|
# Login
|
|
self.connection.login(
|
|
self.config.get('username', ''),
|
|
self.config.get('password', '')
|
|
)
|
|
|
|
# Select the folder
|
|
resp_code, content = self.connection.select(folder_name)
|
|
if resp_code != 'OK':
|
|
return 0
|
|
|
|
# Get email count
|
|
resp_code, content = self.connection.search(None, 'ALL')
|
|
if resp_code != 'OK':
|
|
return 0
|
|
|
|
# Count the emails
|
|
email_ids = content[0].split()
|
|
count = len(email_ids)
|
|
|
|
# Close folder and logout
|
|
self.connection.close()
|
|
self.connection.logout()
|
|
self.connection = None
|
|
|
|
return count
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error getting email count for folder {folder_name}: {str(e)}")
|
|
if self.connection:
|
|
try:
|
|
self.connection.logout()
|
|
except:
|
|
pass
|
|
self.connection = None
|
|
return 0
|
|
|
|
def get_recent_emails(self, folder_name: str, limit: int = 3) -> List[Dict[str, any]]:
|
|
"""Get the most recent email subjects and dates from a specific folder."""
|
|
try:
|
|
# Connect to IMAP server
|
|
self._connect()
|
|
|
|
# Login
|
|
self.connection.login(
|
|
self.config.get('username', ''),
|
|
self.config.get('password', '')
|
|
)
|
|
|
|
# Select the folder
|
|
resp_code, content = self.connection.select(folder_name)
|
|
if resp_code != 'OK':
|
|
return []
|
|
|
|
# Get email IDs (most recent first)
|
|
resp_code, content = self.connection.search(None, 'ALL')
|
|
if resp_code != 'OK':
|
|
return []
|
|
|
|
# Get the most recent emails (limit to 3)
|
|
email_ids = content[0].split()
|
|
recent_email_ids = email_ids[:limit] if len(email_ids) >= limit else email_ids
|
|
|
|
recent_emails = []
|
|
for email_id in reversed(recent_email_ids): # Process from newest to oldest
|
|
# Fetch the email headers
|
|
resp_code, content = self.connection.fetch(email_id, '(RFC822.HEADER)')
|
|
if resp_code != 'OK':
|
|
continue
|
|
|
|
# Parse the email headers
|
|
raw_email = content[0][1]
|
|
import email
|
|
msg = email.message_from_bytes(raw_email)
|
|
|
|
# Extract subject and date
|
|
subject = msg.get('Subject', 'No Subject')
|
|
date_str = msg.get('Date', '')
|
|
|
|
# Decode the subject if needed
|
|
try:
|
|
decoded_parts = decode_header(subject)
|
|
subject = ''.join([str(part, encoding or 'utf-8') if isinstance(part, bytes) else part for part, encoding in decoded_parts])
|
|
except Exception:
|
|
pass # If decoding fails, use the original subject
|
|
|
|
# Parse date if available
|
|
try:
|
|
email_date = parsedate_to_datetime(date_str) if date_str else None
|
|
except Exception:
|
|
email_date = None
|
|
|
|
recent_emails.append({
|
|
'subject': subject,
|
|
'date': email_date.isoformat() if email_date else None
|
|
})
|
|
|
|
# Close folder and logout
|
|
self.connection.close()
|
|
self.connection.logout()
|
|
self.connection = None
|
|
|
|
return recent_emails
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error getting recent emails for folder {folder_name}: {str(e)}")
|
|
if self.connection:
|
|
try:
|
|
self.connection.logout()
|
|
except:
|
|
pass
|
|
self.connection = None
|
|
return []
|
|
|
|
def get_folder_email_uids(self, folder_name: str) -> List[str]:
|
|
"""Get the list of email UIDs in a specific folder."""
|
|
try:
|
|
# Connect to IMAP server
|
|
self._connect()
|
|
|
|
# Login
|
|
self.connection.login(
|
|
self.config.get('username', ''),
|
|
self.config.get('password', '')
|
|
)
|
|
|
|
# Select the folder
|
|
resp_code, content = self.connection.select(folder_name)
|
|
if resp_code != 'OK':
|
|
return []
|
|
|
|
# Get email UIDs
|
|
resp_code, content = self.connection.search(None, 'ALL')
|
|
if resp_code != 'OK':
|
|
return []
|
|
|
|
# Extract UIDs
|
|
email_uids = content[0].split()
|
|
uid_list = [uid.decode('utf-8') for uid in email_uids]
|
|
|
|
# Close folder and logout
|
|
self.connection.close()
|
|
self.connection.logout()
|
|
self.connection = None
|
|
|
|
return uid_list
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error getting email UIDs for folder {folder_name}: {str(e)}")
|
|
if self.connection:
|
|
try:
|
|
self.connection.logout()
|
|
except:
|
|
pass
|
|
self.connection = None
|
|
return []
|
|
|
|
def get_email_headers(self, folder_name: str, email_uid: str) -> Dict[str, str]:
|
|
"""Get email headers for a specific email UID."""
|
|
try:
|
|
# Connect to IMAP server
|
|
self._connect()
|
|
|
|
# Login
|
|
self.connection.login(
|
|
self.config.get('username', ''),
|
|
self.config.get('password', '')
|
|
)
|
|
|
|
# Select the folder
|
|
resp_code, content = self.connection.select(folder_name)
|
|
if resp_code != 'OK':
|
|
return {}
|
|
|
|
# Fetch email headers
|
|
resp_code, content = self.connection.fetch(email_uid, '(RFC822.HEADER)')
|
|
if resp_code != 'OK':
|
|
return {}
|
|
|
|
# Parse the email headers
|
|
raw_email = content[0][1]
|
|
import email
|
|
msg = email.message_from_bytes(raw_email)
|
|
|
|
# Extract headers
|
|
headers = {
|
|
'subject': msg.get('Subject', 'No Subject'),
|
|
'date': msg.get('Date', ''),
|
|
'from': msg.get('From', ''),
|
|
'to': msg.get('To', ''),
|
|
'message_id': msg.get('Message-ID', '')
|
|
}
|
|
|
|
# Close folder and logout
|
|
self.connection.close()
|
|
self.connection.logout()
|
|
self.connection = None
|
|
|
|
return headers
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error getting email headers for UID {email_uid} in folder {folder_name}: {str(e)}")
|
|
if self.connection:
|
|
try:
|
|
self.connection.logout()
|
|
except:
|
|
pass
|
|
self.connection = None
|
|
return {}
|
|
|
|
def sync_folders(self) -> Tuple[bool, str]:
|
|
"""Sync IMAP folders with local database."""
|
|
try:
|
|
if not self.config:
|
|
return False, "No IMAP configuration found"
|
|
|
|
# Get folders from IMAP server
|
|
imap_folders = self.get_folders()
|
|
|
|
if not imap_folders:
|
|
return False, "No folders found on IMAP server"
|
|
|
|
# Deduplicate folders by name to prevent creating multiple entries for the same folder
|
|
unique_folders = []
|
|
seen_names = set()
|
|
for imap_folder in imap_folders:
|
|
folder_name = imap_folder['name']
|
|
|
|
# Skip special folders that might not be needed
|
|
if folder_name.lower() in ['inbox', 'sent', 'drafts', 'spam', 'trash']:
|
|
continue
|
|
|
|
# Use case-insensitive comparison for deduplication
|
|
folder_name_lower = folder_name.lower()
|
|
if folder_name_lower not in seen_names:
|
|
unique_folders.append(imap_folder)
|
|
seen_names.add(folder_name_lower)
|
|
|
|
# Process each unique folder
|
|
synced_count = 0
|
|
for imap_folder in unique_folders:
|
|
folder_name = imap_folder['name']
|
|
|
|
# Handle nested folder names (convert slashes to underscores or keep as-is)
|
|
# According to requirements, nested folders should be created with slashes in the name
|
|
display_name = folder_name
|
|
|
|
# Check if folder already exists
|
|
existing_folder = Folder.query.filter_by(
|
|
user_id=self.user.id,
|
|
name=display_name
|
|
).first()
|
|
|
|
if not existing_folder:
|
|
# Create new folder
|
|
new_folder = Folder(
|
|
user_id=self.user.id,
|
|
name=display_name,
|
|
rule_text=f"Auto-synced from IMAP folder: {folder_name}",
|
|
priority=0 # Default priority
|
|
)
|
|
db.session.add(new_folder)
|
|
synced_count += 1
|
|
else:
|
|
# Update existing folder with email counts and recent emails
|
|
# Get the total count of emails in this folder
|
|
total_count = self.get_folder_email_count(folder_name)
|
|
existing_folder.total_count = total_count
|
|
existing_folder.pending_count = 0 # Initially set to 0
|
|
|
|
# Get the most recent emails for this folder
|
|
recent_emails = self.get_recent_emails(folder_name, 3)
|
|
existing_folder.recent_emails = recent_emails
|
|
|
|
db.session.commit()
|
|
return True, f"Successfully synced {synced_count} folders"
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
print(e)
|
|
db.session.rollback()
|
|
return False, f"Sync error: {str(e)}"
|