mirror of
http://88.130.71.182:3000/BlitTech/contexta_be.git
synced 2026-06-12 23:23:21 +00:00
Initial commit
This commit is contained in:
221
app/services/document_processor.py
Normal file
221
app/services/document_processor.py
Normal file
@@ -0,0 +1,221 @@
|
||||
import io
|
||||
import logging
|
||||
from typing import List, Dict, Any, Tuple
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CHUNK_SIZE = 512  # target chunk length in approximate tokens; chunk_text converts to characters assuming 1 token ≈ 4 chars
|
||||
CHUNK_OVERLAP = 50  # overlap between consecutive chunks, in approximate tokens (chunk_text multiplies it by 4 to get characters)
|
||||
|
||||
|
||||
def parse_pdf(file_bytes: bytes) -> List[Dict[str, Any]]:
    """Extract the text of each PDF page.

    Args:
        file_bytes: Raw contents of the PDF file.

    Returns:
        One ``{"text": str, "page_number": int}`` dict per page that has
        text left after stripping whitespace. ``page_number`` is the 1-based
        position of the page in the PDF; pages without text are omitted.

    Raises:
        ValueError: On any failure inside the parsing step (the ``pypdf``
            import is inside the guarded region, so a missing dependency is
            reported the same way).
    """
    try:
        import pypdf

        document = pypdf.PdfReader(io.BytesIO(file_bytes))
        extracted = []
        for number, pdf_page in enumerate(document.pages, start=1):
            # extract_text() may yield nothing for a page; normalise to "".
            content = (pdf_page.extract_text() or "").strip()
            if not content:
                continue
            extracted.append({"text": content, "page_number": number})
        return extracted
    except Exception as e:
        logger.error(f"PDF parse error: {e}")
        raise ValueError(f"Failed to parse PDF: {str(e)}")
|
||||
|
||||
|
||||
def parse_docx(file_bytes: bytes) -> List[Dict[str, Any]]:
    """Parse a DOCX file into heading-delimited sections.

    Iterates ``doc.paragraphs`` and accumulates non-empty paragraph text.
    A paragraph whose style name starts with "Heading" closes the section
    collected so far and becomes the first line of the next one. DOCX has no
    fixed pagination, so ``page_number`` holds the 1-based section index.

    Args:
        file_bytes: Raw contents of the .docx file.

    Returns:
        A list of ``{"text": str, "page_number": int}`` dicts. When the
        document has no non-empty paragraphs, a single placeholder entry
        with empty text and ``page_number`` 1 is returned.

    Raises:
        ValueError: If the document cannot be opened or read (the original
            exception is attached as ``__cause__``).
    """
    try:
        from docx import Document

        doc = Document(io.BytesIO(file_bytes))
        sections: List[Dict[str, Any]] = []
        current_text: List[str] = []
        section_idx = 1

        for para in doc.paragraphs:
            text = para.text.strip()
            if not text:
                continue

            # A style can lack a UI name (name is None); treat that as
            # "not a heading" instead of failing the whole document on
            # None.startswith(...).
            style = para.style
            style_name = (style.name if style is not None else None) or ""

            # A heading starts a new section, but only once the current one
            # actually holds text (consecutive headings stay together).
            if style_name.startswith("Heading") and current_text:
                sections.append(
                    {"text": "\n".join(current_text), "page_number": section_idx}
                )
                current_text = []
                section_idx += 1
            current_text.append(text)

        # Flush whatever follows the last heading.
        if current_text:
            sections.append({"text": "\n".join(current_text), "page_number": section_idx})

        return sections if sections else [{"text": "", "page_number": 1}]
    except Exception as e:
        logger.error(f"DOCX parse error: {e}")
        raise ValueError(f"Failed to parse DOCX: {str(e)}") from e
