mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-13 08:45:24 +00:00
- Add new routers: admin, appointments, campaigns - Add storage service and logging config - Add migrations directory and test suite with pytest config - Add supabase_migration_features.sql - Update models, dependencies, config, and existing routers - Remove whatsapp_service (deleted) - Update pyproject.toml and uv.lock dependencies Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
47 lines
1.5 KiB
Python
47 lines
1.5 KiB
Python
"""
|
|
Supabase Storage helper utilities.
|
|
Used to delete files from storage buckets when deleting documents or chatbots.
|
|
"""
|
|
import logging
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def extract_storage_path(url: str, bucket: str) -> Optional[str]:
|
|
"""
|
|
Extract the file path from a Supabase Storage public URL.
|
|
|
|
URL format: {supabase_url}/storage/v1/object/public/{bucket}/{path}
|
|
Returns the path portion after the bucket name, or None if not parseable.
|
|
"""
|
|
if not url:
|
|
return None
|
|
try:
|
|
parsed = urlparse(url)
|
|
prefix = f"/storage/v1/object/public/{bucket}/"
|
|
if prefix in parsed.path:
|
|
idx = parsed.path.index(prefix) + len(prefix)
|
|
return parsed.path[idx:]
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract storage path from URL '{url}': {e}")
|
|
return None
|
|
|
|
|
|
def delete_from_storage(supabase, bucket: str, url: str) -> bool:
|
|
"""
|
|
Delete a file from a Supabase Storage bucket given its public URL.
|
|
Returns True on success, False if the URL couldn't be parsed or deletion failed.
|
|
"""
|
|
path = extract_storage_path(url, bucket)
|
|
if not path:
|
|
return False
|
|
try:
|
|
supabase.storage.from_(bucket).remove([path])
|
|
logger.info(f"Deleted storage file: {bucket}/{path}")
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"Failed to delete storage file {bucket}/{path}: {e}")
|
|
return False
|