# rothbard/app.py
# (repository snapshot: 2025-11-09 20:44:26 -08:00 · 398 lines · 13 KiB · Python)

import json
import os
from functools import wraps
from datetime import datetime, timedelta
import pytz
from flask import Flask, render_template, request, redirect, url_for, session, abort, jsonify
from dotenv import load_dotenv
load_dotenv()
import firebase_admin
from firebase_admin import credentials, auth as fb_auth, firestore
import requests
from filevine_client import FilevineClient
app = Flask(__name__)
# Session cookies are signed with this key. A random fallback keeps local
# development working, but every process then signs with a different key, so
# sessions are lost on restart and are not shared between workers.
_secret_key = os.environ.get("FLASK_SECRET_KEY")
if not _secret_key:
    print("[WARN] FLASK_SECRET_KEY is not set — using a random per-process key; "
          "sessions will not survive restarts or be shared across workers.")
    _secret_key = os.urandom(32)
app.secret_key = _secret_key
# --- Firebase Admin init ---
# Service-account credentials come from one of two environment variables.
# Inline JSON takes precedence over a path to a key file on disk.
json_inline = os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON")
file_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
_creds = None
if json_inline:
    _service_account_info = json.loads(json_inline)
    _creds = credentials.Certificate(_service_account_info)
elif file_path and os.path.exists(file_path):
    _creds = credentials.Certificate(file_path)
if _creds is None:
    # Fail at import time: nothing below works without Firebase.
    raise RuntimeError("Firebase credentials not configured. Set GOOGLE_APPLICATION_CREDENTIALS or FIREBASE_SERVICE_ACCOUNT_JSON.")
firebase_admin.initialize_app(_creds)
db = firestore.client()
# --- Filevine env ---
# Filevine API settings, read once at import time. Each is None when unset.
FV_CLIENT_ID = os.environ.get("FILEVINE_CLIENT_ID")
FV_CLIENT_SECRET = os.environ.get("FILEVINE_CLIENT_SECRET")
FV_PAT = os.environ.get("FILEVINE_PERSONAL_ACCESS_TOKEN")
FV_ORG_ID = os.environ.get("FILEVINE_ORG_ID")
FV_USER_ID = os.environ.get("FILEVINE_USER_ID")
# Warn (but keep starting) when any setting is missing or empty.
if not (FV_CLIENT_ID and FV_CLIENT_SECRET and FV_PAT and FV_ORG_ID and FV_USER_ID):
    print("[WARN] Missing one or more Filevine env vars — dashboard will fail until set.")
# --- Cache ---
# No in-process cache is used any more; projects are stored in and read from Firestore.
# Maps a numeric phase ID to its human-readable phase name.
PHASES = {
    209436: "Nonpayment File Review",
    209437: "Attorney File Review",
    209438: "Notice Preparation",
    209439: "Notice Pending",
    209440: "Notice Expired",
    209442: "Preparing and Filing UD",
    209443: "Waiting for Answer",
    209444: "Archived",

    210761: "Service of Process",

    211435: "Default",
    211436: "Pre-Answer Motion",
    211437: "Request for Trial",
    211438: "Trial Prep and Trial",
    211439: "Writ and Sheriff",
    211440: "Lockout Pending",
    211441: "Stipulation Preparation",
    211442: "Stipulation Pending",
    211443: "Stipulation Expired",
    211446: "On Hold",
    211466: "Request for Monetary Judgment",
    211467: "Appeals and Post-Poss. Motions",
    211957: "Migrated",

    213691: "Close Out/ Invoicing",
    213774: "Judgment After Stip & Order",
}
# --- Helpers ---
def _session_expired(expires_at):
    """Return True when the stored session expiry has passed or is unreadable.

    ``expires_at`` is the ISO-8601 string written to ``session["expires_at"]``
    at login. A missing value is treated as "no expiry" so sessions created
    without the stamp keep working; a malformed value fails closed.
    """
    if not expires_at:
        return False
    try:
        deadline = datetime.fromisoformat(expires_at)
    except (TypeError, ValueError):
        return True
    # The login handler stores a naive UTC timestamp; compare like with like,
    # and tolerate an aware timestamp should one ever be stored.
    if deadline.tzinfo is None:
        now = datetime.utcnow()
    else:
        now = datetime.now(deadline.tzinfo)
    return now >= deadline


