Files
rothbard/admin.py
2026-04-01 13:51:25 -07:00

284 lines
12 KiB
Python

import json
import os
import random
import secrets
import string
from functools import wraps

from firebase_admin import auth as fb_auth
from flask import render_template, request, redirect, url_for, session, abort, jsonify, flash

from firebase_init import db
from utils import get_user_profile
def admin_required(view):
    """Decorator that restricts a view to logged-in administrators.

    The wrapped view reads ``uid`` from the Flask session:

    * no ``uid`` -> redirect to the ``login`` endpoint;
    * the profile returned by ``get_user_profile(uid)`` has no truthy
      ``is_admin`` value -> abort with HTTP 403;
    * otherwise the original view is called with its arguments unchanged.

    Args:
        view: The view function to protect.

    Returns:
        The wrapped view function (metadata preserved via ``functools.wraps``).
    """

    @wraps(view)
    def wrapped(*args, **kwargs):
        uid = session.get("uid")
        if not uid:
            return redirect(url_for("login"))

        # get_user_profile lives in another module; a falsy result (e.g. None
        # for a profile that no longer exists) is treated as "no privileges"
        # so the request ends in a 403 rather than an AttributeError.
        profile = get_user_profile(uid) or {}
        if not profile.get("is_admin"):
            abort(403, "Access denied. Admin privileges required.")

        return view(*args, **kwargs)

    return wrapped
# Words used to build human-readable temporary passwords. The shortest entry
# has three letters, so word + 3 digits is never shorter than six characters.
_TEMP_PASSWORD_WORDS = (
    "sun", "moon", "star", "cloud", "rain", "wind", "fire", "water", "snow", "stone",
    "tree", "leaf", "flower", "bird", "wolf", "tiger", "bear", "fish", "dragon",
    "magic", "quest", "light", "dark", "gold", "silver", "ruby", "pearl", "diamond",
)


def _generate_temp_password(digit_count=3):
    """Return a temporary password: one random word followed by random digits.

    Uses the ``secrets`` module (a CSPRNG) instead of ``random`` because the
    value is a credential.

    NOTE(review): with the default of three digits there are only
    ``len(_TEMP_PASSWORD_WORDS) * 1000`` possible values; raise ``digit_count``
    if stronger temporary passwords are wanted.

    Args:
        digit_count: Number of decimal digits appended to the word.

    Returns:
        The generated password string.
    """
    word = secrets.choice(_TEMP_PASSWORD_WORDS)
    digits = "".join(secrets.choice(string.digits) for _ in range(digit_count))
    return f"{word}{digits}"


