284 lines
12 KiB
Python
284 lines
12 KiB
Python
import json
import os
import random
import secrets
import string
from functools import wraps

from flask import render_template, request, redirect, url_for, session, abort, jsonify, flash
from firebase_admin import auth as fb_auth

from firebase_init import db
from utils import get_user_profile
|
|
|
|
def admin_required(view):
    """Decorator that restricts a Flask view to logged-in admin users.

    The wrapper reads ``uid`` from the Flask session. When it is missing the
    request is redirected to the ``login`` endpoint. Otherwise the profile is
    loaded with ``get_user_profile`` and the request is aborted with 403
    unless the profile has a truthy ``is_admin`` entry.

    Args:
        view: The view function to protect.

    Returns:
        The wrapped view; ``functools.wraps`` preserves the original's
        name and docstring so Flask endpoint names are unchanged.
    """

    @wraps(view)
    def wrapped(*args, **kwargs):
        uid = session.get("uid")
        if not uid:
            return redirect(url_for("login"))

        profile = get_user_profile(uid)
        # NOTE(review): get_user_profile is defined elsewhere; presumably it
        # can return None/empty for a deleted account. Deny in that case
        # instead of crashing with AttributeError on ``None.get``.
        if not profile or not profile.get("is_admin"):
            abort(403, "Access denied. Admin privileges required.")

        return view(*args, **kwargs)

    return wrapped
|
|
|
|
# Word list used to build human-readable temporary passwords.
_TEMP_PASSWORD_WORDS = (
    "sun", "moon", "star", "cloud", "rain", "wind", "fire", "water", "snow", "stone",
    "tree", "leaf", "flower", "bird", "wolf", "tiger", "bear", "fish", "dragon",
    "magic", "quest", "light", "dark", "gold", "silver", "ruby", "pearl", "diamond",
)

# Pagination bounds for the admin user table.
_DEFAULT_PER_PAGE = 25
_MAX_PER_PAGE = 500


def _generate_temp_password(digit_count=3):
    """Return a temporary password: one random word followed by random digits.

    Uses ``secrets`` rather than ``random`` so the choice is not predictable
    from earlier outputs.

    NOTE(review): the word + 3 digits format only has 28,000 combinations; it
    is kept for compatibility with the existing flow, which also sets
    ``password_reset_required``. Consider a longer format.
    """
    word = secrets.choice(_TEMP_PASSWORD_WORDS)
    digits = "".join(secrets.choice(string.digits) for _ in range(digit_count))
    return f"{word}{digits}"


def _parse_per_page(raw_value, default=_DEFAULT_PER_PAGE, maximum=_MAX_PER_PAGE):
    """Convert a ``per_page`` query value into an int in ``[1, maximum]``.

    Missing, non-numeric, zero or negative values fall back to ``default``;
    values above ``maximum`` are capped.
    """
    try:
        per_page = int(raw_value)
    except (TypeError, ValueError):
        return default
    if per_page < 1:
        return default
    return min(per_page, maximum)


def _user_summary(uid, user_data):
    """Build the template-facing dict for one user document."""
    user_data = user_data or {}
    return {
        "uid": uid,
        "user_email": user_data.get("user_email", ""),
        "case_email": user_data.get("case_email", ""),
        "case_domain_email": user_data.get("case_domain_email", ""),
        "enabled": bool(user_data.get("enabled", False)),
        "is_admin": bool(user_data.get("is_admin", False)),
        "password_reset_required": bool(user_data.get("password_reset_required", False)),
    }