def login_required(view):
    """Decorator that redirects to the login page unless a live session exists.

    A request passes when ``session["uid"]`` is set and the session's
    ``expires_at`` stamp (if any) has not elapsed. An expired session is
    cleared before redirecting, so the stale uid cannot be reused.
    """
    @wraps(view)
    def wrapped(*args, **kwargs):
        if not session.get("uid"):
            return redirect(url_for("login"))
        if _session_expired(session.get("expires_at")):
            session.clear()
            return redirect(url_for("login"))
        return view(*args, **kwargs)
    return wrapped
def get_user_profile(uid: str):
    """Return the profile stored at ``users/{uid}`` in Firestore.

    The result always has the keys ``enabled``, ``is_admin``, ``user_email``
    and ``case_email``. When no document exists yet, a disabled, non-admin
    placeholder is written (so an admin can fill it in) and returned; its two
    email fields are both taken from the Firebase Auth record, or are None if
    that lookup fails.
    """
    profile_ref = db.collection("users").document(uid)
    snapshot = profile_ref.get()

    if snapshot.exists:
        stored = snapshot.to_dict() or {}
        return {
            "enabled": bool(stored.get("enabled", False)),
            "is_admin": bool(stored.get("is_admin", False)),
            "user_email": stored.get("user_email"),
            "case_email": stored.get("case_email"),
        }

    # First visit: look up the email in Firebase Auth (best effort).
    try:
        auth_record = fb_auth.get_user(uid)
        auth_email = auth_record.email
    except Exception:
        auth_email = None

    placeholder = {
        "enabled": False,
        "is_admin": False,
        "user_email": auth_email,
        "case_email": auth_email,
    }
    profile_ref.set(placeholder, merge=True)
    # Hand back a separate dict so callers never share the object that was written.
    return dict(placeholder)
@app.context_processor
def inject_user_profile():
    """Expose a ``get_user_profile(uid)`` callable to every template.

    For a signed-in user the own profile is fetched once up front and reused;
    any other uid is looked up on demand. Without a session the callable
    returns an empty dict.
    """
    if 'uid' not in session:
        return {'get_user_profile': lambda uid: {}}

    own_profile = get_user_profile(session['uid'])

    def lookup(uid):
        # Reuse the profile fetched above when asked about the current user.
        if uid == session['uid']:
            return own_profile
        return get_user_profile(uid)

    return {'get_user_profile': lookup}
def get_firestore_document(collection_name: str, document_id: str):
    """
    Fetch a single Firestore document by collection and ID.

    Args:
        collection_name (str): Name of the Firestore collection
        document_id (str): ID of the document to retrieve

    Returns:
        dict: The document's fields, or None when the document does not exist
    """
    snapshot = db.collection(collection_name).document(document_id).get()
    return snapshot.to_dict() if snapshot.exists else None
@app.route("/")
def index():
    """Send the visitor to the login page, the dashboard, or the welcome page.

    Visitors without a session go to login. Enabled users go to the dashboard
    when they can actually use it: either they have a ``case_email`` or they
    are admins (the dashboard view lets admins in without one). Everyone else
    lands on the welcome page.
    """
    uid = session.get("uid")
    if not uid:
        return redirect(url_for("login"))

    profile = get_user_profile(uid)
    # Mirror the dashboard's own access rule so an enabled admin without a
    # case_email is not sent to the welcome page.
    can_view_dashboard = profile.get("enabled") and (
        profile.get("is_admin") or profile.get("case_email")
    )
    if can_view_dashboard:
        return redirect(url_for("dashboard"))
    return redirect(url_for("welcome"))
@app.route("/login")
def login():
    """Render the login page with the public Firebase web config.

    Each config value is read from the environment and defaults to an empty
    string when the variable is unset.
    """
    env_names = {
        "apiKey": "FIREBASE_API_KEY",
        "authDomain": "FIREBASE_AUTH_DOMAIN",
        "projectId": "FIREBASE_PROJECT_ID",
        "appId": "FIREBASE_APP_ID",
    }
    fb_public = {
        field: os.environ.get(env_name, "")
        for field, env_name in env_names.items()
    }
    return render_template("login.html", firebase_config=fb_public)
