185 lines
7.0 KiB
Python
185 lines
7.0 KiB
Python
import json
|
|
import os
|
|
from functools import wraps
|
|
from flask import render_template, request, redirect, url_for, session, abort, jsonify
|
|
from firebase_init import db
|
|
from firebase_admin import auth as fb_auth
|
|
from utils import get_user_profile
|
|
|
|
def admin_required(view):
|
|
@wraps(view)
|
|
def wrapped(*args, **kwargs):
|
|
uid = session.get("uid")
|
|
if not uid:
|
|
return redirect(url_for("login"))
|
|
profile = get_user_profile(uid)
|
|
if not profile.get("is_admin"):
|
|
abort(403, "Access denied. Admin privileges required.")
|
|
return view(*args, **kwargs)
|
|
return wrapped
|
|
|
|
def register_admin_routes(app):
|
|
@app.route("/admin/users")
|
|
@app.route("/admin/users/<int:page>")
|
|
@admin_required
|
|
def admin_users(page=1):
|
|
"""Admin page to manage all users"""
|
|
# Pagination settings
|
|
per_page = 25
|
|
offset = (page - 1) * per_page
|
|
|
|
# Get all users from Firestore
|
|
try:
|
|
# Get total count of users
|
|
users_ref = db.collection("users")
|
|
total_users = int(users_ref.count().get()[0][0].value)
|
|
|
|
# Get users for current page
|
|
users_query = users_ref.order_by("user_email").limit(per_page).offset(offset)
|
|
docs = users_query.stream()
|
|
|
|
users = []
|
|
for doc in docs:
|
|
user_data = doc.to_dict()
|
|
users.append({
|
|
"uid": doc.id,
|
|
"user_email": user_data.get("user_email", ""),
|
|
"case_email": user_data.get("case_email", ""),
|
|
"enabled": bool(user_data.get("enabled", False)),
|
|
"is_admin": bool(user_data.get("is_admin", False))
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"[ERR] Failed to fetch users: {e}")
|
|
users = []
|
|
total_users = 0
|
|
|
|
# Calculate pagination
|
|
total_pages = (total_users + per_page - 1) // per_page # Ceiling division
|
|
|
|
return render_template("admin_users.html",
|
|
users=users,
|
|
current_page=page,
|
|
total_pages=total_pages,
|
|
total_users=total_users,
|
|
per_page=per_page)
|
|
|
|
@app.route("/admin/users/<uid>")
|
|
@admin_required
|
|
def admin_user_detail(uid):
|
|
"""Show user details for admin"""
|
|
# Get user data
|
|
user_doc = db.collection("users").document(uid).get()
|
|
if not user_doc.exists:
|
|
abort(404, "User not found")
|
|
|
|
user_data = user_doc.to_dict()
|
|
user = {
|
|
"uid": uid,
|
|
"user_email": user_data.get("user_email", ""),
|
|
"case_email": user_data.get("case_email", ""),
|
|
"enabled": bool(user_data.get("enabled", False)),
|
|
"is_admin": bool(user_data.get("is_admin", False))
|
|
}
|
|
|
|
return render_template("admin_user_edit.html", user=user)
|
|
|
|
@app.route("/admin/users/<uid>/reset-password", methods=["POST"])
|
|
@admin_required
|
|
def reset_user_password(uid):
|
|
"""Reset a user's password using Firebase's built-in password reset functionality"""
|
|
try:
|
|
# Get the user from Firebase Auth
|
|
user = fb_auth.get_user(uid)
|
|
|
|
# Generate password reset link using Firebase Auth
|
|
password_reset_link = fb_auth.generate_password_reset_link(user.email)
|
|
|
|
# Send password reset email using Firebase's built-in template
|
|
# This will send an email to the user with a link to reset their password
|
|
# Firebase automatically handles the email template and delivery
|
|
print(f"[INFO] Password reset link generated for {user.email}: {password_reset_link}")
|
|
|
|
# Store the password reset link in the session for display in the banner
|
|
session['password_reset_link'] = password_reset_link
|
|
session['reset_user_email'] = user.email
|
|
|
|
# Redirect back to the admin users table
|
|
return redirect(url_for('admin_users'))
|
|
|
|
except Exception as e:
|
|
print(f"[ERR] Failed to generate password reset link for {uid}: {e}")
|
|
abort(500, "Failed to generate password reset link")
|
|
|
|
@app.route("/admin/users/update", methods=["POST"])
|
|
@admin_required
|
|
def update_user():
|
|
"""Update user information"""
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
abort(400, "Invalid JSON data")
|
|
|
|
target_uid = data.get("uid")
|
|
if not target_uid:
|
|
abort(400, "User ID is required")
|
|
|
|
# Update user in Firestore
|
|
user_ref = db.collection("users").document(target_uid)
|
|
user_ref.update({
|
|
"enabled": data.get("enabled", False),
|
|
"is_admin": data.get("is_admin", False),
|
|
"case_email": data.get("case_email", "")
|
|
})
|
|
|
|
return jsonify({"success": True})
|
|
|
|
except Exception as e:
|
|
print(f"[ERR] Failed to update user: {e}")
|
|
abort(500, "Failed to update user")
|
|
|
|
@app.route("/admin/users/new")
|
|
@admin_required
|
|
def admin_user_new():
|
|
"""Display form to create a new user"""
|
|
return render_template("admin_user_create.html")
|
|
|
|
@app.route("/admin/users/create", methods=["POST"])
|
|
@admin_required
|
|
def create_user():
|
|
"""Create a new user"""
|
|
try:
|
|
# Get form data
|
|
user_email = request.form.get("user_email")
|
|
if not user_email:
|
|
abort(400, "User email is required")
|
|
|
|
# Validate email format
|
|
if "@" not in user_email:
|
|
abort(400, "Invalid email format")
|
|
|
|
# Create user in Firebase Authentication
|
|
user_record = fb_auth.create_user(
|
|
email=user_email,
|
|
email_verified=False,
|
|
disabled=not request.form.get("enabled", False)
|
|
)
|
|
|
|
# Create user profile in Firestore
|
|
user_ref = db.collection("users").document(user_record.uid)
|
|
user_ref.set({
|
|
"user_email": user_email,
|
|
"case_email": request.form.get("case_email", ""),
|
|
"enabled": bool(request.form.get("enabled", False)),
|
|
"is_admin": bool(request.form.get("is_admin", False))
|
|
})
|
|
|
|
# Redirect to admin users page
|
|
return redirect(url_for("admin_users"))
|
|
|
|
except fb_auth.EmailAlreadyExistsError:
|
|
print(f"[ERR] User with email {user_email} already exists")
|
|
abort(400, "A user with this email already exists")
|
|
except Exception as e:
|
|
print(f"[ERR] Failed to create user: {e}")
|
|
abort(500, "Failed to create user") |