Files
rothbard/app.py
2025-12-17 16:14:02 -08:00

498 lines
20 KiB
Python

import os
from functools import wraps
from datetime import datetime, timedelta
from flask import Flask, render_template, request, redirect, url_for, session, abort, jsonify, send_file
from dotenv import load_dotenv
load_dotenv()
from filevine_client import FilevineClient
from utils import get_user_profile
from firebase_init import db
from firebase_admin import auth as fb_auth
import config
import openpyxl
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
# The Flask application object; every route in this module is registered on it.
app = Flask(__name__)
# Secret key comes from FLASK_SECRET_KEY when set; otherwise a fresh random
# 32-byte value is generated each time this module is loaded.
# NOTE(review): with the random fallback, session cookies issued by one
# process/restart will presumably not validate in another -- confirm that
# FLASK_SECRET_KEY is always set in deployed environments.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", os.urandom(32))
# --- Helpers ---
def login_required(view):
    """Decorator that only lets requests with a live login session reach ``view``.

    A request is redirected to the ``login`` endpoint when the session has no
    ``uid``, or when the session carries an ``expires_at`` timestamp (an ISO
    string of a naive UTC datetime, as written by ``session_login``) that has
    already passed. An expired or unreadable timestamp also clears the session.

    Args:
        view: The view function to protect.

    Returns:
        The wrapped view function.
    """
    @wraps(view)
    def wrapped(*args, **kwargs):
        if not session.get("uid"):
            return redirect(url_for("login"))

        # Sessions without a timestamp are let through unchanged so that the
        # decorator keeps working for sessions that never recorded one.
        expires_at = session.get("expires_at")
        if expires_at is not None:
            try:
                expired = datetime.utcnow() >= datetime.fromisoformat(expires_at)
            except (TypeError, ValueError):
                # Not a parseable naive-UTC timestamp: fail closed.
                expired = True
            if expired:
                session.clear()
                return redirect(url_for("login"))

        return view(*args, **kwargs)
    return wrapped
def _projects_query_for(profile, case_email_match):
    """Build the Firestore query selecting the projects ``profile`` may view.

    Returns None when a non-admin profile has neither ``case_domain_email``
    nor ``case_email`` set, i.e. there is nothing it is allowed to see.
    """
    if profile.get("is_admin", False):
        if not case_email_match:
            # Admin without a search term: the whole collection.
            return db.collection("projects")
        term = case_email_match.lower().strip()
        if '@' in term and not term.startswith('@'):
            # Looks like a full address: match it against viewing_emails.
            return db.collection("projects").where("viewing_emails", "array_contains", term)
        # Otherwise treat the term as a domain; "@gmail.com" becomes "gmail.com".
        domain = term[1:] if term.startswith('@') else term
        return db.collection("projects").where("viewing_domains", "array_contains", domain)

    # Non-admin users: the domain setting takes precedence over the single address.
    case_domain_email = profile.get("case_domain_email", "")
    case_email = profile.get("case_email", "")
    if case_domain_email:
        return db.collection("projects").where("viewing_domains", "array_contains", case_domain_email.lower())
    if case_email:
        return db.collection("projects").where("viewing_emails", "array_contains", case_email.lower())
    return None


def projects_for(profile, case_email_match, per_page, offset):
    """
    Fetch one page of the projects visible to a user, plus the total match count.

    Args:
        profile (dict): User profile containing 'enabled', 'is_admin', 'case_email', and 'case_domain_email' fields
        case_email_match (str): Email address or domain to filter by, or None. Only used for admin profiles.
        per_page (int): Maximum number of projects to return
        offset (int): Number of matching projects to skip before the returned page

    Returns:
        tuple: ``(projects, total)`` where ``projects`` is a list of project
        dictionaries ordered by 'matter_description' and ``total`` is the
        number of projects matching the filter across all pages. ``([], 0)``
        is returned when the profile is not enabled, when a non-admin profile
        has no case email or domain, or when the query fails.
    """
    if not profile.get("enabled"):
        return ([], 0)
    try:
        projects_ref = _projects_query_for(profile, case_email_match)
        if projects_ref is None:
            return ([], 0)
        total = int(projects_ref.count().get()[0][0].value)
        page_query = projects_ref.order_by("matter_description").limit(per_page).offset(offset)
        projects = [doc.to_dict() for doc in page_query.stream()]
        return (projects, total)
    except Exception as e:
        # Best-effort: a failed query is logged and shown as an empty result.
        print(f"[ERROR] Failed to query projects: {e}")
        return ([], 0)