@app.route("/session_login", methods=["POST"])
def session_login():
    """Exchange a Firebase ID token for a server-side session.

    The token is read from the ``idToken`` field of a JSON object body or of
    the form data. Responds 400 when the token is missing (including when the
    JSON body is malformed or is not an object) and 401 when Firebase rejects
    it. On success the session is reset, the verified uid is stored together
    with an ``expires_at`` stamp 8 hours ahead, and ``{"ok": true}`` is
    returned.
    """
    if request.is_json:
        # silent=True: a malformed body becomes None instead of raising, and
        # the isinstance check keeps a JSON list/string from hitting .get().
        payload = request.get_json(silent=True)
        id_token = payload.get("idToken") if isinstance(payload, dict) else None
    else:
        id_token = request.form.get("idToken")
    if not id_token:
        abort(400, "Missing idToken")

    # Only token verification belongs in the try: anything failing later is a
    # server problem and must not be reported as "Invalid ID token".
    try:
        decoded = fb_auth.verify_id_token(id_token)
        uid = decoded["uid"]
    except Exception as e:
        print("[ERR] session_login:", e)
        abort(401, "Invalid ID token")

    session.clear()
    session["uid"] = uid
    # Optional: short session
    session["expires_at"] = (datetime.utcnow() + timedelta(hours=8)).isoformat()
    return jsonify({"ok": True})
@app.route("/logout")
def logout():
    """Drop everything stored in the session and return to the login page."""
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
@app.route("/welcome")
@login_required
def welcome():
    """Render the welcome page with the signed-in user's profile."""
    current_profile = get_user_profile(session.get("uid"))
    return render_template("welcome.html", profile=current_profile)
# --- Filevine API ---
# Filevine client is now in filevine_client.py
@app.route("/dashboard")
@app.route("/dashboard/<int:page>")
@login_required
def dashboard(page=1):
    """Render one page of the projects table.

    Non-admin users see projects whose ``viewing_emails`` array contains their
    ``case_email``; users that are disabled or have no ``case_email`` are sent
    to the welcome page. Admins see all projects, or can pass a
    ``?case_email=`` query parameter to view another address's projects
    (400 if it contains no ``@``).

    Args:
        page: 1-based page number from the URL; values below 1 give a 404.

    Returns:
        The rendered ``dashboard.html`` with ``rows``, ``case_email``,
        ``current_page``, ``total_pages``, ``total_projects`` and ``per_page``.
    """
    import time  # only used for the timing log lines below

    if page < 1:
        # /dashboard/0 would otherwise become a negative Firestore offset.
        abort(404)

    uid = session.get("uid")
    profile = get_user_profile(uid)
    if not profile.get("enabled"):
        return redirect(url_for("welcome"))

    is_admin = profile.get("is_admin")
    case_email = None
    if not is_admin:
        case_email = profile.get("case_email")
        if not case_email:
            return redirect(url_for("welcome"))
    elif request.args.get('case_email'):
        case_email = request.args.get('case_email').lower()
        # Validate email format
        if '@' not in case_email:
            return abort(400, "Invalid email format")

    # Pagination settings
    per_page = 25
    offset = (page - 1) * per_page

    # Base query shared by the count and the page read: every project, or only
    # those whose viewing_emails array contains the (lower-cased) case email.
    projects_query = db.collection("projects")
    if case_email:
        projects_query = projects_query.where(
            "viewing_emails", "array_contains", case_email.lower()
        )

    # Total via a count aggregation query. This is best effort: on failure the
    # page still renders, with a total of 0.
    try:
        start_time = time.time()
        total_projects = int(projects_query.count().get()[0][0].value)
        end_time = time.time()
        print(f"Filtered projects count: {total_projects} (took {end_time - start_time:.2f}s)")
    except Exception as e:
        print(f"[WARN] Failed to get filtered count: {e}")
        total_projects = 0

    # Calculate pagination
    total_pages = (total_projects + per_page - 1) // per_page  # Ceiling division

    # Read only the current page from Firestore using limit() and offset()
    start_time = time.time()
    page_query = (
        projects_query.order_by("matter_description").limit(per_page).offset(offset)
    )
    paginated_rows = [doc.to_dict() for doc in page_query.stream()]
    end_time = time.time()
    print(f"Retrieved {len(paginated_rows)} projects from Firestore (page {page} of {total_pages}) in {end_time - start_time:.2f}s")

    # Render table with pagination data
    return render_template("dashboard.html",
                           rows=paginated_rows,
                           case_email=case_email,
                           current_page=page,
                           total_pages=total_pages,
                           total_projects=total_projects,
                           per_page=per_page)
