"""Admin endpoints for listing, exporting, inspecting and updating customers."""

import csv
import io
from typing import Annotated

import asyncpg
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse

from app.core.pagination import pagination_params
from app.core.responses import ok, paginated
from app.dependencies import get_db, require_admin
from app.exceptions import NotFoundError
from app.models.admin import UpdateCustomer

router = APIRouter(prefix="/customers", tags=["Admin — Customers"])

# Shared projection for the list and detail endpoints.
#
# The per-customer aggregates are scalar subqueries on purpose: LEFT JOINing
# both `orders` and `bookings` in one query yields orders x bookings rows per
# customer, which multiplies SUM(o.total_amount) by the number of bookings.
_CUSTOMER_SUMMARY_SELECT = """
    SELECT p.id, p.email, p.full_name, p.phone, p.is_blocked, p.created_at,
           (SELECT COUNT(*) FROM orders o WHERE o.user_id = p.id) AS orders_count,
           (SELECT COUNT(*) FROM bookings b WHERE b.user_id = p.id) AS bookings_count,
           (
               SELECT COALESCE(SUM(o.total_amount), 0)
               FROM orders o
               WHERE o.user_id = p.id
                 AND o.status IN ('paid', 'shipped', 'delivered')
           ) AS total_spent
    FROM profiles p
"""


@router.get("")
async def list_customers(
    pagination: Annotated[tuple, Depends(pagination_params)],
    search: str | None = Query(None),
    db: asyncpg.Connection = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Return one page of client profiles with order/booking statistics.

    Args:
        pagination: ``(page, per_page, offset)`` from ``pagination_params``.
        search: Optional substring matched case-insensitively (ILIKE) against
            email and full name.
        db: Database connection.
        _: Result of the admin guard; unused beyond enforcing access.

    Returns:
        The ``paginated`` envelope containing the rows of the requested page
        and the total number of matching profiles.
    """
    page, per_page, offset = pagination

    conditions = ["p.role = 'client'"]
    params: list = []
    if search:
        # The same placeholder is used for both columns.
        # NOTE(review): '%' and '_' inside `search` act as LIKE wildcards.
        params.append(f"%{search}%")
        conditions.append(
            f"(p.email ILIKE ${len(params)} OR p.full_name ILIKE ${len(params)})"
        )
    where = f"WHERE {' AND '.join(conditions)}"

    total = await db.fetchval(f"SELECT COUNT(*) FROM profiles p {where}", *params)

    # LIMIT and OFFSET take the last two placeholders.
    params.extend([per_page, offset])
    rows = await db.fetch(
        f"""
        {_CUSTOMER_SUMMARY_SELECT}
        {where}
        ORDER BY p.created_at DESC, p.id
        LIMIT ${len(params) - 1} OFFSET ${len(params)}
        """,
        *params,
    )
    # `p.id` above is a tie-breaker so rows sharing a created_at value cannot
    # move between pages.
    return paginated([dict(r) for r in rows], total, page, per_page)


@router.get("/export")
async def export_customers(
    db: asyncpg.Connection = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Download every client profile as a CSV attachment (``customers.csv``).

    Args:
        db: Database connection.
        _: Result of the admin guard; unused beyond enforcing access.

    Returns:
        A ``text/csv`` ``StreamingResponse``. The CSV is built fully in memory
        and sent as a single chunk.
    """
    rows = await db.fetch(
        "SELECT id, email, full_name, phone, is_blocked, created_at FROM profiles WHERE role = 'client' ORDER BY created_at DESC"
    )

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["ID", "Email", "Name", "Phone", "Blocked", "Joined"])
    # NOTE(review): values are written verbatim; a cell starting with '=', '+',
    # '-' or '@' may be interpreted as a formula by spreadsheet applications.
    writer.writerows(
        [r["id"], r["email"], r["full_name"], r["phone"], r["is_blocked"], r["created_at"]]
        for r in rows
    )

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=customers.csv"},
    )


@router.get("/{customer_id}")
async def get_customer(
    customer_id: str,
    db: asyncpg.Connection = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Return one profile with statistics plus its latest orders and bookings.

    Args:
        customer_id: Value matched against ``profiles.id``.
        db: Database connection.
        _: Result of the admin guard; unused beyond enforcing access.

    Returns:
        The ``ok`` envelope wrapping the profile summary with two extra keys,
        ``recent_orders`` and ``recent_bookings`` (at most 10 entries each).

    Raises:
        NotFoundError: No profile has the given id.
    """
    # NOTE(review): unlike the list and update endpoints, this lookup is not
    # restricted to role = 'client' — confirm whether that is intended.
    row = await db.fetchrow(
        _CUSTOMER_SUMMARY_SELECT + " WHERE p.id = $1",
        customer_id,
    )
    if not row:
        raise NotFoundError("customer")

    orders = await db.fetch(
        "SELECT id, status, total_amount, created_at FROM orders WHERE user_id = $1 ORDER BY created_at DESC LIMIT 10",
        customer_id,
    )
    bookings = await db.fetch(
        """
        SELECT b.id, b.status, b.amount_paid, ts.date, ts.start_time
        FROM bookings b
        JOIN time_slots ts ON ts.id = b.slot_id
        WHERE b.user_id = $1
        ORDER BY ts.date DESC
        LIMIT 10
        """,
        customer_id,
    )

    result = dict(row)
    result["recent_orders"] = [dict(r) for r in orders]
    result["recent_bookings"] = [dict(r) for r in bookings]
    return ok(result)


@router.patch("/{customer_id}")
async def update_customer(
    customer_id: str,
    body: UpdateCustomer,
    db: asyncpg.Connection = Depends(get_db),
    admin: dict = Depends(require_admin),
):
    """Apply a partial update to a client profile and record it in the activity log.

    Only fields of ``body`` whose value is not ``None`` are written.

    Args:
        customer_id: Value matched against ``profiles.id``.
        body: Fields to change.
        db: Database connection.
        admin: Authenticated admin; ``admin["id"]`` is stored as the log actor.

    Returns:
        The ``ok`` envelope wrapping the full updated row.

    Raises:
        NotFoundError: No client profile has the given id, or ``body`` carries
            no non-``None`` field.
    """
    updates = {k: v for k, v in body.model_dump().items() if v is not None}
    if not updates:
        # NOTE(review): an empty patch is reported as "not found"; kept as-is
        # so the response clients see does not change.
        raise NotFoundError("customer")

    # $1 is the customer id, so field placeholders start at $2.
    # Column names are interpolated into the SQL text; they come from the keys
    # of `body.model_dump()` — presumably only fields declared on
    # UpdateCustomer; verify the model does not allow extra fields.
    set_clauses = [f"{k} = ${i + 2}" for i, k in enumerate(updates)]
    action = "customer.blocked" if updates.get("is_blocked") else "customer.updated"

    # One transaction so the profile change and its audit entry are committed
    # or rolled back together.
    async with db.transaction():
        row = await db.fetchrow(
            f"UPDATE profiles SET {', '.join(set_clauses)}, updated_at = now() WHERE id = $1 AND role = 'client' RETURNING *",
            customer_id,
            *updates.values(),
        )
        if not row:
            raise NotFoundError("customer")

        await db.execute(
            "INSERT INTO activity_log (actor_id, action, entity_type, entity_id) VALUES ($1, $2, 'customer', $3)",
            str(admin["id"]),
            action,
            customer_id,
        )

    return ok(dict(row))