@app.context_processor
def inject_user_profile():
    """Expose a ``get_user_profile(uid)`` callable to every template.

    When a user is logged in, their own profile is fetched once here and
    reused; any other uid is looked up on demand. Without a logged-in user the
    callable returns an empty dict for every uid.
    """
    if 'uid' not in session:
        def _no_profile(uid):
            return {}
        return {'get_user_profile': _no_profile}

    own_profile = get_user_profile(session['uid'])

    def _lookup(uid):
        # The session uid is read when the template calls this, not up front.
        if uid == session['uid']:
            return own_profile
        return get_user_profile(uid)

    return {'get_user_profile': _lookup}
def get_firestore_document(collection_name: str, document_id: str):
    """
    Look up a single Firestore document by collection and ID.

    Args:
        collection_name (str): Name of the Firestore collection
        document_id (str): ID of the document to retrieve

    Returns:
        dict: The document's data, or None if no such document exists
    """
    snapshot = db.collection(collection_name).document(document_id).get()
    return snapshot.to_dict() if snapshot.exists else None
@app.route("/")
def index():
    """Send the visitor to the login page, the dashboard or the welcome page.

    Visitors without a session go to login. Enabled profiles that have a case
    email or case domain go to the dashboard; everyone else goes to welcome.
    """
    uid = session.get("uid")
    if not uid:
        return redirect(url_for("login"))

    profile = get_user_profile(uid)
    destination = "welcome"
    if profile.get("enabled"):
        if profile.get("case_email") or profile.get("case_domain_email"):
            destination = "dashboard"
    return redirect(url_for(destination))
@app.route("/login")
def login():
    """Render the login page with the public Firebase settings from the environment."""
    # Template key -> environment variable holding its value ("" when unset).
    env_names = {
        "apiKey": "FIREBASE_API_KEY",
        "authDomain": "FIREBASE_AUTH_DOMAIN",
        "projectId": "FIREBASE_PROJECT_ID",
        "appId": "FIREBASE_APP_ID",
    }
    firebase_config = {
        key: os.environ.get(env_name, "")
        for key, env_name in env_names.items()
    }
    return render_template("login.html", firebase_config=firebase_config)
@app.route("/session_login", methods=["POST"])
def session_login():
    """Exchange a Firebase ID token for a server session.

    The token is read from the ``idToken`` key of a JSON object body, or from
    the ``idToken`` form field for non-JSON requests.

    Returns:
        JSON ``{"ok": true}`` once the session holds the verified ``uid`` and
        an ``expires_at`` timestamp eight hours ahead (naive UTC, ISO format).

    Aborts:
        400 when no token is supplied (including a JSON body that is malformed
        or is not an object); 401 when the token fails verification.
    """
    if request.is_json:
        # silent=True yields None for malformed JSON; the isinstance check
        # covers valid JSON that is not an object (e.g. a list or a string).
        payload = request.get_json(silent=True)
        id_token = payload.get("idToken") if isinstance(payload, dict) else None
    else:
        id_token = request.form.get("idToken")
    if not id_token:
        abort(400, "Missing idToken")

    # Only token verification is guarded, so that unrelated failures further
    # down are not misreported as an invalid token.
    try:
        decoded = fb_auth.verify_id_token(id_token)
        uid = decoded["uid"]
    except Exception as e:
        print("[ERR] session_login:", e)
        abort(401, "Invalid ID token")

    session.clear()
    session["uid"] = uid
    # Optional: short session
    session["expires_at"] = (datetime.utcnow() + timedelta(hours=8)).isoformat()
    return jsonify({"ok": True})
@app.route("/logout")
def logout():
    """Drop everything stored in the session and return to the login page."""
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
@app.route("/welcome")
@login_required
def welcome():
    """Render the welcome page for the logged-in user's profile."""
    current_profile = get_user_profile(session.get("uid"))
    return render_template("welcome.html", profile=current_profile)
