#!/usr/bin/env python3
"""
CLI script to query Firebase for projects associated with a user email.

Usage:
    python scripts/query_projects.py --email user@example.com
    python scripts/query_projects.py --email user@example.com --limit 10
    python scripts/query_projects.py --email @gmail.com  # Domain search

Diagnostics (warnings and errors) are written to stderr. With ``--json`` the
human-readable summary is written to stderr as well, so stdout carries only
the JSON document.
"""

import argparse
import json
import sys
from pathlib import Path
from typing import List, Dict, Any

# Add parent directory to path for imports. The path is anchored to this
# file's location rather than the current working directory, so the script
# behaves the same regardless of where it is launched from.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from dotenv import load_dotenv
load_dotenv()

from firebase_init import db
from firebase_admin import auth as fb_auth
from utils import get_user_profile


def get_uid_from_email(email: str) -> str | None:
    """Get the Firebase Auth UID for a user email.

    Args:
        email: Email address to look up in Firebase Auth.

    Returns:
        The user's UID, or None when no user has that email or the lookup
        fails for any other reason (the error is reported on stderr).
    """
    try:
        user = fb_auth.get_user_by_email(email)
    except fb_auth.UserNotFoundError:
        # The SDK exports this class as ``UserNotFoundError``; referencing a
        # non-existent attribute here would raise AttributeError instead of
        # handling the "no such user" case.
        return None
    except Exception as e:
        # Best-effort CLI helper: report and treat as "not found".
        print(f"Error looking up user: {e}", file=sys.stderr)
        return None
    return user.uid


def query_projects_for_user(
    uid: str,
    case_email: str | None = None,
    case_domain_email: str | None = None,
    limit: int = 100
) -> tuple[List[Dict[str, Any]], int]:
    """
    Query Firestore for projects associated with a user.

    Filter selection: if either ``case_email`` or ``case_domain_email`` is
    passed explicitly, only the explicit values are used; otherwise the
    values stored on the user's profile are used. When both a domain and an
    email are available, the domain filter takes precedence.

    Args:
        uid: Firebase user UID
        case_email: Specific case email to filter by (optional)
        case_domain_email: Domain to filter by (optional)
        limit: Maximum number of projects to return

    Returns:
        Tuple of (projects list, total count). ``total count`` is the number
        of matching documents before ``limit`` is applied. On a missing
        filter or a query error, ``([], 0)`` is returned and a message is
        written to stderr.
    """
    profile = get_user_profile(uid)

    if not profile.get("enabled"):
        print("Warning: User is not enabled", file=sys.stderr)

    # Determine which filter to use. Explicit arguments override the profile
    # as a pair, so an explicit case_email is not shadowed by a domain that
    # happens to be configured on the profile.
    if case_email or case_domain_email:
        filter_email = case_email
        filter_domain = case_domain_email
    else:
        filter_email = profile.get("case_email")
        filter_domain = profile.get("case_domain_email")

    if not filter_email and not filter_domain:
        print(
            "Error: No case_email or case_domain_email configured for this user",
            file=sys.stderr,
        )
        return ([], 0)

    try:
        # Values are lowercased before matching (presumably the stored array
        # entries are lowercase too -- verify against the writer side).
        if filter_domain:
            # Domain-based search
            projects_ref = db.collection("projects").where(
                "viewing_domains", "array_contains", filter_domain.lower()
            )
        else:
            # Email-based search
            projects_ref = db.collection("projects").where(
                "viewing_emails", "array_contains", filter_email.lower()
            )

        # Get total count: the aggregation value is read from the first
        # entry of the first result row.
        total_count = int(projects_ref.count().get()[0][0].value)

        # Get paginated results
        ordered = projects_ref.order_by("matter_description").limit(limit)
        projects = [doc.to_dict() for doc in ordered.stream()]

        return (projects, total_count)

    except Exception as e:
        # CLI boundary: report the failure and fall back to an empty result.
        print(f"Error querying projects: {e}", file=sys.stderr)
        return ([], 0)


