mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-13 08:45:24 +00:00
209 lines
6.9 KiB
Python
209 lines
6.9 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, BackgroundTasks
|
|
from app.models import DocumentResponse, SuccessResponse
|
|
from app.database import get_supabase
|
|
from app.dependencies import get_current_user
|
|
from app.services.document_processor import process_document
|
|
from app.services.embeddings import embedding_service
|
|
from app.services.vector_store import vector_store
|
|
from app.config import settings
|
|
from typing import List
|
|
import uuid
|
|
import logging
|
|
|
|
# Logger named after this module's import path.
logger = logging.getLogger(__name__)

# Every route registered on this router lives under one chatbot's
# ``/documents`` sub-resource and is tagged "Documents".
router = APIRouter(
    prefix="/chatbots/{chatbot_id}/documents",
    tags=["Documents"],
)
|
|
|
|
# Single source of truth for the accepted upload formats, as
# (file extension, MIME type) pairs. Both public constants below are derived
# from it so they cannot drift apart.
_SUPPORTED_FORMATS = (
    (".pdf", "application/pdf"),
    (".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"),
    (".csv", "text/csv"),
    (".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"),
    (".txt", "text/plain"),
    (".md", "text/markdown"),
)

# MIME type -> file extension.
ALLOWED_TYPES = {mime: ext for ext, mime in _SUPPORTED_FORMATS}

# File extensions (lower-case, with leading dot) accepted for upload.
ALLOWED_EXTENSIONS = {ext for ext, _mime in _SUPPORTED_FORMATS}
|
|
|
|
|
|
def _get_user_chatbot(chatbot_id: str, user_id: str, supabase) -> dict:
    """Fetch a chatbot row, but only if it belongs to the user's company.

    The company is the first ``companies`` row whose ``owner_id`` equals
    ``user_id``; the chatbot must match both ``chatbot_id`` and that
    company's id.

    Args:
        chatbot_id: Id of the chatbot to load.
        user_id: Id of the requesting user (matched against ``owner_id``).
        supabase: Client exposing the ``table(...).select(...).eq(...).execute()``
            query chain.

    Returns:
        The first matching ``chatbots`` row.

    Raises:
        HTTPException: 404 when the user owns no company, or when no chatbot
            with this id exists under that company.
    """
    owned_companies = (
        supabase.table("companies")
        .select("id")
        .eq("owner_id", user_id)
        .execute()
    )
    if not owned_companies.data:
        raise HTTPException(status_code=404, detail="Company not found")

    owner_company_id = owned_companies.data[0]["id"]
    matches = (
        supabase.table("chatbots")
        .select("*")
        .eq("id", chatbot_id)
        .eq("company_id", owner_company_id)
        .execute()
    )
    if matches.data:
        return matches.data[0]
    raise HTTPException(status_code=404, detail="Chatbot not found")
|
|
|
|
|
|
@router.post("", response_model=DocumentResponse, status_code=201)
async def upload_document(
    chatbot_id: str,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    user=Depends(get_current_user),
):
    """Accept a document upload for a chatbot and queue it for processing.

    The upload is validated (chatbot ownership, filename, extension, size),
    a ``documents`` row is inserted with status ``"processing"``, and
    ``_process_document_bg`` is scheduled as a background task.

    Args:
        chatbot_id: Chatbot the document is attached to (path parameter).
        background_tasks: Task queue used to schedule the processing step.
        file: The uploaded file.
        user: Authenticated user resolved by ``get_current_user``.

    Returns:
        The inserted document row wrapped in ``DocumentResponse``.

    Raises:
        HTTPException: 404 from the ownership check; 400 for a missing
            filename, an unsupported extension or an empty file; 413 when the
            file exceeds ``settings.max_file_size_mb``; 500 when the insert
            returns no row.
    """
    supabase = get_supabase()
    chatbot = _get_user_chatbot(chatbot_id, user.id, supabase)

    # Validate file name and extension.
    filename = file.filename
    if not filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in ALLOWED_EXTENSIONS:
        # Sorted so the message is stable; set iteration order is not.
        allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed: {allowed}",
        )

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without copying an arbitrarily large file into memory first.
    # int() keeps the read size valid even if the setting is a float.
    max_size = int(settings.max_file_size_mb * 1024 * 1024)
    file_bytes = await file.read(max_size + 1)
    file_size = len(file_bytes)

    if file_size > max_size:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max size: {settings.max_file_size_mb}MB",
        )
    if file_size == 0:
        # The background task fails any document that yields no chunks, and a
        # zero-byte file has no content to extract, so reject it up front
        # rather than creating a record that is expected to end up "failed".
        raise HTTPException(status_code=400, detail="File is empty")

    # Create the document record; the background task updates it later.
    doc_id = str(uuid.uuid4())
    doc_data = {
        "id": doc_id,
        "chatbot_id": chatbot_id,
        "file_name": filename,
        "file_type": ext,
        "file_size": file_size,
        "chunk_count": 0,
        "status": "processing",
    }

    result = supabase.table("documents").insert(doc_data).execute()
    if not result.data:
        raise HTTPException(status_code=500, detail="Failed to create document record")

    # Extraction and embedding run after the response has been sent.
    background_tasks.add_task(
        _process_document_bg,
        file_bytes=file_bytes,
        file_name=filename,
        doc_id=doc_id,
        chatbot=chatbot,
        supabase=supabase,
    )

    return DocumentResponse(**result.data[0])
