"""AI service layer for email rule generation and quality assessment."""

import hashlib
import json
import logging
import os
import re
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import requests


class AIService:
    """AI service layer for email rule generation and quality assessment.

    Talks to an OpenAI-compatible chat-completions endpoint to generate
    candidate email-organization rules, and scores rule text with a local
    keyword/length heuristic (no network access needed for scoring).

    Configuration is read from environment variables at construction time:
    ``AI_SERVICE_URL``, ``AI_SERVICE_API_KEY``, ``AI_MODEL``, ``AI_TIMEOUT``,
    ``AI_MAX_RETRIES`` and ``AI_CACHE_TTL``.
    """

    def __init__(self):
        self.api_url = os.environ.get('AI_SERVICE_URL', 'https://api.openai.com/v1')
        self.api_key = os.environ.get('AI_SERVICE_API_KEY')
        self.model = os.environ.get('AI_MODEL', 'gpt-3.5-turbo')
        self.timeout = int(os.environ.get('AI_TIMEOUT', 30))
        self.max_retries = int(os.environ.get('AI_MAX_RETRIES', 3))
        self.cache_ttl = int(os.environ.get('AI_CACHE_TTL', 3600))  # 1 hour

        # Configure logging. basicConfig is a no-op when the root logger
        # already has handlers, so repeated construction is harmless.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        if not self.api_key:
            self.logger.warning("AI_SERVICE_API_KEY is not set; AI requests will be unauthenticated")

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format ``datetime.utcnow().isoformat()`` did,
        without calling the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _make_request(self, endpoint: str, payload: Dict, headers: Optional[Dict] = None) -> Optional[Dict]:
        """Make HTTP request to AI service with retry logic.

        Args:
            endpoint: Path appended to ``self.api_url`` (e.g. ``chat/completions``).
            payload: JSON-serializable request body.
            headers: Optional extra headers; the caller's dict is not modified.

        Returns:
            The decoded JSON response, or ``None`` when every attempt failed.
        """
        # Copy so the bearer token is never written into the caller's dict.
        request_headers = dict(headers) if headers else {}
        request_headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        })

        url = f"{self.api_url}/{endpoint}"

        for attempt in range(self.max_retries):
            is_last_attempt = attempt == self.max_retries - 1
            try:
                response = requests.post(
                    url,
                    json=payload,
                    headers=request_headers,
                    timeout=self.timeout
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                self.logger.warning(
                    "AI service request failed (attempt %s/%s): %s",
                    attempt + 1, self.max_retries, e
                )
                if is_last_attempt:
                    self.logger.error(
                        "AI service request failed after %s attempts", self.max_retries
                    )
                    return None
            except Exception as e:
                # Best-effort boundary: anything unexpected (e.g. a body that
                # is not valid JSON) is logged and retried like a network error.
                self.logger.warning(
                    "Unexpected error in AI service request (attempt %s/%s): %s",
                    attempt + 1, self.max_retries, e
                )
                if is_last_attempt:
                    self.logger.error(
                        "AI service request failed after %s attempts due to unexpected error",
                        self.max_retries
                    )
                    return None
            time.sleep(2 ** attempt)  # Exponential backoff

        return None

    @staticmethod
    def _extract_message_content(result) -> Optional[str]:
        """Return the stripped text of the first choice, or ``None`` if malformed."""
        try:
            content = result['choices'][0]['message']['content']
        except (KeyError, IndexError, TypeError):
            return None
        if not isinstance(content, str):
            return None
        return content.strip()

    def generate_multiple_rules(self, folder_name: str, folder_type: str = 'destination',
                                rule_text: str = '', count: int = 3
                                ) -> Tuple[Optional[List[Dict]], Optional[Dict]]:
        """Generate multiple email organization rule options using AI.

        Args:
            folder_name: Name of the folder the rules are for.
            folder_type: Folder type inserted into the prompt.
            rule_text: Existing rule text the options must respect (may be empty).
            count: Number of options requested from the model.

        Returns:
            ``(rules, metadata)`` on success, where each rule dict has the keys
            ``text``, ``quality_score``, ``key_criteria``, ``model_used`` and
            ``generated_at``; ``(None, {'error': ...})`` on failure.
        """
        prompt = self._build_multiple_rules_prompt(folder_name, folder_type, rule_text, count)
        self.logger.debug("AI prompt: %s", prompt)

        payload = {
            'model': self.model,
            'messages': [
                {'role': 'system', 'content': 'You are an expert email organizer assistant.'},
                {'role': 'user', 'content': prompt}
            ],
            'max_tokens': 400,
            'temperature': 0.8
        }

        result = self._make_request('chat/completions', payload)
        response_text = self._extract_message_content(result) if result else None
        if response_text is None:
            return None, {'error': 'No response from AI service'}
        self.logger.debug("AI response: %r", response_text)

        rules = self._parse_multiple_rules_response(response_text)
        if not rules:
            return None, {'error': 'Failed to parse AI response'}

        # Assess quality for each rule
        scored_rules = [
            {
                'text': rule['text'],
                'quality_score': self._assess_rule_quality(rule['text'], folder_name, folder_type),
                'key_criteria': rule.get('criteria', ''),
                'model_used': self.model,
                'generated_at': self._utc_timestamp()
            }
            for rule in rules
        ]

        return scored_rules, {
            'total_generated': len(scored_rules),
            'model_used': self.model,
            'generated_at': self._utc_timestamp()
        }

    def assess_rule_quality(self, rule_text: str, folder_name: str, folder_type: str = 'destination') -> Dict:
        """Assess the quality of an email organization rule.

        Returns:
            Dict with ``score`` (0-100), ``grade``, ``feedback`` and ``assessed_at``.
        """
        score = self._assess_rule_quality(rule_text, folder_name, folder_type)

        return {
            'score': score,
            'grade': self._get_quality_grade(score),
            'feedback': self._generate_quality_feedback(rule_text, folder_name, score),
            'assessed_at': self._utc_timestamp()
        }

    def _build_single_rule_prompt(self, folder_name: str, folder_type: str, rule_text: str) -> str:
        """Build prompt for single rule generation."""
        return f"""
Generate a single, effective email organization rule for a folder named "{folder_name}".
This folder is of type "{folder_type}".
The current rule text is "{rule_text}". You can choose to enhance this.

Requirements:
1. The rule should be specific and actionable
2. Use natural language that can be easily understood
3. Focus on common email patterns that would benefit from organization
4. Keep it concise (under 150 characters)
5. Make it relevant to the folder name and purpose
6. Rules should follow the structure:
Bulleted list (separated by new line) of
* (content) belongs in this folder.
* (content) DOES NOT belong in this folder

Return only the rule text, nothing else.
"""

    def _build_multiple_rules_prompt(self, folder_name: str, folder_type: str, rule_text: str, count: int) -> str:
        """Build prompt for multiple rule generation."""
        return f"""
Generate {count} different email organization rule options for a folder named "{folder_name}".
This folder is of type "{folder_type}".
The current rule text is "{rule_text}". If there is content in this, your options must respect the existing content.

Requirements:
1. Each rule should be specific and actionable
2. Use natural language that can be easily understood
3. Focus on different aspects of email organization for this folder
4. Keep each rule concise (under 150 characters)
5. Make rules relevant to the folder name and purpose
6. Provide variety in rule approaches
7. A single rule option should should follow the structure:
Bulleted list (separated by new line) of
* (content) belongs in this folder.
* (content) DOES NOT belong in this folder

Return the rules in JSON format:
{{
    "rules": [
        {{
            "text": "rule text here, as a bulleted list separated by newlines",
            "criteria": "brief explanation of what this rule targets"
        }},
        ...
    ]
}}

7. DO NOT use markdown at all. Respond with JSON.
8. Rules should follow the structure:
Bulleted list (separated by newlines ,\\n) of
* (content) belongs in this folder.
* (content) DOES NOT belong in this folder
"""

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding Markdown code fence (``` or ```json) if present."""
        stripped = text.strip()
        if not stripped.startswith('```'):
            return stripped
        newline_at = stripped.find('\n')
        # Drop the opening fence line, including any language tag on it.
        stripped = stripped[newline_at + 1:] if newline_at != -1 else stripped[3:]
        stripped = stripped.rstrip()
        if stripped.endswith('```'):
            stripped = stripped[:-3]
        return stripped.strip()

    @staticmethod
    def _clean_json_value(raw: str) -> str:
        """Turn the right-hand side of a ``"key": "value",`` line into plain text."""
        value = raw.strip().rstrip(',').strip()
        try:
            decoded = json.loads(value)
        except json.JSONDecodeError:
            return value.strip('"')
        return decoded if isinstance(decoded, str) else value.strip('"')

    def _extract_rules_manually(self, response_text: str) -> List[Dict]:
        """Line-based salvage of ``"text"``/``"rule"``/``"criteria"`` pairs (max 5 rules)."""
        rules = []
        current_rule = {}

        for line in response_text.split('\n'):
            line = line.strip()
            if line.startswith(('"text":', '"rule":')):
                if current_rule:
                    rules.append(current_rule)
                current_rule = {'text': self._clean_json_value(line.split(':', 1)[1])}
            elif line.startswith('"criteria":') and current_rule:
                current_rule['criteria'] = self._clean_json_value(line.split(':', 1)[1])

        if current_rule:
            rules.append(current_rule)

        return rules[:5]  # Return max 5 rules

    def _parse_multiple_rules_response(self, response_text: str) -> List[Dict]:
        """Parse multiple rules response from AI.

        Tries strict JSON first (tolerating a Markdown code fence around it).
        If the text is not valid JSON, or is valid JSON without a ``rules``
        list, falls back to line-based extraction.

        Returns:
            A list of dicts, each guaranteed to have a string ``text`` key;
            an empty list when nothing usable was found.
        """
        if not isinstance(response_text, str) or not response_text.strip():
            return []

        cleaned = self._strip_code_fence(response_text)

        try:
            data = json.loads(cleaned)
        except json.JSONDecodeError:
            self.logger.warning("Failed to parse AI response as JSON, attempting manual parsing")
            data = None

        if isinstance(data, dict) and isinstance(data.get('rules'), list):
            # Keep only entries downstream code can use (rule['text'] must exist).
            return [
                rule for rule in data['rules']
                if isinstance(rule, dict) and isinstance(rule.get('text'), str)
            ]

        return self._extract_rules_manually(cleaned)

    def _assess_rule_quality(self, rule_text: str, folder_name: str, folder_type: str) -> int:
        """Assess rule quality and return score 0-100.

        Heuristic only: starts from a base of 50 and adds points for length,
        specificity keywords, action words, folder-name overlap, absence of
        sentence punctuation and common rule patterns. ``folder_type`` is
        accepted for interface symmetry and does not affect the score.
        """
        if not rule_text or len(rule_text.strip()) < 10:
            return 0

        score = 50  # Base score
        lowered = rule_text.lower()

        # Length check (optimal: 20-100 characters)
        rule_length = len(rule_text.strip())
        if 20 <= rule_length <= 100:
            score += 20
        elif 10 <= rule_length < 20 or 100 < rule_length <= 150:
            score += 10

        # Specificity check
        specific_keywords = ('from', 'subject', 'contains', 'sender', 'domain', 'email')
        if any(keyword in lowered for keyword in specific_keywords):
            score += 20

        # Action-oriented check
        action_words = ('move', 'filter', 'organize', 'sort', 'categorize', 'send', 'redirect')
        if any(word in lowered for word in action_words):
            score += 15

        # Relevance to folder name
        folder_words = (folder_name or '').lower().split()
        if any(word in lowered for word in folder_words):
            score += 15

        # Grammar and structure check
        if '.' not in rule_text and '?' not in rule_text and '!' not in rule_text:
            score += 10  # Simple, clean structure

        # Check for common rule patterns
        common_patterns = (
            r'from:.*@.*\..*',
            r'subject:.*',
            r'contains:.*',
            r'if.*then.*'
        )
        if any(re.search(pattern, rule_text, re.IGNORECASE) for pattern in common_patterns):
            score += 10

        return min(score, 100)  # Cap at 100

    def _get_quality_grade(self, score: int) -> str:
        """Get quality grade based on score."""
        if score >= 80:
            return 'excellent'
        elif score >= 60:
            return 'good'
        elif score >= 40:
            return 'fair'
        else:
            return 'poor'

    def _generate_quality_feedback(self, rule_text: str, folder_name: str, score: int) -> str:
        """Generate quality feedback based on rule assessment.

        Returns:
            Space-joined sentences: one summary line for the score band plus
            any length/criteria/action hints that apply to ``rule_text``.
        """
        feedback = []

        if score >= 80:
            feedback.append("Excellent rule! It's specific, actionable, and well-structured.")
        elif score >= 60:
            feedback.append("Good rule with room for improvement.")
        elif score >= 40:
            feedback.append("Fair rule. Consider making it more specific.")
        else:
            feedback.append("Poor rule. Needs significant improvement.")

        # Add specific feedback
        rule_length = len(rule_text.strip())
        lowered = rule_text.lower()

        if rule_length < 20:
            feedback.append("Rule is too short. Add more specific criteria.")
        elif rule_length > 100:
            feedback.append("Rule is too long. Be more concise.")

        if not any(word in lowered for word in ('from', 'subject', 'contains')):
            feedback.append("Consider adding specific criteria like 'from:' or 'subject:'.")

        if not any(word in lowered for word in ('move', 'filter', 'organize')):
            feedback.append("Make sure the rule includes an action word.")

        return " ".join(feedback)

    @staticmethod
    def generate_cache_key(folder_name: str, folder_type: str, rule_type: str, raw_text: str = '') -> str:
        """Generate a cache key for AI rule requests.

        MD5 is used only as a compact, stable fingerprint, not for security.
        """
        key_string = f"{folder_name}:{folder_type}:{rule_type}:{raw_text}"
        return hashlib.md5(key_string.encode()).hexdigest()

    def get_fallback_rule(self, folder_name: str, folder_type: str = 'destination') -> str:
        """Generate a fallback rule when AI service is unavailable.

        Unknown folder types fall back to the ``destination`` templates.
        """
        fallback_rules = {
            'destination': [
                f"Move emails containing '{folder_name}' in the subject to this folder",
                f"Filter emails from senders with '{folder_name}' in their domain",
                f"Organize emails with '{folder_name}' keywords in the body"
            ],
            'tidy': [
                "Move emails older than 30 days to this folder",
                f"Archive processed emails from '{folder_name}'",
                "Sort completed emails by date"
            ],
            'ignore': [
                f"Ignore emails containing '{folder_name}'",
                f"Exclude emails from '{folder_name}' senders",
                f"Skip emails with '{folder_name}' in subject"
            ]
        }

        rules = fallback_rules.get(folder_type, fallback_rules['destination'])
        return rules[0] if rules else f"Move emails related to '{folder_name}' to this folder"