@app.route("/admin/users")
@app.route("/admin/users/<int:page>")
@login_required
def admin_users(page=1):
    """Admin page listing all users, 25 per page, ordered by ``user_email``.

    Args:
        page: 1-based page number from the URL; values below 1 give a 404.

    Returns:
        The rendered ``admin_users.html`` with ``users``, ``current_page``,
        ``total_pages``, ``total_users`` and ``per_page``. If Firestore cannot
        be read, the error is logged and an empty list is shown.

    Aborts with 403 when the signed-in user is not an admin.
    """
    uid = session.get("uid")
    profile = get_user_profile(uid)
    # Only admins can access this page
    if not profile.get("is_admin"):
        abort(403, "Access denied. Admin privileges required.")

    if page < 1:
        # Page 0 would become a negative offset, whose backend error the
        # handler below would swallow and show as an empty table.
        abort(404)

    # Pagination settings
    per_page = 25
    offset = (page - 1) * per_page

    try:
        users_ref = db.collection("users")
        # Get total count of users
        total_users = int(users_ref.count().get()[0][0].value)
        # Get users for current page
        page_query = users_ref.order_by("user_email").limit(per_page).offset(offset)
        users = []
        for doc in page_query.stream():
            user_data = doc.to_dict() or {}
            users.append({
                "uid": doc.id,
                "user_email": user_data.get("user_email", ""),
                "case_email": user_data.get("case_email", ""),
                "enabled": bool(user_data.get("enabled", False)),
                "is_admin": bool(user_data.get("is_admin", False)),
            })
    except Exception as e:
        print(f"[ERR] Failed to fetch users: {e}")
        users = []
        total_users = 0

    # Calculate pagination
    total_pages = (total_users + per_page - 1) // per_page  # Ceiling division
    return render_template("admin_users.html",
                           users=users,
                           current_page=page,
                           total_pages=total_pages,
                           total_users=total_users,
                           per_page=per_page)
@app.route("/admin/users/<uid>")
@login_required
def admin_user_detail(uid):
    """Show the edit page for one user (admins only).

    Args:
        uid: ID of the user to display, taken from the URL.

    Returns:
        The rendered ``admin_user_edit.html`` with a ``user`` dict holding
        ``uid``, ``user_email``, ``case_email``, ``enabled`` and ``is_admin``.

    Aborts with 403 when the signed-in user is not an admin and with 404 when
    no ``users/{uid}`` document exists.
    """
    # Keep the signed-in admin's uid separate from the ``uid`` path parameter;
    # reusing one name made this page always show the admin's own record.
    admin_uid = session.get("uid")
    admin_profile = get_user_profile(admin_uid)
    # Only admins can access this page
    if not admin_profile.get("is_admin"):
        abort(403, "Access denied. Admin privileges required.")

    # Get user data
    user_doc = db.collection("users").document(uid).get()
    if not user_doc.exists:
        abort(404, "User not found")

    user_data = user_doc.to_dict() or {}
    user = {
        "uid": uid,
        "user_email": user_data.get("user_email", ""),
        "case_email": user_data.get("case_email", ""),
        "enabled": bool(user_data.get("enabled", False)),
        "is_admin": bool(user_data.get("is_admin", False)),
    }
    return render_template("admin_user_edit.html", user=user)
@app.route("/admin/users/update", methods=["POST"])
@login_required
def update_user():
    """Update a user's ``enabled``, ``is_admin`` and ``case_email`` (admins only).

    Expects a JSON object with ``uid`` plus the three fields; a field that is
    omitted is written as its default (``False``, ``False``, ``""``).

    Returns:
        ``{"success": true}`` on success.

    Aborts with 403 for non-admins, 400 for a missing/invalid JSON body or
    ``uid``, and 500 when the Firestore update fails.
    """
    uid = session.get("uid")
    profile = get_user_profile(uid)
    # Only admins can update users
    if not profile.get("is_admin"):
        abort(403, "Access denied. Admin privileges required.")

    # Validate before the try block: abort() raises an exception that the
    # broad handler below would otherwise catch and turn into a 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        abort(400, "Invalid JSON data")
    target_uid = data.get("uid")
    if not target_uid or not isinstance(target_uid, str):
        abort(400, "User ID is required")

    try:
        # Update user in Firestore
        user_ref = db.collection("users").document(target_uid)
        user_ref.update({
            "enabled": data.get("enabled", False),
            "is_admin": data.get("is_admin", False),
            "case_email": data.get("case_email", ""),
        })
    except Exception as e:
        print(f"[ERR] Failed to update user: {e}")
        abort(500, "Failed to update user")
    return jsonify({"success": True})
# GAE compatibility
if __name__ == "__main__":
    # The server listens on all interfaces, so the interactive debugger must
    # be opt-in: with debug on, anyone who can reach the port can run code.
    # Set FLASK_DEBUG=1 for local development.
    _debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=_debug, host="0.0.0.0", port=int(os.environ.get("PORT", "5004")))