import io
import os
from functools import wraps
from datetime import datetime, timedelta

from flask import Flask, render_template, request, redirect, url_for, session, abort, jsonify, send_file
from dotenv import load_dotenv

# Load .env before the project modules below are imported.
# NOTE(review): presumably they read environment variables at import time - confirm.
load_dotenv()

from filevine_client import FilevineClient
from utils import get_user_profile
from firebase_init import db
from firebase_admin import auth as fb_auth
import config
import openpyxl
from openpyxl import Workbook
from openpyxl.utils import get_column_letter

app = Flask(__name__)
# NOTE(review): when FLASK_SECRET_KEY is unset a fresh random key is generated
# per process, so existing session cookies stop validating after a restart and
# are not shared between worker processes.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", os.urandom(32))


# --- Helpers ---
def login_required(view):
    """Decorator that redirects to the login page when the session has no uid."""
    @wraps(view)
    def wrapped(*args, **kwargs):
        if not session.get("uid"):
            return redirect(url_for("login"))
        return view(*args, **kwargs)
    return wrapped


def _admin_projects_matching(case_email_match, per_page, offset):
    """
    Return one page of projects whose viewing_emails contain the given text.

    Matching is a case-insensitive substring test done client-side over every
    project, so the whole collection is streamed (restricted to a few fields)
    before the requested page of full documents is fetched.

    Args:
        case_email_match (str): Text to look for inside each viewing email
        per_page (int): Maximum number of projects to return
        offset (int): Number of matching projects to skip

    Returns:
        tuple: (list of project dicts for the page, total number of matches)
    """
    needle = case_email_match.lower()
    summaries = db.collection("projects").select(["viewing_emails", "matter_description", "id"])

    matching_docs = []
    for snapshot in summaries.stream():
        data = snapshot.to_dict() or {}
        emails = data.get("viewing_emails") or []
        if any(needle in (email or "").lower() for email in emails):
            matching_docs.append((snapshot.id, data.get("matter_description") or ""))
    count = len(matching_docs)

    # Sort by matter_description (case-insensitive), then paginate client-side.
    matching_docs.sort(key=lambda item: item[1].lower())
    filtered_ids = [doc_id for doc_id, _ in matching_docs][offset:offset + per_page]
    print(f"Filtered document IDs (partial match, sorted, paginated): {filtered_ids}")

    projects_ref = db.collection("projects")
    projects = []
    for doc_id in filtered_ids:
        doc = projects_ref.document(doc_id).get()
        if doc.exists:
            projects.append(doc.to_dict())
    return (projects, count)


def projects_for(profile, case_email_match, per_page, offset):
    """
    Fetch one page of the projects a user is allowed to see.

    Admins see every project, or - when case_email_match is given - the
    projects with a viewing email containing that text. Other users see the
    projects whose viewing_emails array contains their own (lowercased)
    case_email.

    Args:
        profile (dict): User profile containing 'enabled', 'is_admin', and 'case_email' fields
        case_email_match (str): Case email from query string argument, or None
        per_page (int): Maximum number of projects to return
        offset (int): Number of projects to skip before the returned page

    Returns:
        tuple: (list of project dicts for the page, total number of matching
        projects). ([], 0) is returned for disabled users, for non-admins
        without a case_email, and when the Firestore query fails.
    """
    if not profile.get("enabled"):
        return ([], 0)
    is_admin = profile.get("is_admin", False)

    try:
        if is_admin and case_email_match:
            return _admin_projects_matching(case_email_match, per_page, offset)

        if is_admin:
            projects_ref = db.collection("projects")
        else:
            case_email = profile.get("case_email")
            if not case_email:
                return ([], 0)
            projects_ref = db.collection("projects").where(
                "viewing_emails", "array_contains", case_email.lower())

        cnt = int(projects_ref.count().get()[0][0].value)
        page_query = projects_ref.order_by("matter_description").limit(per_page).offset(offset)
        projects = [doc.to_dict() for doc in page_query.stream()]
        return (projects, cnt)
    except Exception as e:
        # Best-effort: a failed query is logged and shown as an empty result.
        print(f"[ERROR] Failed to query projects: {e}")
        return ([], 0)


@app.context_processor
def inject_user_profile():
    """Make user profile available in all templates"""
    if 'uid' not in session:
        return {'get_user_profile': lambda uid: {}}

    current_uid = session['uid']
    profile = get_user_profile(current_uid)

    def lookup(uid):
        # Reuse the profile already loaded for the logged-in user.
        return profile if uid == current_uid else get_user_profile(uid)

    return {'get_user_profile': lookup}


def get_firestore_document(collection_name: str, document_id: str):
    """
    Retrieve a specific document from Firestore.

    Args:
        collection_name (str): Name of the Firestore collection
        document_id (str): ID of the document to retrieve

    Returns:
        dict: Document data as dictionary, or None if document doesn't exist
    """
    doc = db.collection(collection_name).document(document_id).get()
    return doc.to_dict() if doc.exists else None


