imap progress
This commit is contained in:
167
app/imap_service.py
Normal file
167
app/imap_service.py
Normal file
@@ -0,0 +1,167 @@
|
||||
import imaplib
|
||||
import ssl
|
||||
import logging
|
||||
from typing import List, Dict, Optional, Tuple
|
||||
from email.header import decode_header
|
||||
from app.models import db, Folder, User
|
||||
from app import create_app
|
||||
|
||||
class IMAPService:
|
||||
def __init__(self, user: User):
|
||||
self.user = user
|
||||
self.config = user.imap_config or {}
|
||||
self.connection = None
|
||||
|
||||
def test_connection(self) -> Tuple[bool, str]:
|
||||
"""Test IMAP connection with current configuration."""
|
||||
try:
|
||||
if not self.config:
|
||||
return False, "No IMAP configuration found"
|
||||
|
||||
# Create SSL context
|
||||
context = ssl.create_default_context()
|
||||
|
||||
# Connect to IMAP server
|
||||
self.connection = imaplib.IMAP4_SSL(
|
||||
self.config.get('server', 'imap.gmail.com'),
|
||||
self.config.get('port', 993)
|
||||
)
|
||||
|
||||
# Login
|
||||
self.connection.login(
|
||||
self.config.get('username', ''),
|
||||
self.config.get('password', '')
|
||||
)
|
||||
|
||||
# Select inbox to verify connection
|
||||
self.connection.select('INBOX')
|
||||
|
||||
# Close connection
|
||||
self.connection.close()
|
||||
self.connection.logout()
|
||||
|
||||
return True, "Connection successful"
|
||||
|
||||
except imaplib.IMAP4.error as e:
|
||||
return False, f"IMAP connection error: {str(e)}"
|
||||
except Exception as e:
|
||||
return False, f"Connection error: {str(e)}"
|
||||
finally:
|
||||
if self.connection:
|
||||
try:
|
||||
self.connection.logout()
|
||||
except:
|
||||
pass
|
||||
|
||||
def get_folders(self) -> List[Dict[str, str]]:
|
||||
"""Get list of folders from IMAP server."""
|
||||
try:
|
||||
if not self.config:
|
||||
return []
|
||||
|
||||
# Create SSL context
|
||||
context = ssl.create_default_context()
|
||||
|
||||
# Connect to IMAP server
|
||||
self.connection = imaplib.IMAP4_SSL(
|
||||
self.config.get('server', 'imap.gmail.com'),
|
||||
self.config.get('port', 993)
|
||||
)
|
||||
|
||||
# Login
|
||||
self.connection.login(
|
||||
self.config.get('username', ''),
|
||||
self.config.get('password', '')
|
||||
)
|
||||
|
||||
# List folders
|
||||
status, folder_data = self.connection.list()
|
||||
|
||||
if status != 'OK':
|
||||
return []
|
||||
|
||||
folders = []
|
||||
for folder_item in folder_data:
|
||||
if isinstance(folder_item, bytes):
|
||||
folder_item = folder_item.decode('utf-8')
|
||||
|
||||
# Parse folder name (handle different IMAP server formats)
|
||||
parts = folder_item.split('"')
|
||||
if len(parts) >= 3:
|
||||
folder_name = parts[-1] if parts[-1] else parts[-2]
|
||||
else:
|
||||
folder_name = folder_item.split()[-1]
|
||||
|
||||
# Handle nested folders (convert to slash notation)
|
||||
if folder_name.startswith('"') and folder_name.endswith('"'):
|
||||
folder_name = folder_name[1:-1]
|
||||
|
||||
folders.append({
|
||||
'name': folder_name,
|
||||
'full_path': folder_name
|
||||
})
|
||||
|
||||
# Close connection
|
||||
self.connection.close()
|
||||
self.connection.logout()
|
||||
|
||||
return folders
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching IMAP folders: {str(e)}")
|
||||
return []
|
||||
finally:
|
||||
if self.connection:
|
||||
try:
|
||||
self.connection.logout()
|
||||
except:
|
||||
pass
|
||||
|
||||
def sync_folders(self) -> Tuple[bool, str]:
|
||||
"""Sync IMAP folders with local database."""
|
||||
try:
|
||||
if not self.config:
|
||||
return False, "No IMAP configuration found"
|
||||
|
||||
# Get folders from IMAP server
|
||||
imap_folders = self.get_folders()
|
||||
|
||||
if not imap_folders:
|
||||
return False, "No folders found on IMAP server"
|
||||
|
||||
# Process each folder
|
||||
synced_count = 0
|
||||
for imap_folder in imap_folders:
|
||||
folder_name = imap_folder['name']
|
||||
|
||||
# Skip special folders that might not be needed
|
||||
if folder_name.lower() in ['inbox', 'sent', 'drafts', 'spam', 'trash']:
|
||||
continue
|
||||
|
||||
# Handle nested folder names (convert slashes to underscores or keep as-is)
|
||||
# According to requirements, nested folders should be created with slashes in the name
|
||||
display_name = folder_name
|
||||
|
||||
# Check if folder already exists
|
||||
existing_folder = Folder.query.filter_by(
|
||||
user_id=self.user.id,
|
||||
name=display_name
|
||||
).first()
|
||||
|
||||
if not existing_folder:
|
||||
# Create new folder
|
||||
new_folder = Folder(
|
||||
user_id=self.user.id,
|
||||
name=display_name,
|
||||
rule_text=f"Auto-synced from IMAP folder: {folder_name}",
|
||||
priority=0 # Default priority
|
||||
)
|
||||
db.session.add(new_folder)
|
||||
synced_count += 1
|
||||
|
||||
db.session.commit()
|
||||
return True, f"Successfully synced {synced_count} folders"
|
||||
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
return False, f"Sync error: {str(e)}"
|
||||
Reference in New Issue
Block a user