def register_admin_routes(app):
    """Register the admin user-management routes on ``app``.

    Routes added: user list (paginated), user detail, password reset,
    user update (JSON), new-user form, user creation, impersonation
    ("become user") and revert from impersonation.

    In every handler, request validation and ``abort()`` calls for client
    errors are kept outside ``try`` blocks: ``abort`` raises an exception, and
    a surrounding ``except Exception`` would turn a 400/403/404 into a 500.

    Args:
        app: The Flask application (or any object exposing ``route``).
    """

    @app.route("/admin/users")
    @app.route("/admin/users/<int:page>")
    @admin_required
    def admin_users(page=1):
        """Admin page to manage all users (paginated table)."""
        per_page = _parse_per_page(request.args.get("per_page"))
        page = max(page, 1)  # page 0 would produce a negative offset
        offset = (page - 1) * per_page

        users = []
        total_users = 0
        try:
            users_ref = db.collection("users")
            total_users = int(users_ref.count().get()[0][0].value)

            page_query = users_ref.order_by("user_email").limit(per_page).offset(offset)
            users = [_user_summary(doc.id, doc.to_dict()) for doc in page_query.stream()]
        except Exception as e:
            # Best effort: render an empty table rather than failing the page.
            print(f"[ERR] Failed to fetch users: {e}")
            users = []
            total_users = 0

        total_pages = (total_users + per_page - 1) // per_page  # ceiling division

        return render_template(
            "admin_users.html",
            users=users,
            current_page=page,
            total_pages=total_pages,
            total_users=total_users,
            per_page=per_page,
        )

    @app.route("/admin/users/<uid>")
    @admin_required
    def admin_user_detail(uid):
        """Show user details for admin; 404 when the user document is missing."""
        user_doc = db.collection("users").document(uid).get()
        if not user_doc.exists:
            abort(404, "User not found")

        return render_template("admin_user_edit.html", user=_user_summary(uid, user_doc.to_dict()))

    @app.route("/admin/users/<uid>/reset-password", methods=["POST"])
    @admin_required
    def reset_user_password(uid):
        """Assign a temporary password to a user and flag a required reset.

        The ``password_reset_required`` flag is written first: if the password
        update then fails, the user keeps the old password and the flag is
        merely redundant. The temporary password is flashed only after
        Firebase Auth accepted it.
        """
        temp_password = _generate_temp_password()
        try:
            user = fb_auth.get_user(uid)
            db.collection("users").document(user.uid).set(
                {"password_reset_required": True}, merge=True
            )
            fb_auth.update_user(user.uid, password=temp_password)
        except fb_auth.UserNotFoundError:
            print(f"[ERR] Cannot reset password, user {uid} not found")
            abort(404, "User not found")
        except Exception as e:
            print(f"[ERR] Failed to reset password for {uid}: {e}")
            abort(500, "Failed to reset password")

        flash(f"User now has a temporary password {temp_password}.", "success")
        return redirect(url_for("admin_users"))

    @app.route("/admin/users/update", methods=["POST"])
    @admin_required
    def update_user():
        """Update a user's editable fields from a JSON request body.

        Expects a JSON object with ``uid`` and optionally ``enabled``,
        ``case_email`` and ``case_domain_email``. ``is_admin`` is never
        updated here. Responds 400 for a missing/invalid body or uid.
        """
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            abort(400, "Invalid JSON data")

        target_uid = data.get("uid")
        if not target_uid or not isinstance(target_uid, str):
            abort(400, "User ID is required")

        # Only fields that may change; admin status is set at creation only.
        # NOTE(review): create_user also sets the Firebase Auth ``disabled``
        # flag, but this only updates Firestore - confirm which one gates login.
        update_data = {
            "enabled": bool(data.get("enabled", False)),
            "case_email": data.get("case_email", ""),
            "case_domain_email": data.get("case_domain_email", ""),
        }

        try:
            db.collection("users").document(target_uid).update(update_data)
        except Exception as e:
            print(f"[ERR] Failed to update user: {e}")
            abort(500, "Failed to update user")

        return jsonify({"success": True})

    @app.route("/admin/users/new")
    @admin_required
    def admin_user_new():
        """Display form to create a new user."""
        return render_template("admin_user_create.html")

    @app.route("/admin/users/create", methods=["POST"])
    @admin_required
    def create_user():
        """Create a new user with a temporary password.

        Creates the Firebase Auth account, then the Firestore profile with
        ``password_reset_required`` set. If the profile write fails, the Auth
        account is deleted again (best effort) so the email can be reused.
        """
        user_email = (request.form.get("user_email") or "").strip()
        if not user_email:
            abort(400, "User email is required")
        if "@" not in user_email:
            abort(400, "Invalid email format")

        enabled = bool(request.form.get("enabled", False))
        temp_password = _generate_temp_password()

        try:
            user_record = fb_auth.create_user(
                email=user_email,
                email_verified=False,
                disabled=not enabled,
                password=temp_password,
            )
        except fb_auth.EmailAlreadyExistsError:
            print(f"[ERR] User with email {user_email} already exists")
            abort(400, "A user with this email already exists")
        except Exception as e:
            print(f"[ERR] Failed to create user: {e}")
            abort(500, "Failed to create user")

        try:
            db.collection("users").document(user_record.uid).set({
                "user_email": user_email,
                "case_email": request.form.get("case_email", ""),
                "case_domain_email": request.form.get("case_domain_email", ""),
                "enabled": enabled,
                "is_admin": bool(request.form.get("is_admin", False)),
                "password_reset_required": True,
            })
        except Exception as e:
            print(f"[ERR] Failed to create user: {e}")
            # Roll back the Auth account so a retry does not hit "email exists".
            try:
                fb_auth.delete_user(user_record.uid)
            except Exception as cleanup_error:
                print(f"[ERR] Failed to roll back auth user {user_record.uid}: {cleanup_error}")
            abort(500, "Failed to create user")

        flash(f"User created successfully. Temporary password: {temp_password}", "success")
        # The password itself is deliberately kept out of the log.
        print(f"[INFO] Created user {user_email} with a temporary password")

        return redirect(url_for("admin_users"))

    @app.route("/admin/users/<uid>/become-user", methods=["POST"])
    @admin_required
    def become_user(uid):
        """Allow an admin to impersonate another user by replacing the session UID."""
        try:
            target_exists = db.collection("users").document(uid).get().exists
        except Exception as e:
            print(f"[ERR] Failed to become user {uid}: {e}")
            abort(500, "Failed to impersonate user")

        if not target_exists:
            abort(404, "User not found")

        # Keep the first admin identity if this session is already
        # impersonating, so revert always returns to the real admin.
        session["original_uid"] = session.get("original_uid") or session.get("uid")
        session["impersonating"] = True
        session["uid"] = uid

        print(f"[INFO] Admin {session['original_uid']} is now impersonating user {uid}")

        return redirect(url_for("dashboard"))

    @app.route("/admin/users/revert")
    def revert_user():
        """Revert an impersonating session back to the original admin.

        This route is not wrapped in ``admin_required`` because during
        impersonation ``session['uid']`` is the impersonated user; the admin
        check is made against ``session['original_uid']`` instead. When the
        session is not impersonating, admins are redirected to the user list
        and everyone else gets 403 (or the login page when not logged in).
        """

        def is_admin(candidate_uid):
            # Profile lookup failures are reported as 500, as before.
            try:
                profile = get_user_profile(candidate_uid)
            except Exception as e:
                print(f"[ERR] Failed to revert user: {e}")
                abort(500, "Failed to revert session")
            return bool(profile and profile.get("is_admin"))

        if "original_uid" not in session:
            uid = session.get("uid")
            if not uid:
                return redirect(url_for("login"))
            if is_admin(uid):
                return redirect(url_for("admin_users"))
            abort(403, "Access denied. Admin privileges required.")

        original_uid = session.get("original_uid")
        if not original_uid:
            abort(403, "Access denied. Invalid session state.")

        if not is_admin(original_uid):
            abort(403, "Access denied. Only admins can revert from impersonation.")

        session["uid"] = original_uid
        session.pop("impersonating", None)
        session.pop("original_uid", None)

        print("[INFO] Reverted from impersonation back to admin")

        return redirect(url_for("admin_users"))