mirror of
http://88.130.71.182:3000/BlitTech/deals24togo_be.git
synced 2026-06-12 23:33:21 +00:00
109 lines
3.8 KiB
Python
109 lines
3.8 KiB
Python
"""Category CRUD service."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from app.core.exceptions import ConflictException, NotFoundException
|
|
from app.core.supabase import get_supabase_admin
|
|
|
|
|
|
class CategoryService:
    """CRUD operations for rows of the ``categories`` table.

    Every query goes through the client returned by
    ``get_supabase_admin()``. The ``listings`` table is read to compute
    per-category listing counts and to guard deletions.
    """

    def __init__(self):
        """Store the admin Supabase client on ``self.db``."""
        self.db = get_supabase_admin()

    def _slug_taken(self, slug: str, exclude_id: "str | None" = None) -> bool:
        """Return True if a category row already uses *slug*.

        Args:
            slug: Slug value to look for.
            exclude_id: When given, the row with this ``id`` is ignored so
                a category does not conflict with itself on update.
        """
        query = self.db.table("categories").select("id").eq("slug", slug)
        if exclude_id is not None:
            query = query.neq("id", exclude_id)
        return bool(query.execute().data)

    def get_category(self, category_id: str) -> dict:
        """Return the category row whose ``id`` equals *category_id*.

        Raises:
            NotFoundException: If the query returns no rows.
        """
        result = (
            self.db.table("categories").select("*").eq("id", category_id).execute()
        )
        if not result.data:
            raise NotFoundException("Category not found")
        return result.data[0]

    def get_category_by_slug(self, slug: str) -> dict:
        """Return the category row whose ``slug`` equals *slug*.

        Raises:
            NotFoundException: If the query returns no rows.
        """
        result = self.db.table("categories").select("*").eq("slug", slug).execute()
        if not result.data:
            raise NotFoundException("Category not found")
        return result.data[0]

    def list_categories(self) -> dict:
        """Return all categories ordered by name, each with a listing count.

        Returns:
            ``{"categories": [...], "total": n}`` where every category row
            gains a ``listing_count`` key and ``total`` is the count
            reported by the categories query (0 when none is reported).
        """
        result = (
            self.db.table("categories")
            .select("*", count="exact")
            .order("name")
            .execute()
        )

        # Tally listings per category client-side from one extra query.
        # NOTE(review): this fetches one row per listing without pagination;
        # if the API enforces a max-rows cap (hosted Supabase reportedly
        # defaults to 1000 - confirm), the counts below are silently
        # truncated. A grouped count done in the database would avoid that.
        listings_result = self.db.table("listings").select("category_id").execute()
        counts: dict[str, int] = {}
        for row in listings_result.data or []:
            cid = row.get("category_id")
            if cid:  # rows without a category are not counted
                counts[cid] = counts.get(cid, 0) + 1

        categories = result.data or []
        for cat in categories:
            cat["listing_count"] = counts.get(cat["id"], 0)
        return {"categories": categories, "total": result.count or 0}

    def create_category(self, data: dict) -> dict:
        """Insert a new category and return the stored row.

        ``created_at`` and ``updated_at`` are both set to the current UTC
        time, overriding any values present in *data*.

        Args:
            data: Column values for the new row; must contain ``"slug"``.

        Raises:
            ConflictException: If another category already uses the slug.
            RuntimeError: If the insert returns no row.
        """
        # Check-then-insert is not atomic; a unique constraint on the
        # column (if one exists) remains the real guarantee.
        if self._slug_taken(data["slug"]):
            raise ConflictException("A category with this slug already exists")

        now = datetime.now(timezone.utc).isoformat()
        cat_data = {**data, "created_at": now, "updated_at": now}
        result = self.db.table("categories").insert(cat_data).execute()
        if not result.data:
            # RuntimeError instead of a bare Exception: still caught by
            # ``except Exception`` but no longer forces callers to catch
            # everything in order to handle this case.
            raise RuntimeError("Failed to create category")
        return result.data[0]

    def update_category(self, category_id: str, data: dict) -> dict:
        """Apply the non-``None`` fields of *data* to a category.

        Keys whose value is ``None`` are treated as "not provided" and are
        dropped. If nothing remains, the current row is returned unchanged.
        Otherwise ``updated_at`` is refreshed to the current UTC time.

        Raises:
            ConflictException: If the new slug belongs to another category.
            NotFoundException: If no category has this ``id``.
        """
        update_data = {k: v for k, v in data.items() if v is not None}
        if not update_data:
            return self.get_category(category_id)

        # Only a slug change can collide; ignore the row being updated.
        if "slug" in update_data and self._slug_taken(
            update_data["slug"], exclude_id=category_id
        ):
            raise ConflictException("A category with this slug already exists")

        update_data["updated_at"] = datetime.now(timezone.utc).isoformat()
        result = (
            self.db.table("categories")
            .update(update_data)
            .eq("id", category_id)
            .execute()
        )
        if not result.data:
            raise NotFoundException("Category not found")
        return result.data[0]

    def delete_category(self, category_id: str) -> dict:
        """Delete a category that no listing references.

        Returns:
            ``{"message": "Category deleted"}`` on success.

        Raises:
            ConflictException: If at least one listing references the
                category.
            NotFoundException: If the delete matches no row.
        """
        # One row is enough to know a reference exists; the exact count is
        # requested separately via ``count="exact"``, so limiting the rows
        # avoids downloading every listing id just to build the message.
        listings = (
            self.db.table("listings")
            .select("id", count="exact")
            .eq("category_id", category_id)
            .limit(1)
            .execute()
        )
        if listings.data:
            raise ConflictException(
                f"Cannot delete category: {listings.count} listings reference it"
            )

        result = self.db.table("categories").delete().eq("id", category_id).execute()
        if not result.data:
            raise NotFoundException("Category not found")
        return {"message": "Category deleted"}
|