|
||||
|
||||
|
||||
def parse_csv(file_bytes: bytes) -> List[Dict[str, Any]]:
    """Parse a CSV file into text sections of up to 10 rows each.

    Every row is rendered as ``"col: value | col: value"`` with missing
    values left out; the rows of one batch are joined with newlines.
    Rows with no values at all are skipped, and a batch that ends up with
    no rows produces no section.

    Args:
        file_bytes: Raw contents of the CSV file.

    Returns:
        A list of ``{"text": str, "page_number": int}`` dicts, where
        ``page_number`` is the 1-based index of the 10-row batch the text
        came from.

    Raises:
        ValueError: If the CSV cannot be read (the original exception is
            attached as ``__cause__``).
    """
    try:
        import pandas as pd

        df = pd.read_csv(io.BytesIO(file_bytes))
        columns = list(df.columns)
        chunks = []
        batch_size = 10

        for start in range(0, len(df), batch_size):
            batch = df.iloc[start : start + batch_size]
            rows_text = []
            for _, row in batch.iterrows():
                # pd.notna covers NaN/NaT/None without comparing strings.
                row_parts = [
                    f"{col}: {val}" for col, val in zip(columns, row) if pd.notna(val)
                ]
                # Skip rows that carry no data so they do not add blank lines.
                if row_parts:
                    rows_text.append(" | ".join(row_parts))

            # Do not emit an empty-text section for an all-empty batch.
            if rows_text:
                chunks.append(
                    {
                        "text": "\n".join(rows_text),
                        "page_number": (start // batch_size) + 1,
                    }
                )

        return chunks
    except Exception as e:
        logger.error(f"CSV parse error: {e}")
        raise ValueError(f"Failed to parse CSV: {str(e)}") from e
|
||||
|
||||
|
||||
def parse_xlsx(file_bytes: bytes) -> List[Dict[str, Any]]:
    """Parse an Excel workbook into text sections of up to 10 rows each.

    Each sheet is read in turn and split into 10-row batches. Every section
    starts with a ``"Sheet: <name>"`` line followed by one
    ``"col: value | col: value"`` line per row, with missing values left
    out. Rows without any value are skipped, and a batch that has no rows
    left produces no section.

    Args:
        file_bytes: Raw contents of the workbook.

    Returns:
        A list of ``{"text": str, "page_number": int}`` dicts.
        ``page_number`` is a running 1-based counter across all sheets,
        advanced once per emitted section.

    Raises:
        ValueError: If the workbook cannot be read (the original exception
            is attached as ``__cause__``).
    """
    try:
        import pandas as pd

        chunks = []
        page_num = 1
        batch_size = 10

        # The context manager closes the workbook handle when done.
        with pd.ExcelFile(io.BytesIO(file_bytes)) as xl:
            for sheet_name in xl.sheet_names:
                df = xl.parse(sheet_name)
                columns = list(df.columns)

                for start in range(0, len(df), batch_size):
                    batch = df.iloc[start : start + batch_size]
                    row_lines = []
                    for _, row in batch.iterrows():
                        # pd.notna filters NaN/NaT/None by value, so a cell
                        # whose text is literally "None" or "nan" is kept.
                        row_parts = [
                            f"{col}: {val}"
                            for col, val in zip(columns, row)
                            if pd.notna(val)
                        ]
                        if row_parts:
                            row_lines.append(" | ".join(row_parts))

                    # The sheet header alone is not content: skip batches
                    # where no row contributed anything.
                    if not row_lines:
                        continue

                    text = "\n".join([f"Sheet: {sheet_name}", *row_lines])
                    chunks.append({"text": text, "page_number": page_num})
                    page_num += 1

        return chunks
    except Exception as e:
        logger.error(f"XLSX parse error: {e}")
        raise ValueError(f"Failed to parse XLSX: {str(e)}") from e
|
||||
|
||||
|
||||
def parse_txt(file_bytes: bytes) -> List[Dict[str, Any]]:
    """Parse plain text into sections separated by blank lines.

    The bytes are decoded as UTF-8 (a leading byte-order mark is dropped and
    undecodable bytes are ignored), line endings are normalised to ``\\n``,
    and the text is split on double newlines.

    Args:
        file_bytes: Raw contents of the text file.

    Returns:
        A list of ``{"text": str, "page_number": int}`` dicts, one per
        non-empty section, numbered from 1. Input with no non-whitespace
        content yields a single entry with empty text.

    Raises:
        ValueError: If decoding or splitting fails (the original exception
            is attached as ``__cause__``).
    """
    try:
        # "utf-8-sig" strips a leading BOM that plain "utf-8" would leave in
        # the first section as U+FEFF.
        text = file_bytes.decode("utf-8-sig", errors="ignore")
        # Normalise CRLF / CR so Windows files also split on blank lines
        # ("\r\n\r\n" contains no "\n\n").
        text = text.replace("\r\n", "\n").replace("\r", "\n")

        sections = [s.strip() for s in text.split("\n\n") if s.strip()]
        if not sections:
            sections = [text.strip()]
        return [{"text": s, "page_number": i + 1} for i, s in enumerate(sections)]
    except Exception as e:
        raise ValueError(f"Failed to parse TXT: {str(e)}") from e
|
||||
|
||||
|
||||
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split text into overlapping chunks of roughly ``chunk_size`` tokens.

    Sizes are given in approximate tokens and converted to characters at
    4 characters per token. A chunk that is not the last one is cut, when
    possible, just after the last sentence end, newline or space found in
    the second half of its window, so words and sentences are not split
    mid-way.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length in approximate tokens; must be > 0.
        overlap: Approximate number of tokens repeated at the start of the
            next chunk; must be >= 0.

    Returns:
        The chunks in order. Text that fits in one chunk is returned
        unchanged as a single element; longer text yields stripped chunks.
        Blank text yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is
            negative.
    """
    # A non-positive size would never advance the window below.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap < 0:
        raise ValueError("overlap must be non-negative")

    # Approximate token count: 1 token ≈ 4 chars
    char_size = chunk_size * 4
    char_overlap = overlap * 4
    text_len = len(text)

    if text_len <= char_size:
        # Blank input produces no chunk, matching the long path below,
        # which also drops empty chunks.
        return [text] if text.strip() else []

    chunks = []
    start = 0
    while start < text_len:
        end = min(start + char_size, text_len)

        # Try to break at a sentence boundary, then newline, then space.
        # Only a break in the second half of the window is accepted so
        # chunks do not become too short.
        if end < text_len:
            for sep in (". ", "! ", "? ", "\n", " "):
                pos = text.rfind(sep, start, end)
                if pos > start + char_size // 2:
                    end = pos + len(sep)
                    break

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        # The end of the text has been emitted: stop here, otherwise the
        # overlap rewind would yield a tail that only repeats the previous
        # chunk's last characters.
        if end >= text_len:
            break

        # Step back by the overlap, but always make forward progress.
        next_start = end - char_overlap
        start = next_start if next_start > start else end

    return chunks
|
||||
|
||||
|
||||
def process_document(
    file_bytes: bytes,
    file_name: str,
    document_id: str,
    company_id: str,
) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Parse a document and split it into chunks with metadata.

    The parser is selected from the lower-cased extension of ``file_name``
    (.pdf, .docx, .csv, .xlsx/.xls, .txt/.md). Each parsed page/section is
    passed through ``chunk_text``; chunks with no non-whitespace content are
    dropped so blank placeholder pages never reach the caller.

    Args:
        file_bytes: Raw contents of the uploaded file.
        file_name: Original file name; only its extension selects the parser.
        document_id: Identifier copied into every payload.
        company_id: Identifier copied into every payload.

    Returns:
        ``(chunks_text, chunk_payloads)``: two parallel lists. Each payload
        holds ``document_id``, ``company_id``, ``file_name``,
        ``page_number``, ``chunk_index`` and ``text``.

    Raises:
        ValueError: If the extension is not supported, or if the selected
            parser fails.
    """
    ext = Path(file_name).suffix.lower()

    # Parse: extension -> parser dispatch table.
    parsers = {
        ".pdf": parse_pdf,
        ".docx": parse_docx,
        ".csv": parse_csv,
        ".xlsx": parse_xlsx,
        ".xls": parse_xlsx,
        ".txt": parse_txt,
        ".md": parse_txt,
    }
    parser = parsers.get(ext)
    if parser is None:
        raise ValueError(f"Unsupported file type: {ext}")
    pages = parser(file_bytes)

    # Chunk
    all_chunks: List[str] = []
    all_payloads: List[Dict[str, Any]] = []

    for page in pages:
        page_num = page.get("page_number", 1)

        # Drop blank chunks (e.g. the empty placeholder page some parsers
        # return for documents without text).
        chunks = [c for c in chunk_text(page["text"]) if c.strip()]

        # NOTE(review): chunk_index restarts at 0 for every page, so it is
        # only unique together with page_number; confirm consumers expect that.
        for idx, chunk in enumerate(chunks):
            all_chunks.append(chunk)
            all_payloads.append(
                {
                    "document_id": document_id,
                    "company_id": company_id,
                    "file_name": file_name,
                    "page_number": page_num,
                    "chunk_index": idx,
                    "text": chunk,
                }
            )

    logger.info(
        f"Processed {file_name}: {len(pages)} pages → {len(all_chunks)} chunks"
    )
    return all_chunks, all_payloads
|
||||
Reference in New Issue
Block a user