def format_project(project: Dict[str, Any]) -> str:
    """Format a project dictionary for display.

    Args:
        project: Project document as a dict; missing keys render as 'N/A'.

    Returns:
        A newline-joined, human-readable summary of the project.
    """
    lines = [
        f"Matter #: {project.get('number', 'N/A')}",
        f"Description: {project.get('matter_description', 'N/A')}",
        f"Client: {project.get('client', 'N/A')}",
        f"Defendant 1: {project.get('defendant_1', 'N/A')}",
        f"Case #: {project.get('case_number', 'N/A')}",
        f"Premises: {project.get('premises_address', 'N/A')}, {project.get('premises_city', 'N/A')}",
        f"Assigned Attorney: {project.get('responsible_attorney', 'N/A')}",
        f"Primary Contact: {project.get('staff_person', 'N/A')}",
        f"Matter Stage: {project.get('phase_name', 'N/A')}",
        f"Documents: {project.get('documents_url', 'N/A')}",
    ]
    return "\n".join(lines)


def main():
    """Parse CLI arguments, look up the user, and print matching projects."""
    parser = argparse.ArgumentParser(
        description="Query Firebase for projects associated with a user email"
    )
    parser.add_argument(
        "--email", "-e",
        required=True,
        help="User email address (e.g., user@example.com or @domain.com for domain search)"
    )
    parser.add_argument(
        "--limit", "-l",
        type=int,
        default=100,
        help="Maximum number of projects to return (default: 100)"
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed project information"
    )
    parser.add_argument(
        "--case-email",
        help="Override case email filter (use user's case_email by default)"
    )
    parser.add_argument(
        "--case-domain",
        help="Override case domain filter (use user's case_domain_email by default)"
    )

    args = parser.parse_args()

    # Get UID from email
    # NOTE(review): the usage text advertises "@domain.com" for domain
    # search, but the value is only ever passed to an email lookup here --
    # confirm whether a bare-domain input is meant to be supported.
    uid = get_uid_from_email(args.email)
    if not uid:
        print(f"Error: User not found with email '{args.email}'", file=sys.stderr)
        sys.exit(1)

    # In JSON mode the summary goes to stderr so stdout stays parseable.
    info = sys.stderr if args.json else sys.stdout

    # Get user profile
    profile = get_user_profile(uid)
    print(f"\nUser: {profile.get('user_email', 'N/A')}", file=info)
    print(f"UID: {uid}", file=info)
    print(f"Enabled: {profile.get('enabled', False)}", file=info)
    print(f"Is Admin: {profile.get('is_admin', False)}", file=info)
    print(f"Case Email: {profile.get('case_email', 'N/A')}", file=info)
    print(f"Case Domain Email: {profile.get('case_domain_email', 'N/A')}", file=info)

    # Query projects
    projects, total_count = query_projects_for_user(
        uid=uid,
        case_email=args.case_email,
        case_domain_email=args.case_domain,
        limit=args.limit
    )

    # The query already applies the limit, so the list length is the number shown.
    print(f"\nFound {total_count} projects (showing {len(projects)})", file=info)

    if args.json:
        # Output as JSON
        result = {
            "user": {
                "email": profile.get("user_email"),
                "uid": uid,
                "enabled": profile.get("enabled"),
                "is_admin": profile.get("is_admin"),
                "case_email": profile.get("case_email"),
                "case_domain_email": profile.get("case_domain_email")
            },
            "total_count": total_count,
            "projects": projects
        }
        # default=str stringifies values json cannot encode natively.
        print(json.dumps(result, indent=2, default=str))
    else:
        # Human-readable output
        for i, project in enumerate(projects, 1):
            print(f"\n{'='*60}")
            print(f"Project {i}:")
            print('='*60)
            if args.verbose:
                print(format_project(project))
            else:
                print(f"  Matter #: {project.get('number', 'N/A')}")
                print(f"  Description: {project.get('matter_description', 'N/A')}")
                print(f"  Client: {project.get('client', 'N/A')}")
                print(f"  Case #: {project.get('case_number', 'N/A')}")


if __name__ == "__main__":
    main()