@app.route("/")
@login_required
def index():
    """Send enabled users that have a case_email to the dashboard, others to welcome."""
    profile = get_user_profile(session.get("uid"))
    if profile.get("enabled") and profile.get("case_email"):
        return redirect(url_for("dashboard"))
    return redirect(url_for("welcome"))


@app.route("/login")
def login():
    """Render the login page with the public Firebase web configuration."""
    # Pass public Firebase config to template
    fb_public = {
        "apiKey": os.environ.get("FIREBASE_API_KEY", ""),
        "authDomain": os.environ.get("FIREBASE_AUTH_DOMAIN", ""),
        "projectId": os.environ.get("FIREBASE_PROJECT_ID", ""),
        "appId": os.environ.get("FIREBASE_APP_ID", ""),
    }
    return render_template("login.html", firebase_config=fb_public)


@app.route("/session_login", methods=["POST"])
def session_login():
    """Exchanges Firebase ID token for a server session."""
    if request.is_json:
        payload = request.get_json(silent=True)
        # A JSON body that is not an object carries no idToken.
        id_token = payload.get("idToken") if isinstance(payload, dict) else None
    else:
        id_token = request.form.get("idToken")
    if not id_token:
        abort(400, "Missing idToken")

    try:
        decoded = fb_auth.verify_id_token(id_token)
        uid = decoded["uid"]
    except Exception as e:
        print("[ERR] session_login:", e)
        abort(401, "Invalid ID token")

    session.clear()
    session["uid"] = uid
    # Optional: short session
    # NOTE(review): expires_at is stored here but nothing in this file checks
    # it - confirm whether it is enforced elsewhere.
    session["expires_at"] = (datetime.utcnow() + timedelta(hours=8)).isoformat()
    return jsonify({"ok": True})


@app.route("/logout")
def logout():
    """Drop the server session and return to the login page."""
    session.clear()
    return redirect(url_for("login"))


@app.route("/welcome")
@login_required
def welcome():
    """Render the welcome page for the logged-in user."""
    profile = get_user_profile(session.get("uid"))
    return render_template("welcome.html", profile=profile)


# --- Filevine API ---
# Filevine client is now in filevine_client.py

@app.route("/dashboard")
@app.route("/dashboard/")
@app.route("/dashboard/<int:page>")
@login_required
def dashboard(page=1):
    """
    Render one page of the project table.

    The page number comes from the URL path (/dashboard/<page>) or from a
    ?page= query argument; per_page comes from ?per_page= (default 25).
    Admins may narrow the list with ?case_email=.
    """
    profile = get_user_profile(session.get("uid"))
    if not profile.get("enabled"):
        return redirect(url_for("welcome"))
    is_admin = profile.get("is_admin")
    if not is_admin and not profile.get('case_email'):
        return redirect(url_for("welcome"))

    # Pagination settings; unparsable values fall back to the defaults and
    # both numbers are kept >= 1 so the offset and the page count stay valid.
    per_page = max(1, request.args.get('per_page', 25, type=int))
    page = max(1, request.args.get('page', page, type=int))
    offset = (page - 1) * per_page

    # Only admins may filter by an arbitrary case email.
    case_email_match = None
    if is_admin and request.args.get('case_email'):
        case_email_match = request.args.get('case_email')

    paginated_rows, total_projects = projects_for(profile, case_email_match, per_page, offset)
    total_pages = (total_projects + per_page - 1) // per_page  # Ceiling division
    print(f"Retrieved {len(paginated_rows)} projects from Firestore")

    # Render table with pagination data
    return render_template("dashboard.html",
                           rows=paginated_rows,
                           case_email=case_email_match,
                           current_page=page,
                           total_pages=total_pages,
                           total_projects=total_projects,
                           per_page=per_page)


# Upper bound on the number of projects written to one export.
_EXPORT_MAX_ROWS = 10000
# Only the first few tasks are listed per cell, for readability.
_EXPORT_MAX_TASKS = 5
_XLSX_MIMETYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

