from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from app.models import DocumentResponse, SuccessResponse
from app.database import get_supabase
from app.dependencies import get_current_user
from app.services.document_processor import process_document
from app.services.embeddings import embedding_service
from app.services.vector_store import vector_store
from app.config import settings
from typing import List
import uuid
import logging

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/chatbots/{chatbot_id}/documents", tags=["Documents"])

# NOTE(review): ALLOWED_TYPES is not referenced by the handlers in this module;
# upload validation below is based on the file extension only.
ALLOWED_TYPES = {
    "application/pdf": ".pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "text/csv": ".csv",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "text/plain": ".txt",
    "text/markdown": ".md",
}

ALLOWED_EXTENSIONS = {".pdf", ".docx", ".csv", ".xlsx", ".txt", ".md"}

# Number of chunks sent to the embedding service per call.
_EMBED_BATCH_SIZE = 50


def _get_user_chatbot(chatbot_id: str, user_id: str, supabase) -> dict:
    """Return the chatbot row ``chatbot_id`` owned by the user's company.

    The company is the first ``companies`` row whose ``owner_id`` equals
    ``user_id``; the chatbot must have that company's id as ``company_id``.

    Raises:
        HTTPException: 404 when no company row matches the user, or when no
            chatbot with that id exists under the company.
    """
    company = supabase.table("companies").select("id").eq("owner_id", user_id).execute()
    if not company.data:
        raise HTTPException(status_code=404, detail="Company not found")

    company_id = company.data[0]["id"]
    chatbot = (
        supabase.table("chatbots")
        .select("*")
        .eq("id", chatbot_id)
        .eq("company_id", company_id)
        .execute()
    )
    if not chatbot.data:
        raise HTTPException(status_code=404, detail="Chatbot not found")
    return chatbot.data[0]


def _file_extension(filename: str) -> str:
    """Return the lower-cased text after the last dot, with a leading dot.

    Returns an empty string when ``filename`` contains no dot.
    """
    if "." not in filename:
        return ""
    return "." + filename.rsplit(".", 1)[-1].lower()


def _mark_document_failed(supabase, doc_id: str, message: str) -> None:
    """Set the document row's status to ``failed`` with ``message`` as the error."""
    supabase.table("documents").update({
        "status": "failed",
        "error_message": message,
    }).eq("id", doc_id).execute()


@router.post("", response_model=DocumentResponse, status_code=201)
async def upload_document(
    chatbot_id: str,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    user=Depends(get_current_user),
):
    """Validate an upload, record it as ``processing`` and queue its embedding.

    Raises:
        HTTPException: 404 if the chatbot is not owned by the caller's company,
            400 for a missing filename or unsupported extension, 413 when the
            file exceeds ``settings.max_file_size_mb``, 500 when the document
            row could not be inserted.
    """
    supabase = get_supabase()
    chatbot = _get_user_chatbot(chatbot_id, user.id, supabase)

    # Validate file
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    ext = _file_extension(file.filename)
    if ext not in ALLOWED_EXTENSIONS:
        # Sorted so the message is stable; set iteration order is not.
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}",
        )

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without loading the whole thing into memory first.
    max_size = int(settings.max_file_size_mb * 1024 * 1024)
    file_bytes = await file.read(max_size + 1)
    file_size = len(file_bytes)
    if file_size > max_size:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max size: {settings.max_file_size_mb}MB",
        )

    # Create document record
    doc_id = str(uuid.uuid4())
    doc_data = {
        "id": doc_id,
        "chatbot_id": chatbot_id,
        "file_name": file.filename,
        "file_type": ext,
        "file_size": file_size,
        "chunk_count": 0,
        "status": "processing",
    }
    result = supabase.table("documents").insert(doc_data).execute()
    if not result.data:
        raise HTTPException(status_code=500, detail="Failed to create document record")

    # Process in background
    background_tasks.add_task(
        _process_document_bg,
        file_bytes=file_bytes,
        file_name=file.filename,
        doc_id=doc_id,
        chatbot=chatbot,
        supabase=supabase,
    )

    return DocumentResponse(**result.data[0])


