Files
badoHair_be/app/services/stripe_service.py
belviskhoremk d2dc43b16f Initial Commit
2026-05-12 00:34:21 +00:00

62 lines
1.7 KiB
Python

import asyncio
import stripe
from app.config import get_settings
from app.exceptions import PaymentError
_stripe_initialized = False
def _init_stripe():
global _stripe_initialized
if not _stripe_initialized:
stripe.api_key = get_settings().STRIPE_SECRET_KEY
_stripe_initialized = True
async def create_payment_intent(
amount: float,
metadata: dict,
description: str = "",
) -> stripe.PaymentIntent:
_init_stripe()
settings = get_settings()
try:
intent = await asyncio.to_thread(
stripe.PaymentIntent.create,
amount=int(round(amount * 100)),
currency=settings.STRIPE_CURRENCY,
metadata=metadata,
description=description,
automatic_payment_methods={"enabled": True},
)
return intent
except stripe.StripeError as e:
raise PaymentError(str(e))
async def create_refund(
payment_intent_id: str,
amount: float | None = None,
reason: str | None = None,
) -> stripe.Refund:
_init_stripe()
try:
kwargs: dict = {"payment_intent": payment_intent_id}
if amount is not None:
kwargs["amount"] = int(round(amount * 100))
if reason:
kwargs["reason"] = reason
return await asyncio.to_thread(stripe.Refund.create, **kwargs)
except stripe.StripeError as e:
raise PaymentError(str(e))
def verify_webhook(payload: bytes, sig_header: str) -> stripe.Event:
_init_stripe()
settings = get_settings()
try:
return stripe.Webhook.construct_event(payload, sig_header, settings.STRIPE_WEBHOOK_SECRET)
except stripe.SignatureVerificationError:
raise PaymentError("Invalid webhook signature")