# --- Filevine API ---
# Filevine client is now in filevine_client.py
@app.route("/dashboard")
@app.route("/dashboard/<int:page>")
@login_required
def dashboard(page=1):
    """Render one page of the projects table for the logged-in user.

    Query parameters:
        per_page: Rows per page. Missing, non-numeric or non-positive values
            fall back to 25.
        case_email: Email address or domain to filter by; honoured for admin
            profiles only.

    Args:
        page (int): 1-based page number from the URL; values below 1 are
            treated as 1.

    Returns:
        The rendered ``dashboard.html`` page, or a redirect to ``welcome`` when
        the profile is not enabled or a non-admin profile has no case email or
        case domain.
    """
    uid = session.get("uid")
    profile = get_user_profile(uid)
    if not profile.get("enabled"):
        return redirect(url_for("welcome"))
    is_admin = profile.get("is_admin")
    if not is_admin and (not profile.get('case_email') and not profile.get('case_domain_email')):
        return redirect(url_for("welcome"))

    # Pagination settings. type=int makes a non-numeric per_page fall back to
    # the default instead of raising; the bounds checks keep the offset
    # non-negative and avoid dividing by zero below.
    default_per_page = 25
    per_page = request.args.get('per_page', default_per_page, type=int)
    if per_page < 1:
        per_page = default_per_page
    page = max(page, 1)
    offset = (page - 1) * per_page

    # Only admins may narrow the listing with an arbitrary email or domain.
    case_email_match = None
    if is_admin and request.args.get('case_email'):
        case_email_match = request.args.get('case_email')

    # Read only the current page from Firestore using limit() and offset()
    paginated_rows, total_projects = projects_for(profile, case_email_match, per_page, offset)
    total_pages = (total_projects + per_page - 1) // per_page  # Ceiling division
    print(f"Retrieved {len(paginated_rows)} projects from Firestore")

    # Render table with pagination data
    return render_template("dashboard.html",
                           rows=paginated_rows,
                           case_email=case_email_match,
                           current_page=page,
                           total_pages=total_pages,
                           total_projects=total_projects,
                           per_page=per_page)
# Spreadsheet columns in output order: (header text, project field name).
# Fields missing from a project are exported as an empty cell.
# 'completed_tasks' and 'pending_tasks' are summarised rather than copied.
_EXPORT_COLUMNS = [
    ('Matter Num', 'number'),
    ('Matter Description', 'matter_description'),
    ('Client / Property', 'client'),
    ('Defendant 1', 'defendant_1'),
    ('Matter Open', 'matter_open'),
    ('Practice Area', 'practice_area'),
    ('Notice Type', 'notice_type'),
    ('Case Number', 'case_number'),
    ('Premises Address', 'premises_address'),
    ('Premises City', 'premises_city'),
    ('Assigned Attorney', 'responsible_attorney'),
    ('Primary Contact', 'staff_person'),
    ('Secondary Paralegal', 'staff_person_2'),
    ('Documents', 'documents_url'),  # Just the URL, not the link
    ('Matter Stage', 'phase_name'),
    ('Completed Tasks', 'completed_tasks'),
    ('Pending Tasks', 'pending_tasks'),
    ('Notice Service Date', 'notice_service_date'),
    ('Notice Expir. Date', 'notice_expiration_date'),
    # NOTE(review): 'case_field_date' may be a typo for "filed" -- confirm
    # against the stored project documents before renaming.
    ('Date Case Filed', 'case_field_date'),
    ('Daily Rent Damages', 'daily_rent_damages'),
    ('Default Date', 'default_date'),
    ('Default Entered On', 'default_entered_on_date'),
    ('Motions:', 'motions'),
    ('Demurrer Hearing Date', 'demurrer_hearing_date'),
    ('Motion To Strike Hearing Date', 'motion_to_strike_hearing_date'),
    ('Motion to Quash Hearing Date', 'motion_to_quash_hearing_date'),
    ('Other Motion Hearing Date', 'other_motion_hearing_date'),
    ('MSC Date', 'msc_date'),
    ('MSC Time', 'msc_time'),
    ('MSC Address', 'msc_address'),
    ('MSC Div/ Dept/ Room', 'msc_div_dept_room'),
    ('Trial Date', 'trial_date'),
    ('Trial Time', 'trial_time'),
    ('Trial Address', 'trial_address'),
    ('Trial Div/ Dept/ Room', 'trial_div_dept_room'),
    ('Final Result of Trial/ MSC', 'final_result'),
    ('Date of Settlement', 'date_of_settlement'),
    ('Final Obligation Under the Stip', 'final_obligation'),
    ('Def\'s Comply with the Stip?', 'def_comply_stip'),
    ('Judgment Date', 'judgment_date'),
    ('Writ Issued Date', 'writ_issued_date'),
    ('Scheduled Lockout', 'scheduled_lockout'),
    ('Oppose Stays?', 'oppose_stays'),
    ('Premises Safety or Access Issues', 'premises_safety'),
    ('Matter Gate or Entry Code', 'matter_gate_code'),
    ('Date Possession Recovered', 'date_possession_recovered'),
    ('Attorney\'s Fees', 'attorney_fees'),
    ('Costs', 'costs'),
]