def register_admin_routes(app):
    """Register the admin user-management routes on ``app``.

    Every route except ``/admin/users/revert`` is protected by
    ``admin_required``. Validation and permission failures are raised with
    ``abort`` *outside* of the ``try`` blocks so that their 4xx status codes
    are not swallowed by the generic ``except Exception`` handlers.

    Args:
        app: The Flask application (or any object exposing ``route``).
    """

    # NOTE(review): "/admin/users/<int:page>" and "/admin/users/<uid>" overlap;
    # an all-digit uid would presumably be routed to the listing - confirm.
    @app.route("/admin/users")
    @app.route("/admin/users/<int:page>")
    @admin_required
    def admin_users(page=1):
        """Admin page listing all users, one page at a time."""
        # type=int falls back to the default on a malformed ?per_page= value;
        # the lower bound avoids a division by zero and negative offsets.
        per_page = max(1, request.args.get("per_page", 25, type=int))
        page = max(1, page)
        offset = (page - 1) * per_page

        try:
            users_ref = db.collection("users")
            # Total number of user documents (aggregation query).
            total_users = int(users_ref.count().get()[0][0].value)

            # Users for the requested page, ordered by e-mail address.
            page_query = users_ref.order_by("user_email").limit(per_page).offset(offset)
            users = []
            for doc in page_query.stream():
                user_data = doc.to_dict()
                users.append({
                    "uid": doc.id,
                    "user_email": user_data.get("user_email", ""),
                    "case_email": user_data.get("case_email", ""),
                    "case_domain_email": user_data.get("case_domain_email", ""),
                    "enabled": bool(user_data.get("enabled", False)),
                    "is_admin": bool(user_data.get("is_admin", False)),
                    "password_reset_required": bool(user_data.get("password_reset_required", False)),
                })
        except Exception as e:
            # Best effort: render an empty table rather than failing the page.
            print(f"[ERR] Failed to fetch users: {e}")
            users = []
            total_users = 0

        total_pages = (total_users + per_page - 1) // per_page  # ceiling division
        return render_template(
            "admin_users.html",
            users=users,
            current_page=page,
            total_pages=total_pages,
            total_users=total_users,
            per_page=per_page,
        )

    @app.route("/admin/users/<uid>")
    @admin_required
    def admin_user_detail(uid):
        """Show the edit page for a single user; 404 if the profile is missing."""
        user_doc = db.collection("users").document(uid).get()
        if not user_doc.exists:
            abort(404, "User not found")

        user_data = user_doc.to_dict()
        user = {
            "uid": uid,
            "user_email": user_data.get("user_email", ""),
            "case_email": user_data.get("case_email", ""),
            "case_domain_email": user_data.get("case_domain_email", ""),
            "enabled": bool(user_data.get("enabled", False)),
            "is_admin": bool(user_data.get("is_admin", False)),
        }
        return render_template("admin_user_edit.html", user=user)

    @app.route("/admin/users/<uid>/reset-password", methods=["POST"])
    @admin_required
    def reset_user_password(uid):
        """Assign a new temporary password to a user and require a reset.

        The Firestore ``password_reset_required`` flag is written before the
        password is changed: if the second step fails the user is merely
        asked to change a password that still works. The temporary password
        is flashed only after Firebase Auth accepted it.
        """
        try:
            user = fb_auth.get_user(uid)
            temp_password = _generate_temp_password()

            db.collection("users").document(user.uid).set(
                {"password_reset_required": True},
                merge=True,
            )
            fb_auth.update_user(uid, password=temp_password)
        except fb_auth.UserNotFoundError:
            abort(404, "User not found")
        except Exception as e:
            print(f"[ERR] Failed to reset password for {uid}: {e}")
            abort(500, "Failed to reset password")

        flash(f"User now has a temporary password {temp_password}.", "success")
        return redirect(url_for('admin_users'))

    @app.route("/admin/users/update", methods=["POST"])
    @admin_required
    def update_user():
        """Update a user's editable profile fields from a JSON request body.

        Expects a JSON object with ``uid`` and, optionally, ``enabled``,
        ``case_email`` and ``case_domain_email``. ``is_admin`` is never
        changed here. Responds with ``{"success": true}``.

        NOTE(review): omitted fields are reset to their defaults (``False`` /
        empty string), and only the Firestore profile is touched - the
        Firebase Auth ``disabled`` flag set at creation is not kept in sync.
        """
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so the problem is reported as a 400 below.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            abort(400, "Invalid JSON data")

        target_uid = data.get("uid")
        if not target_uid or not isinstance(target_uid, str):
            abort(400, "User ID is required")

        # Only fields that may be changed; is_admin is deliberately excluded.
        update_data = {
            "enabled": data.get("enabled", False),
            "case_email": data.get("case_email", ""),
            "case_domain_email": data.get("case_domain_email", ""),
        }

        try:
            user_ref = db.collection("users").document(target_uid)
            user_exists = user_ref.get().exists
            if user_exists:
                user_ref.update(update_data)
        except Exception as e:
            print(f"[ERR] Failed to update user: {e}")
            abort(500, "Failed to update user")

        if not user_exists:
            abort(404, "User not found")
        return jsonify({"success": True})

    @app.route("/admin/users/new")
    @admin_required
    def admin_user_new():
        """Display the form used to create a new user."""
        return render_template("admin_user_create.html")

    @app.route("/admin/users/create", methods=["POST"])
    @admin_required
    def create_user():
        """Create a Firebase Auth user plus Firestore profile with a temp password.

        Reads ``user_email`` (required), ``case_email``, ``case_domain_email``,
        ``enabled`` and ``is_admin`` from the submitted form. On success the
        temporary password is flashed to the admin and the browser is
        redirected to the user list.
        """
        user_email = (request.form.get("user_email") or "").strip()
        if not user_email:
            abort(400, "User email is required")
        if "@" not in user_email:
            abort(400, "Invalid email format")

        enabled = bool(request.form.get("enabled", False))
        temp_password = _generate_temp_password()

        try:
            user_record = fb_auth.create_user(
                email=user_email,
                email_verified=False,
                disabled=not enabled,
                password=temp_password,
            )
        except fb_auth.EmailAlreadyExistsError:
            print(f"[ERR] User with email {user_email} already exists")
            abort(400, "A user with this email already exists")
        except Exception as e:
            print(f"[ERR] Failed to create user: {e}")
            abort(500, "Failed to create user")

        try:
            db.collection("users").document(user_record.uid).set({
                "user_email": user_email,
                "case_email": request.form.get("case_email", ""),
                "case_domain_email": request.form.get("case_domain_email", ""),
                "enabled": enabled,
                "is_admin": bool(request.form.get("is_admin", False)),
                "password_reset_required": True,
            })
        except Exception as e:
            print(f"[ERR] Failed to create user: {e}")
            # Roll back the Auth account so a retry is not rejected with
            # "email already exists" for a user that has no profile.
            try:
                fb_auth.delete_user(user_record.uid)
            except Exception as cleanup_error:
                print(f"[ERR] Failed to roll back auth user {user_record.uid}: {cleanup_error}")
            abort(500, "Failed to create user")

        flash(f"User created successfully. Temporary password: {temp_password}", "success")
        # The temporary password is intentionally kept out of the log output.
        print(f"[INFO] Created user {user_email}")
        return redirect(url_for("admin_users"))

    @app.route("/admin/users/<uid>/become-user", methods=["POST"])
    @admin_required
    def become_user(uid):
        """Let an admin impersonate another user by swapping the session uid."""
        try:
            target_exists = db.collection("users").document(uid).get().exists
        except Exception as e:
            print(f"[ERR] Failed to become user {uid}: {e}")
            abort(500, "Failed to impersonate user")

        if not target_exists:
            abort(404, "User not found")

        # Remember the real admin only once: when an impersonation is already
        # active, overwriting original_uid would lose the admin's identity.
        if 'original_uid' not in session:
            session['original_uid'] = session.get('uid')
        session['impersonating'] = True
        session['uid'] = uid

        print(f"[INFO] Admin {session['original_uid']} is now impersonating user {uid}")
        return redirect(url_for('dashboard'))

    @app.route("/admin/users/revert")
    def revert_user():
        """Return from impersonation to the original admin session.

        Not wrapped in ``admin_required``: while impersonating, the session
        ``uid`` belongs to the impersonated user, so the admin check is made
        against ``original_uid`` instead. Without an active impersonation an
        admin is simply redirected to the user list; anonymous visitors go to
        ``login`` and everyone else receives a 403.
        """
        impersonating = 'original_uid' in session
        check_uid = session.get('original_uid') if impersonating else session.get('uid')

        if not check_uid:
            if impersonating:
                abort(403, "Access denied. Invalid session state.")
            return redirect(url_for('login'))

        try:
            # A falsy profile is treated as "not an admin".
            profile = get_user_profile(check_uid) or {}
        except Exception as e:
            print(f"[ERR] Failed to revert user: {e}")
            abort(500, "Failed to revert session")
        is_admin = bool(profile.get('is_admin'))

        if not impersonating:
            if is_admin:
                return redirect(url_for('admin_users'))
            abort(403, "Access denied. Admin privileges required.")

        if not is_admin:
            abort(403, "Access denied. Only admins can revert from impersonation.")

        # Restore the admin's own uid and clear the impersonation markers.
        session['uid'] = check_uid
        session.pop('impersonating', None)
        session.pop('original_uid', None)
        print("[INFO] Reverted from impersonation back to admin")
        return redirect(url_for('admin_users'))