"""Listing CRUD + search service.""" from __future__ import annotations from datetime import datetime, timezone from typing import Optional from app.core.exceptions import ForbiddenException, NotFoundException from app.core.supabase import get_supabase_admin class ListingService: def __init__(self): self.db = get_supabase_admin() # ── Single ─────────────────────────────────────────── def get_listing(self, listing_id: str) -> dict: result = ( self.db.table("listings") .select("*, agencies(name), categories(name)") .eq("id", listing_id) .execute() ) if not result.data: raise NotFoundException("Listing not found") return self._flatten(result.data[0]) def increment_views(self, listing_id: str) -> None: """Atomically increment views_count via a read-free update expression. Runs as a background task so the API response is not delayed.""" try: # Fetch current count and increment — best-effort, race is acceptable for a counter result = self.db.table("listings").select("views_count").eq("id", listing_id).execute() if result.data: new_count = (result.data[0].get("views_count") or 0) + 1 self.db.table("listings").update({"views_count": new_count}).eq("id", listing_id).execute() except Exception: pass # ── List / Search ──────────────────────────────────── def list_listings( self, search: Optional[str] = None, category: Optional[str] = None, min_price: Optional[float] = None, max_price: Optional[float] = None, location: Optional[str] = None, listing_type: Optional[str] = None, condition: Optional[str] = None, status: Optional[str] = "approved", sort_by: str = "newest", page: int = 1, page_size: int = 20, agency_id: Optional[str] = None, ) -> dict: query = self.db.table("listings").select( "*, agencies(name), categories(name, slug)", count="exact" ) # ── Filters ── if status: query = query.eq("status", status) if agency_id: query = query.eq("agency_id", agency_id) if category: # category might be a slug — resolve from categories table cat_result = ( self.db.table("categories") .select("id") .eq("slug", category) .execute() ) if cat_result.data: query = query.eq("category_id", cat_result.data[0]["id"]) else: # Try direct ID match query = query.eq("category_id", category) if min_price is not None: query = query.gte("price", min_price) if max_price is not None: query = query.lte("price", max_price) if location: query = query.ilike("location", f"%{location}%") if listing_type: query = query.eq("listing_type", listing_type) if condition: query = query.eq("condition", condition) if search: query = query.or_( f"title.ilike.%{search}%,description.ilike.%{search}%" ) # ── Sort ── sort_map = { "newest": ("created_at", True), "oldest": ("created_at", False), "price_asc": ("price", False), "price_desc": ("price", True), "popular": ("views_count", True), } col, desc = sort_map.get(sort_by, ("created_at", True)) query = query.order(col, desc=desc) # ── Pagination ── offset = (page - 1) * page_size result = query.range(offset, offset + page_size - 1).execute() listings = [self._flatten(l) for l in result.data] return { "listings": listings, "total": result.count or 0, "page": page, "page_size": page_size, } # ── Create ─────────────────────────────────────────── def create_listing(self, agency_id: str, data: dict) -> dict: # Subscription guard: agency must have an active subscription now_iso = datetime.now(timezone.utc).isoformat() sub_result = ( self.db.table("subscriptions") .select("id") .eq("agency_id", agency_id) .eq("status", "active") .gt("ends_at", now_iso) .limit(1) .execute() ) if not sub_result.data: raise ForbiddenException("Active subscription required to post listings") now = now_iso listing_data = { "agency_id": agency_id, **data, "status": "pending", "views_count": 0, "created_at": now, "updated_at": now, } result = self.db.table("listings").insert(listing_data).execute() if not result.data: raise Exception("Failed to create listing") return result.data[0] # ── Update ─────────────────────────────────────────── def update_listing( self, listing_id: str, user_id: str, user_role: str, data: dict ) -> dict: listing = self._get_raw(listing_id) # Check ownership if user_role != "admin": agency = ( self.db.table("agencies") .select("id, user_id") .eq("id", listing["agency_id"]) .execute() ) if not agency.data or agency.data[0]["user_id"] != user_id: raise ForbiddenException("Not authorized to update this listing") update_data = {k: v for k, v in data.items() if v is not None} if not update_data: return listing # Reset to pending if agency edits an approved/rejected listing if user_role != "admin" and listing["status"] in ("approved", "rejected"): update_data["status"] = "pending" update_data["updated_at"] = datetime.now(timezone.utc).isoformat() result = ( self.db.table("listings") .update(update_data) .eq("id", listing_id) .execute() ) if not result.data: raise NotFoundException("Listing not found") return result.data[0] # ── Status (admin) ─────────────────────────────────── def update_status( self, listing_id: str, status: str, rejection_reason: Optional[str] = None ) -> dict: update_data: dict = { "status": status, "updated_at": datetime.now(timezone.utc).isoformat(), } if status == "rejected" and rejection_reason: update_data["rejection_reason"] = rejection_reason elif status == "approved": update_data["rejection_reason"] = None result = ( self.db.table("listings") .update(update_data) .eq("id", listing_id) .execute() ) if not result.data: raise NotFoundException("Listing not found") return result.data[0] # ── Delete ─────────────────────────────────────────── def delete_listing(self, listing_id: str, user_id: str, user_role: str) -> dict: listing = self._get_raw(listing_id) if user_role != "admin": agency = ( self.db.table("agencies") .select("id, user_id") .eq("id", listing["agency_id"]) .execute() ) if not agency.data or agency.data[0]["user_id"] != user_id: raise ForbiddenException("Not authorized to delete this listing") self.db.table("listings").delete().eq("id", listing_id).execute() return {"message": "Listing deleted"} # ── Stats ──────────────────────────────────────────── def get_stats(self, agency_id: Optional[str] = None) -> dict: query = self.db.table("listings").select("status") if agency_id: query = query.eq("agency_id", agency_id) result = query.execute() statuses: dict[str, int] = {"pending": 0, "approved": 0, "rejected": 0, "sold": 0} for row in result.data: s = row.get("status") if s in statuses: statuses[s] += 1 total = sum(statuses.values()) return {"total": total, **statuses} # ── Helpers ────────────────────────────────────────── def _get_raw(self, listing_id: str) -> dict: result = self.db.table("listings").select("*").eq("id", listing_id).execute() if not result.data: raise NotFoundException("Listing not found") return result.data[0] @staticmethod def _flatten(listing: dict) -> dict: """Flatten joined agency/category names.""" agencies = listing.pop("agencies", None) categories = listing.pop("categories", None) if agencies and isinstance(agencies, dict): listing["agency_name"] = agencies.get("name") if categories and isinstance(categories, dict): listing["category_name"] = categories.get("name") return listing