Files
deals24togo_be/app/services/listing_service.py
belviskhoremk c4d836a0f9 Initial commit
2026-03-06 22:57:58 +00:00

261 lines
9.8 KiB
Python

"""Listing CRUD + search service."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from app.core.exceptions import ForbiddenException, NotFoundException
from app.core.supabase import get_supabase_admin
class ListingService:
def __init__(self):
self.db = get_supabase_admin()
# ── Single ───────────────────────────────────────────
def get_listing(self, listing_id: str) -> dict:
result = (
self.db.table("listings")
.select("*, agencies(name), categories(name)")
.eq("id", listing_id)
.execute()
)
if not result.data:
raise NotFoundException("Listing not found")
return self._flatten(result.data[0])
def increment_views(self, listing_id: str) -> None:
"""Atomically increment views_count via a read-free update expression.
Runs as a background task so the API response is not delayed."""
try:
# Fetch current count and increment — best-effort, race is acceptable for a counter
result = self.db.table("listings").select("views_count").eq("id", listing_id).execute()
if result.data:
new_count = (result.data[0].get("views_count") or 0) + 1
self.db.table("listings").update({"views_count": new_count}).eq("id", listing_id).execute()
except Exception:
pass
# ── List / Search ────────────────────────────────────
def list_listings(
self,
search: Optional[str] = None,
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
location: Optional[str] = None,
listing_type: Optional[str] = None,
condition: Optional[str] = None,
status: Optional[str] = "approved",
sort_by: str = "newest",
page: int = 1,
page_size: int = 20,
agency_id: Optional[str] = None,
) -> dict:
query = self.db.table("listings").select(
"*, agencies(name), categories(name, slug)", count="exact"
)
# ── Filters ──
if status:
query = query.eq("status", status)
if agency_id:
query = query.eq("agency_id", agency_id)
if category:
# category might be a slug — resolve from categories table
cat_result = (
self.db.table("categories")
.select("id")
.eq("slug", category)
.execute()
)
if cat_result.data:
query = query.eq("category_id", cat_result.data[0]["id"])
else:
# Try direct ID match
query = query.eq("category_id", category)
if min_price is not None:
query = query.gte("price", min_price)
if max_price is not None:
query = query.lte("price", max_price)
if location:
query = query.ilike("location", f"%{location}%")
if listing_type:
query = query.eq("listing_type", listing_type)
if condition:
query = query.eq("condition", condition)
if search:
query = query.or_(
f"title.ilike.%{search}%,description.ilike.%{search}%"
)
# ── Sort ──
sort_map = {
"newest": ("created_at", True),
"oldest": ("created_at", False),
"price_asc": ("price", False),
"price_desc": ("price", True),
"popular": ("views_count", True),
}
col, desc = sort_map.get(sort_by, ("created_at", True))
query = query.order(col, desc=desc)
# ── Pagination ──
offset = (page - 1) * page_size
result = query.range(offset, offset + page_size - 1).execute()
listings = [self._flatten(l) for l in result.data]
return {
"listings": listings,
"total": result.count or 0,
"page": page,
"page_size": page_size,
}
# ── Create ───────────────────────────────────────────
def create_listing(self, agency_id: str, data: dict) -> dict:
# Subscription guard: agency must have an active subscription
now_iso = datetime.now(timezone.utc).isoformat()
sub_result = (
self.db.table("subscriptions")
.select("id")
.eq("agency_id", agency_id)
.eq("status", "active")
.gt("ends_at", now_iso)
.limit(1)
.execute()
)
if not sub_result.data:
raise ForbiddenException("Active subscription required to post listings")
now = now_iso
listing_data = {
"agency_id": agency_id,
**data,
"status": "pending",
"views_count": 0,
"created_at": now,
"updated_at": now,
}
result = self.db.table("listings").insert(listing_data).execute()
if not result.data:
raise Exception("Failed to create listing")
return result.data[0]
# ── Update ───────────────────────────────────────────
def update_listing(
self, listing_id: str, user_id: str, user_role: str, data: dict
) -> dict:
listing = self._get_raw(listing_id)
# Check ownership
if user_role != "admin":
agency = (
self.db.table("agencies")
.select("id, user_id")
.eq("id", listing["agency_id"])
.execute()
)
if not agency.data or agency.data[0]["user_id"] != user_id:
raise ForbiddenException("Not authorized to update this listing")
update_data = {k: v for k, v in data.items() if v is not None}
if not update_data:
return listing
# Reset to pending if agency edits an approved/rejected listing
if user_role != "admin" and listing["status"] in ("approved", "rejected"):
update_data["status"] = "pending"
update_data["updated_at"] = datetime.now(timezone.utc).isoformat()
result = (
self.db.table("listings")
.update(update_data)
.eq("id", listing_id)
.execute()
)
if not result.data:
raise NotFoundException("Listing not found")
return result.data[0]
# ── Status (admin) ───────────────────────────────────
def update_status(
self, listing_id: str, status: str, rejection_reason: Optional[str] = None
) -> dict:
update_data: dict = {
"status": status,
"updated_at": datetime.now(timezone.utc).isoformat(),
}
if status == "rejected" and rejection_reason:
update_data["rejection_reason"] = rejection_reason
elif status == "approved":
update_data["rejection_reason"] = None
result = (
self.db.table("listings")
.update(update_data)
.eq("id", listing_id)
.execute()
)
if not result.data:
raise NotFoundException("Listing not found")
return result.data[0]
# ── Delete ───────────────────────────────────────────
def delete_listing(self, listing_id: str, user_id: str, user_role: str) -> dict:
listing = self._get_raw(listing_id)
if user_role != "admin":
agency = (
self.db.table("agencies")
.select("id, user_id")
.eq("id", listing["agency_id"])
.execute()
)
if not agency.data or agency.data[0]["user_id"] != user_id:
raise ForbiddenException("Not authorized to delete this listing")
self.db.table("listings").delete().eq("id", listing_id).execute()
return {"message": "Listing deleted"}
# ── Stats ────────────────────────────────────────────
def get_stats(self, agency_id: Optional[str] = None) -> dict:
query = self.db.table("listings").select("status")
if agency_id:
query = query.eq("agency_id", agency_id)
result = query.execute()
statuses: dict[str, int] = {"pending": 0, "approved": 0, "rejected": 0, "sold": 0}
for row in result.data:
s = row.get("status")
if s in statuses:
statuses[s] += 1
total = sum(statuses.values())
return {"total": total, **statuses}
# ── Helpers ──────────────────────────────────────────
def _get_raw(self, listing_id: str) -> dict:
result = self.db.table("listings").select("*").eq("id", listing_id).execute()
if not result.data:
raise NotFoundException("Listing not found")
return result.data[0]
@staticmethod
def _flatten(listing: dict) -> dict:
"""Flatten joined agency/category names."""
agencies = listing.pop("agencies", None)
categories = listing.pop("categories", None)
if agencies and isinstance(agencies, dict):
listing["agency_name"] = agencies.get("name")
if categories and isinstance(categories, dict):
listing["category_name"] = categories.get("name")
return listing