|
|
|
|
|
|
def _mark_document_failed(supabase, doc_id: str, message: str) -> None:
    """Best-effort: set document ``doc_id`` to ``"failed"`` with ``message``.

    The stored message is truncated to 500 characters. A failure of the
    update itself is logged and swallowed so it cannot escape the background
    task that is already handling an error.
    """
    try:
        supabase.table("documents").update({
            "status": "failed",
            "error_message": message[:500],
        }).eq("id", doc_id).execute()
    except Exception:
        logger.exception(f"Could not mark document {doc_id} as failed")


async def _process_document_bg(
    file_bytes: bytes,
    file_name: str,
    doc_id: str,
    chatbot: dict,
    supabase,
):
    """Background task to process and embed a document.

    Splits the file into chunks via ``process_document``, embeds them in
    batches of 50, upserts all vectors into the chatbot's Qdrant collection
    in one call, and finally marks the ``documents`` row ``"completed"`` with
    its chunk count. Any exception marks the row ``"failed"`` instead; nothing
    is re-raised.

    Args:
        file_bytes: Raw content of the uploaded file.
        file_name: Original filename, forwarded to ``process_document``.
        doc_id: Id of the ``documents`` row to update.
        chatbot: Chatbot row; ``company_id`` and ``qdrant_collection_name``
            are read from it.
        supabase: Client used to update the ``documents`` table.

    NOTE(review): none of the calls below are awaited, so this coroutine
    appears to run its whole workload synchronously - confirm whether that
    blocks the event loop in deployment and offload to a thread if so.
    """
    try:
        company_id = chatbot.get("company_id", "")
        collection_name = chatbot.get("qdrant_collection_name")

        if not collection_name:
            # .get() so a row without "id" cannot raise KeyError here and
            # replace the intended error message below.
            logger.error(f"No Qdrant collection for chatbot {chatbot.get('id')}")
            _mark_document_failed(supabase, doc_id, "Vector store not configured")
            return

        # Ensure collection exists
        if not vector_store.collection_exists(collection_name):
            vector_store.create_collection(collection_name)

        # Process document
        chunks, payloads = process_document(
            file_bytes=file_bytes,
            file_name=file_name,
            document_id=doc_id,
            company_id=company_id,
        )

        if not chunks:
            raise ValueError("No text extracted from document")

        # Generate embeddings in batches
        batch_size = 50
        all_ids = []
        all_vectors = []
        all_payloads = []

        for start in range(0, len(chunks), batch_size):
            batch_chunks = chunks[start:start + batch_size]
            batch_payloads = payloads[start:start + batch_size]

            vectors = embedding_service.embed_batch(batch_chunks)

            # One fresh point id per returned vector.
            all_ids.extend(str(uuid.uuid4()) for _ in vectors)
            all_vectors.extend(vectors)
            all_payloads.extend(batch_payloads)

        # Upsert to Qdrant
        vector_store.upsert_vectors(
            collection_name=collection_name,
            vectors=all_vectors,
            payloads=all_payloads,
            ids=all_ids,
        )

        # Update document record
        supabase.table("documents").update({
            "status": "completed",
            "chunk_count": len(chunks),
        }).eq("id", doc_id).execute()

        logger.info(f"Document {doc_id} processed: {len(chunks)} chunks")

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Document processing error for {doc_id}: {e}")
        # The status update may fail too (for example if the database call
        # was what raised), so it goes through the guarded helper.
        _mark_document_failed(supabase, doc_id, str(e))
|
|
|
|
|
|
@router.get("", response_model=List[DocumentResponse])
async def list_documents(chatbot_id: str, user=Depends(get_current_user)):
    """List a chatbot's documents, ordered by ``created_at`` descending.

    Args:
        chatbot_id: Chatbot whose documents are listed (path parameter).
        user: Authenticated user resolved by ``get_current_user``.

    Returns:
        One ``DocumentResponse`` per row; an empty list when there are none.

    Raises:
        HTTPException: 404 from the ownership check in ``_get_user_chatbot``.
    """
    supabase = get_supabase()
    # Called only for its ownership check; the returned row is not needed.
    _get_user_chatbot(chatbot_id, user.id, supabase)

    query = supabase.table("documents").select("*")
    query = query.eq("chatbot_id", chatbot_id)
    query = query.order("created_at", desc=True)
    rows = query.execute().data

    if not rows:
        return []
    return [DocumentResponse(**row) for row in rows]
|
|
|
|
|
|
@router.delete("/{document_id}", response_model=SuccessResponse)
async def delete_document(chatbot_id: str, document_id: str, user=Depends(get_current_user)):
    """Delete a document's vectors and its ``documents`` row.

    Vector removal is best-effort: if it raises, a warning is logged and the
    database row is deleted anyway.

    Args:
        chatbot_id: Chatbot the document belongs to (path parameter).
        document_id: Document to delete (path parameter).
        user: Authenticated user resolved by ``get_current_user``.

    Returns:
        ``SuccessResponse`` with ``success=True``.

    Raises:
        HTTPException: 404 from the ownership check, or when no document with
            this id exists under the chatbot.
    """
    supabase = get_supabase()
    chatbot = _get_user_chatbot(chatbot_id, user.id, supabase)

    lookup = (
        supabase.table("documents")
        .select("*")
        .eq("id", document_id)
        .eq("chatbot_id", chatbot_id)
        .execute()
    )
    if not lookup.data:
        raise HTTPException(status_code=404, detail="Document not found")

    # Remove vectors from Qdrant (skipped when the chatbot has no collection).
    qdrant_collection = chatbot.get("qdrant_collection_name")
    if qdrant_collection:
        try:
            vector_store.delete_by_document_id(qdrant_collection, document_id)
        except Exception as exc:
            logger.warning(f"Failed to delete vectors: {exc}")

    supabase.table("documents").delete().eq("id", document_id).execute()
    return SuccessResponse(success=True, message="Document deleted")
|