# (column header, project field) pairs, in dashboard column order.
_EXPORT_COLUMNS = [
    ('Matter Num', 'number'),
    ('Matter Description', 'matter_description'),
    ('Client / Property', 'client'),
    ('Defendant 1', 'defendant_1'),
    ('Matter Open', 'matter_open'),
    ('Practice Area', 'practice_area'),  # This field doesn't exist in model, so we'll leave blank
    ('Notice Type', 'notice_type'),
    ('Case Number', 'case_number'),
    ('Premises Address', 'premises_address'),
    ('Premises City', 'premises_city'),
    ('Assigned Attorney', 'responsible_attorney'),
    ('Primary Contact', 'staff_person'),
    ('Secondary Paralegal', 'staff_person_2'),
    ('Documents', 'documents_url'),  # Just the URL, not the link
    ('Matter Stage', 'phase_name'),
    ('Completed Tasks', 'completed_tasks'),
    ('Pending Tasks', 'pending_tasks'),
    ('Notice Service Date', 'notice_service_date'),
    ('Notice Expir. Date', 'notice_expiration_date'),
    # NOTE(review): 'case_field_date' looks like a typo for 'case_filed_date';
    # kept as-is - verify against the stored project documents.
    ('Date Case Filed', 'case_field_date'),
    ('Daily Rent Damages', 'daily_rent_damages'),
    ('Default Date', 'default_date'),
    ('Default Entered On', 'default_entered_on_date'),
    ('Motions:', 'motions'),  # This field doesn't exist in model, so we'll leave blank
    ('Demurrer Hearing Date', 'demurrer_hearing_date'),
    ('Motion To Strike Hearing Date', 'motion_to_strike_hearing_date'),
    ('Motion to Quash Hearing Date', 'motion_to_quash_hearing_date'),
    ('Other Motion Hearing Date', 'other_motion_hearing_date'),
    ('MSC Date', 'msc_date'),
    ('MSC Time', 'msc_time'),
    ('MSC Address', 'msc_address'),
    ('MSC Div/ Dept/ Room', 'msc_div_dept_room'),
    ('Trial Date', 'trial_date'),
    ('Trial Time', 'trial_time'),
    ('Trial Address', 'trial_address'),
    ('Trial Div/ Dept/ Room', 'trial_div_dept_room'),
    ('Final Result of Trial/ MSC', 'final_result'),
    ('Date of Settlement', 'date_of_settlement'),
    ('Final Obligation Under the Stip', 'final_obligation'),
    ('Def\'s Comply with the Stip?', 'def_comply_stip'),
    ('Judgment Date', 'judgment_date'),
    ('Writ Issued Date', 'writ_issued_date'),
    ('Scheduled Lockout', 'scheduled_lockout'),
    ('Oppose Stays?', 'oppose_stays'),
    ('Premises Safety or Access Issues', 'premises_safety'),
    ('Matter Gate or Entry Code', 'matter_gate_code'),
    ('Date Possession Recovered', 'date_possession_recovered'),
    ('Attorney\'s Fees', 'attorney_fees'),
    ('Costs', 'costs'),
]


def _export_cell_value(row_data, field_name):
    """Return the spreadsheet value for one project field ('' when absent)."""
    if field_name == 'completed_tasks':
        tasks = row_data.get('completed_tasks') or []
        return '; '.join(
            f"{task.get('description', '')} ({task.get('completed', '')})"
            for task in tasks[:_EXPORT_MAX_TASKS]
        )
    if field_name == 'pending_tasks':
        tasks = row_data.get('pending_tasks') or []
        return '; '.join(
            str(task.get('description') or '') for task in tasks[:_EXPORT_MAX_TASKS]
        )
    if field_name not in row_data:
        return ''

    field_value = row_data[field_name]
    if field_name == 'daily_rent_damages' and field_value:
        # Format as currency if it's a number; otherwise keep as-is.
        try:
            return f"${float(field_value):,.2f}"
        except (TypeError, ValueError, OverflowError):
            return field_value
    return field_value


@app.route("/dashboard/export_xls")
@login_required
def dashboard_export_xls():
    """Export all dashboard data to XLS format without pagination."""
    profile = get_user_profile(session.get("uid"))
    if not profile.get("enabled"):
        return redirect(url_for("welcome"))
    if not profile.get("is_admin") and not profile.get('case_email'):
        return redirect(url_for("welcome"))

    try:
        # Get all projects without pagination (no admin case_email filter).
        all_rows, cnt = projects_for(profile, None, _EXPORT_MAX_ROWS, 0)
        print(f"Retrieved {cnt} projects from Firestore for XLS export")

        # Create workbook and worksheet
        wb = Workbook()
        ws = wb.active
        ws.title = "Projects"

        # Header row in bold.
        header_font = openpyxl.styles.Font(bold=True)
        for col_num, (header, _) in enumerate(_EXPORT_COLUMNS, 1):
            ws.cell(row=1, column=col_num, value=header).font = header_font

        # Data rows start at row 2.
        for row_num, row_data in enumerate(all_rows, 2):
            for col_num, (_, field_name) in enumerate(_EXPORT_COLUMNS, 1):
                ws.cell(row=row_num, column=col_num,
                        value=_export_cell_value(row_data, field_name))

        # Auto-adjust column widths to the longest value, capped at 50 characters.
        for column in ws.columns:
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            column_letter = get_column_letter(column[0].column)
            ws.column_dimensions[column_letter].width = min(max_length + 2, 50)

        # Build the file in memory so nothing is left behind on disk.
        filename = f"dashboard_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        buffer = io.BytesIO()
        wb.save(buffer)
        buffer.seek(0)
        return send_file(buffer, as_attachment=True, download_name=filename,
                         mimetype=_XLSX_MIMETYPE)
    except Exception as e:
        print(f"[ERROR] Failed to export XLS: {e}")
        abort(500, "Failed to export data")


# Imported after the routes above are defined.
# NOTE(review): presumably because admin depends on names from this module - confirm.
import admin

# Register admin routes
admin.register_admin_routes(app)

# GAE compatibility
if __name__ == "__main__":
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the Werkzeug
    # debugger to the network; use only for local development.
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", "5004")))