def _export_task_summary(tasks, include_completed):
    """Join the descriptions of the first five tasks into one cell string."""
    if not tasks:
        return ''
    parts = []
    for task in tasks[:5]:  # Limit to 5 tasks for readability
        desc = task.get('description', '')
        if include_completed:
            parts.append(f"{desc} ({task.get('completed', '')})")
        else:
            parts.append(desc)
    return '; '.join(parts)


def _export_cell_value(row_data, field_name):
    """Return the spreadsheet cell value for one field of one project."""
    if field_name == 'completed_tasks':
        return _export_task_summary(row_data.get('completed_tasks'), True)
    if field_name == 'pending_tasks':
        return _export_task_summary(row_data.get('pending_tasks'), False)
    if field_name not in row_data:
        return ''
    value = row_data[field_name]
    if field_name == 'daily_rent_damages' and value:
        # Format as currency if it's a number
        try:
            value = f"${float(value):,.2f}"
        except (TypeError, ValueError):
            pass  # Keep as-is if not a number
    return value


@app.route("/dashboard/export_xls")
@login_required
def dashboard_export_xls():
    """Export the user's projects as an .xlsx download, without pagination.

    At most 10000 projects are requested from ``projects_for``; no
    ``case_email`` filter is applied. The workbook is built in memory and
    streamed to the client, so nothing is written to disk and concurrent
    exports cannot read each other's files.

    Returns:
        The .xlsx attachment, or a redirect to ``welcome`` when the profile is
        not enabled or a non-admin profile has no case email or case domain.

    Aborts:
        500 when building or sending the workbook fails.
    """
    import io

    uid = session.get("uid")
    profile = get_user_profile(uid)
    if not profile.get("enabled"):
        return redirect(url_for("welcome"))
    is_admin = profile.get("is_admin")
    if not is_admin and (not profile.get('case_email') and not profile.get('case_domain_email')):
        return redirect(url_for("welcome"))

    try:
        # Get all projects without pagination
        all_rows, cnt = projects_for(profile, None, 10000, 0)
        print(f"Retrieved {cnt} projects from Firestore for XLS export")

        # Create workbook and worksheet
        wb = Workbook()
        ws = wb.active
        ws.title = "Projects"

        # Write headers
        header_font = openpyxl.styles.Font(bold=True)
        for col_num, (header, _field) in enumerate(_EXPORT_COLUMNS, 1):
            ws.cell(row=1, column=col_num, value=header).font = header_font

        # Write data rows
        for row_num, row_data in enumerate(all_rows, 2):
            for col_num, (_header, field_name) in enumerate(_EXPORT_COLUMNS, 1):
                ws.cell(row=row_num, column=col_num,
                        value=_export_cell_value(row_data, field_name))

        # Auto-adjust column widths
        for column in ws.columns:
            longest = max((len(str(cell.value)) for cell in column), default=0)
            column_letter = get_column_letter(column[0].column)
            ws.column_dimensions[column_letter].width = min(longest + 2, 50)  # Cap at 50 characters

        # Serialise into memory and rewind so send_file reads from the start.
        buffer = io.BytesIO()
        wb.save(buffer)
        buffer.seek(0)
        filename = f"dashboard_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        return send_file(
            buffer,
            as_attachment=True,
            download_name=filename,
            mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        )
    except Exception as e:
        print(f"[ERROR] Failed to export XLS: {e}")
        return abort(500, "Failed to export data")
# Imported here, after the routes above are defined, rather than with the
# imports at the top of the file.
# NOTE(review): presumably deferred to avoid a circular import between this
# module and admin -- confirm before moving it.
import admin
# Register admin routes
admin.register_admin_routes(app)
# GAE compatibility
if __name__ == "__main__":
    # The server listens on every interface, so debug mode (which enables the
    # interactive debugger) must be an explicit opt-in via FLASK_DEBUG rather
    # than always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host="0.0.0.0", port=int(os.environ.get("PORT", "5004")))