def _process_document_sync(
    file_bytes: bytes,
    file_name: str,
    doc_id: str,
    chatbot: dict,
    supabase,
) -> None:
    """Chunk, embed and store one document, then update its status row.

    Any exception is logged and recorded on the document row as ``failed``;
    nothing is re-raised to the caller.
    """
    try:
        company_id = chatbot.get("company_id", "")
        collection_name = chatbot.get("qdrant_collection_name")

        if not collection_name:
            logger.error("No Qdrant collection for chatbot %s", chatbot["id"])
            _mark_document_failed(supabase, doc_id, "Vector store not configured")
            return

        # Ensure collection exists
        if not vector_store.collection_exists(collection_name):
            vector_store.create_collection(collection_name)

        # Process document
        chunks, payloads = process_document(
            file_bytes=file_bytes,
            file_name=file_name,
            document_id=doc_id,
            company_id=company_id,
        )

        if not chunks:
            raise ValueError("No text extracted from document")

        # Generate embeddings in batches
        all_ids = []
        all_vectors = []
        all_payloads = []

        for start in range(0, len(chunks), _EMBED_BATCH_SIZE):
            end = start + _EMBED_BATCH_SIZE
            vectors = embedding_service.embed_batch(chunks[start:end])

            all_ids.extend(str(uuid.uuid4()) for _ in vectors)
            all_vectors.extend(vectors)
            all_payloads.extend(payloads[start:end])

        # Upsert to Qdrant
        vector_store.upsert_vectors(
            collection_name=collection_name,
            vectors=all_vectors,
            payloads=all_payloads,
            ids=all_ids,
        )

        # Update document record
        supabase.table("documents").update({
            "status": "completed",
            "chunk_count": len(chunks),
        }).eq("id", doc_id).execute()

        logger.info("Document %s processed: %s chunks", doc_id, len(chunks))

    except Exception as e:
        # logger.exception keeps the traceback alongside the original message.
        logger.exception("Document processing error for %s: %s", doc_id, e)
        try:
            _mark_document_failed(supabase, doc_id, str(e)[:500])
        except Exception:
            # The status write is best-effort; a second failure here must not
            # escape the background task.
            logger.exception("Could not mark document %s as failed", doc_id)


async def _process_document_bg(
    file_bytes: bytes,
    file_name: str,
    doc_id: str,
    chatbot: dict,
    supabase,
):
    """Background task to process and embed a document.

    The parsing, embedding and storage calls are synchronous, so they are run
    in the thread pool rather than directly in this coroutine, where they
    would block the event loop until the whole document was processed.
    """
    await run_in_threadpool(
        _process_document_sync,
        file_bytes=file_bytes,
        file_name=file_name,
        doc_id=doc_id,
        chatbot=chatbot,
        supabase=supabase,
    )


@router.get("", response_model=List[DocumentResponse])
async def list_documents(chatbot_id: str, user=Depends(get_current_user)):
    """List the chatbot's documents ordered by ``created_at`` descending.

    Raises:
        HTTPException: 404 if the chatbot is not owned by the caller's company.
    """
    supabase = get_supabase()
    _get_user_chatbot(chatbot_id, user.id, supabase)

    result = (
        supabase.table("documents")
        .select("*")
        .eq("chatbot_id", chatbot_id)
        .order("created_at", desc=True)
        .execute()
    )

    return [DocumentResponse(**d) for d in (result.data or [])]


@router.delete("/{document_id}", response_model=SuccessResponse)
async def delete_document(chatbot_id: str, document_id: str, user=Depends(get_current_user)):
    """Delete a document row and, when a collection is configured, its vectors.

    Vector deletion is best-effort: a failure is logged as a warning and the
    document row is still deleted.

    Raises:
        HTTPException: 404 if the chatbot is not owned by the caller's company
            or the document does not exist under that chatbot.
    """
    supabase = get_supabase()
    chatbot = _get_user_chatbot(chatbot_id, user.id, supabase)

    doc = (
        supabase.table("documents")
        .select("*")
        .eq("id", document_id)
        .eq("chatbot_id", chatbot_id)
        .execute()
    )
    if not doc.data:
        raise HTTPException(status_code=404, detail="Document not found")

    # Remove vectors from Qdrant
    collection_name = chatbot.get("qdrant_collection_name")
    if collection_name:
        try:
            vector_store.delete_by_document_id(collection_name, document_id)
        except Exception as e:
            logger.warning(f"Failed to delete vectors: {e}")

    supabase.table("documents").delete().eq("id", document_id).execute()
    return SuccessResponse(success=True, message="Document deleted")