mirror of
http://88.130.71.182:3000/BlitTech/deals24togo_be.git
synced 2026-06-12 23:33:21 +00:00
351 lines
12 KiB
Python
351 lines
12 KiB
Python
"""Payment service — CinetPay integration for subscriptions and purchases."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.exceptions import BadRequestException, ForbiddenException, NotFoundException
|
|
from app.core.supabase import get_supabase_admin
|
|
|
|
# CinetPay checkout API endpoints: payment initialisation and status check.
CINETPAY_INIT_URL = "https://api-checkout.cinetpay.com/v2/payment"
CINETPAY_CHECK_URL = "https://api-checkout.cinetpay.com/v2/payment/check"


class PaymentService:
    """CinetPay-backed payment flows for subscriptions and listing purchases.

    The service stores one row per payment attempt in the ``payments`` table,
    asks CinetPay for a checkout URL, and finalises the payment when the
    CinetPay webhook is received (creating a ``subscriptions`` or
    ``purchases`` row as a side effect).
    """

    # Human-readable (French) labels shown on subscription receipts.
    _PLAN_LABELS = {
        "monthly": "Mensuel (1 mois)",
        "yearly": "Annuel (12 mois)",
    }

    def __init__(self):
        # Supabase admin client and application settings, both resolved once.
        self.db = get_supabase_admin()
        self.settings = get_settings()

    # ── Public methods ────────────────────────────────────

    def initiate(
        self,
        user_id: str,
        payment_type: str,
        plan: Optional[str] = None,
        listing_id: Optional[str] = None,
    ) -> dict:
        """Create a pending payment record and return the CinetPay redirect URL.

        Args:
            user_id: Id of the paying user; stored as ``payer_id``.
            payment_type: ``"subscription"`` or ``"purchase"``.
            plan: ``"monthly"`` or ``"yearly"``; required for subscriptions.
            listing_id: Id of the listing to buy; required for purchases.

        Returns:
            ``{"payment_url": ..., "transaction_id": ...}``.

        Raises:
            BadRequestException: Unknown ``payment_type``, invalid ``plan``,
                missing ``listing_id`` or listing not in ``approved`` status.
            ForbiddenException: The user has no agency (subscription only).
            NotFoundException: The listing does not exist (purchase only).
            Exception: The payment row could not be inserted, or CinetPay
                rejected the request (HTTP errors from ``httpx`` propagate).
        """
        settings = self.settings

        if payment_type == "subscription":
            amount, transaction_id, description, metadata = (
                self._prepare_subscription(user_id, plan)
            )
        elif payment_type == "purchase":
            amount, transaction_id, description, metadata = (
                self._prepare_purchase(listing_id)
            )
        else:
            raise BadRequestException("type must be 'subscription' or 'purchase'")

        # Insert pending payment row
        now = datetime.now(timezone.utc).isoformat()
        payment_row = {
            "transaction_id": transaction_id,
            "type": payment_type,
            "payer_id": user_id,
            "amount": amount,
            "currency": "XOF",
            "status": "pending",
            "metadata": metadata,
            "created_at": now,
        }
        insert_result = self.db.table("payments").insert(payment_row).execute()
        if not insert_result.data:
            raise Exception("Failed to create payment record")

        # Call CinetPay
        notify_url = f"{settings.BACKEND_PUBLIC_URL}/api/v1/payments/webhook"
        return_url = f"{settings.FRONTEND_URL}/payment-return?transaction_id={transaction_id}"
        payload = {
            "apikey": settings.CINETPAY_API_KEY,
            "site_id": settings.CINETPAY_SITE_ID,
            "transaction_id": transaction_id,
            "amount": int(amount),
            "currency": "XOF",
            "description": description,
            "notify_url": notify_url,
            "return_url": return_url,
            "channels": "ALL",
        }

        try:
            payment_url = self._request_payment_url(payload)
        except Exception:
            # Without a checkout URL the user can never pay this transaction,
            # so do not leave an orphan "pending" row behind. The original
            # error is re-raised unchanged for the caller.
            self.db.table("payments").update({"status": "failed"}).eq(
                "transaction_id", transaction_id
            ).eq("status", "pending").execute()
            raise

        return {"payment_url": payment_url, "transaction_id": transaction_id}

    def handle_webhook(self, form_data: dict) -> None:
        """Process a CinetPay webhook notification.

        The notification itself is not trusted: the transaction status is
        re-checked against CinetPay before anything is written. Unknown,
        foreign-site or already-completed transactions are ignored silently.

        Args:
            form_data: Form fields posted by CinetPay; ``cpm_trans_id`` and
                ``cpm_site_id`` are read.
        """
        settings = self.settings
        transaction_id = form_data.get("cpm_trans_id")
        site_id = form_data.get("cpm_site_id")

        if not transaction_id:
            return

        # Validate site_id (only when the notification carries one)
        if site_id and site_id != str(settings.CINETPAY_SITE_ID):
            return

        # Fast-path idempotency: skip unknown or already completed payments
        result = (
            self.db.table("payments")
            .select("*")
            .eq("transaction_id", transaction_id)
            .execute()
        )
        if not result.data:
            return
        if result.data[0]["status"] == "completed":
            return  # Already processed

        # Verify with CinetPay
        verify_data = self._verify_transaction(transaction_id)
        if verify_data.get("code") != "00":
            return

        details = verify_data.get("data") or {}
        cp_status = details.get("status")

        if cp_status == "ACCEPTED":
            now = datetime.now(timezone.utc).isoformat()
            # The status filter makes the transition atomic: when CinetPay
            # delivers the same notification twice concurrently, only the
            # request whose UPDATE actually matched a row runs the side
            # effects below. Relies on the update returning the updated rows.
            updated = (
                self.db.table("payments")
                .update(
                    {
                        "status": "completed",
                        "payment_method": details.get("payment_method"),
                        "operator_id": details.get("operator_id"),
                        "paid_at": now,
                    }
                )
                .eq("transaction_id", transaction_id)
                .neq("status", "completed")
                .execute()
            )
            if not updated.data:
                return  # Another delivery completed this payment first

            payment = updated.data[0]
            if payment["type"] == "subscription":
                self._activate_subscription(payment)
            elif payment["type"] == "purchase":
                self._complete_purchase(payment)

        elif cp_status == "REFUSED":
            # Never downgrade a payment that was completed in the meantime.
            self.db.table("payments").update({"status": "failed"}).eq(
                "transaction_id", transaction_id
            ).neq("status", "completed").execute()

    def get_receipt(self, transaction_id: str, user_id: str) -> dict:
        """Return enriched receipt data for a transaction.

        Returns:
            The payment row plus ``payer_name``, ``payer_email``,
            ``plan_label`` (subscriptions) and ``listing_title`` (purchases);
            fields that cannot be resolved are ``None``.

        Raises:
            NotFoundException: No payment has this ``transaction_id``.
            ForbiddenException: The payment belongs to another user.
        """
        result = (
            self.db.table("payments")
            .select("*")
            .eq("transaction_id", transaction_id)
            .execute()
        )
        if not result.data:
            raise NotFoundException("Payment not found")

        payment = result.data[0]
        if payment["payer_id"] != user_id:
            raise ForbiddenException("Not authorized to view this payment")

        # Enrich with payer info
        payer_name = None
        payer_email = None
        user_result = (
            self.db.table("users")
            .select("name, email")
            .eq("id", user_id)
            .execute()
        )
        if user_result.data:
            payer_name = user_result.data[0].get("name")
            payer_email = user_result.data[0].get("email")

        # Enrich with plan label or listing title
        metadata = payment.get("metadata") or {}
        plan_label = None
        listing_title = None

        if payment["type"] == "subscription":
            # Unknown or missing plans get no label rather than being
            # reported as a yearly subscription.
            plan_label = self._PLAN_LABELS.get(metadata.get("plan"))
        elif payment["type"] == "purchase":
            listing_title = metadata.get("listing_title")
            if not listing_title:
                # Fall back to the listing's current title.
                lid = metadata.get("listing_id")
                if lid:
                    lr = (
                        self.db.table("listings")
                        .select("title")
                        .eq("id", lid)
                        .execute()
                    )
                    if lr.data:
                        listing_title = lr.data[0]["title"]

        return {
            **payment,
            "payer_name": payer_name,
            "payer_email": payer_email,
            "plan_label": plan_label,
            "listing_title": listing_title,
        }

    def get_my_payments(self, user_id: str) -> list:
        """List all payments for a user, newest first."""
        result = (
            self.db.table("payments")
            .select("*")
            .eq("payer_id", user_id)
            .order("created_at", desc=True)
            .execute()
        )
        return result.data or []

    def get_subscription_status(self, user_id: str) -> dict:
        """Return the current subscription status for the agency user.

        Returns:
            ``{"has_active_subscription": bool, "subscription": row | None}``
            where the row is the active subscription with the latest
            ``ends_at`` that has not expired yet.

        Raises:
            NotFoundException: The user has no agency.
        """
        agency_result = (
            self.db.table("agencies").select("id").eq("user_id", user_id).execute()
        )
        if not agency_result.data:
            raise NotFoundException("Agency not found")

        agency_id = agency_result.data[0]["id"]
        now = datetime.now(timezone.utc).isoformat()

        result = (
            self.db.table("subscriptions")
            .select("*")
            .eq("agency_id", agency_id)
            .eq("status", "active")
            .gt("ends_at", now)
            .order("ends_at", desc=True)
            .limit(1)
            .execute()
        )

        if result.data:
            return {"has_active_subscription": True, "subscription": result.data[0]}
        return {"has_active_subscription": False, "subscription": None}

    # ── Private helpers ───────────────────────────────────

    def _prepare_subscription(self, user_id: str, plan: Optional[str]) -> tuple:
        """Validate a subscription request.

        Returns ``(amount, transaction_id, description, metadata)``.
        """
        if plan not in ("monthly", "yearly"):
            raise BadRequestException("plan must be 'monthly' or 'yearly'")

        settings = self.settings
        amount = (
            settings.SUBSCRIPTION_MONTHLY_AMOUNT
            if plan == "monthly"
            else settings.SUBSCRIPTION_YEARLY_AMOUNT
        )

        # Resolve agency_id from user
        agency_result = (
            self.db.table("agencies").select("id").eq("user_id", user_id).execute()
        )
        if not agency_result.data:
            raise ForbiddenException("Agency profile not found")
        agency_id = agency_result.data[0]["id"]

        transaction_id = self._build_transaction_id("SUB", agency_id)
        description = f"Abonnement {plan} - Deals24Togo"
        metadata = {"plan": plan, "agency_id": agency_id}
        return amount, transaction_id, description, metadata

    def _prepare_purchase(self, listing_id: Optional[str]) -> tuple:
        """Validate a purchase request.

        Returns ``(amount, transaction_id, description, metadata)``.
        """
        if not listing_id:
            raise BadRequestException("listing_id is required for purchase")

        listing_result = (
            self.db.table("listings")
            .select("id, title, price, status")
            .eq("id", listing_id)
            .execute()
        )
        if not listing_result.data:
            raise NotFoundException("Listing not found")

        listing = listing_result.data[0]
        if listing["status"] != "approved":
            raise BadRequestException("Listing is not available for purchase")

        amount = float(listing["price"])
        transaction_id = self._build_transaction_id("PUR", listing_id)
        # The title is truncated to 40 characters in the description only.
        description = f"Achat - {listing['title'][:40]}"
        metadata = {"listing_id": listing_id, "listing_title": listing["title"]}
        return amount, transaction_id, description, metadata

    def _request_payment_url(self, payload: dict) -> str:
        """POST the init payload to CinetPay and return its ``payment_url``."""
        with httpx.Client(timeout=30) as client:
            resp = client.post(CINETPAY_INIT_URL, json=payload)
            resp.raise_for_status()
            data = resp.json()

        # Both "201" and "00" are treated as success codes.
        if data.get("code") not in ("201", "00"):
            raise Exception(f"CinetPay error: {data.get('message', 'Unknown error')}")

        return data["data"]["payment_url"]

    def _verify_transaction(self, transaction_id: str) -> dict:
        """Ask CinetPay for the status of a transaction; return the JSON body."""
        settings = self.settings
        verify_payload = {
            "apikey": settings.CINETPAY_API_KEY,
            "site_id": settings.CINETPAY_SITE_ID,
            "transaction_id": transaction_id,
        }
        with httpx.Client(timeout=30) as client:
            resp = client.post(CINETPAY_CHECK_URL, json=verify_payload)
            resp.raise_for_status()
            return resp.json()

    def _activate_subscription(self, payment: dict) -> None:
        """Create an active subscription row after a successful payment.

        The subscription starts now and lasts 30 days for the ``monthly``
        plan, 365 days otherwise. Nothing is written when the payment
        metadata lacks ``agency_id`` or ``plan``.

        NOTE(review): this always inserts a new row starting now; it does not
        extend a subscription that is still running, so an early renewal
        overlaps the current period. Confirm whether that is intended.
        """
        metadata = payment.get("metadata") or {}
        agency_id = metadata.get("agency_id")
        plan = metadata.get("plan")

        if not agency_id or not plan:
            return

        now = datetime.now(timezone.utc)
        days = 30 if plan == "monthly" else 365
        ends_at = (now + timedelta(days=days)).isoformat()
        starts_at = now.isoformat()

        self.db.table("subscriptions").insert(
            {
                "agency_id": agency_id,
                "plan": plan,
                "status": "active",
                "starts_at": starts_at,
                "ends_at": ends_at,
                "payment_id": payment["id"],
                "created_at": starts_at,
            }
        ).execute()

    def _complete_purchase(self, payment: dict) -> None:
        """Record the purchase and mark the listing as sold.

        Nothing is written when the payment metadata lacks ``listing_id``.
        """
        metadata = payment.get("metadata") or {}
        listing_id = metadata.get("listing_id")

        if not listing_id:
            return

        now = datetime.now(timezone.utc).isoformat()

        # Insert purchase record
        self.db.table("purchases").insert(
            {
                "listing_id": listing_id,
                "buyer_id": payment.get("payer_id"),
                "amount": payment["amount"],
                "payment_id": payment["id"],
                "created_at": now,
            }
        ).execute()

        # Mark listing as sold
        self.db.table("listings").update(
            {"status": "sold", "updated_at": now}
        ).eq("id", listing_id).execute()

    @staticmethod
    def _build_transaction_id(prefix: str, resource_id: str) -> str:
        """Build a unique, ≤50-char alphanumeric transaction ID.

        Layout: ``prefix`` + up to 8 alphanumeric characters taken from the
        start of ``resource_id`` + Unix timestamp (seconds) + 8 random hex
        characters. The random suffix keeps two payments started for the same
        resource within the same second from colliding.
        """
        import secrets  # local import keeps the module's import block untouched

        # str() tolerates integer primary keys; separators such as "-" are
        # dropped so the ID stays alphanumeric.
        head = str(resource_id)[:8]
        short_id = "".join(ch for ch in head if ch.isascii() and ch.isalnum())
        ts = int(time.time())
        return f"{prefix}{short_id}{ts}{secrets.token_hex(4)}"
|