Coverage for mindsdb / interfaces / knowledge_base / preprocessing / document_preprocessor.py: 56%
179 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import re
2import html
3import asyncio
4from typing import List, Dict, Optional, Any
6import pandas as pd
7from mindsdb.interfaces.knowledge_base.preprocessing.text_splitter import TextSplitter
9from mindsdb.integrations.utilities.rag.splitters.file_splitter import (
10 FileSplitter,
11 FileSplitterConfig,
12)
14try:
15 from mindsdb.interfaces.agents.langchain_agent import create_chat_model
16except ModuleNotFoundError as exc: # pragma: no cover - optional dependency
17 if getattr(exc, "name", "") and "langchain" in exc.name:
18 create_chat_model = None
19 _LANGCHAIN_IMPORT_ERROR = exc
20 else:
21 raise
22else:
23 _LANGCHAIN_IMPORT_ERROR = None
24from mindsdb.interfaces.knowledge_base.preprocessing.models import (
25 PreprocessingConfig,
26 ProcessedChunk,
27 PreprocessorType,
28 ContextualConfig,
29 Document,
30 TextChunkingConfig,
31)
32from mindsdb.utilities import log
34logger = log.getLogger(__name__)
36_DEFAULT_CONTENT_COLUMN_NAME = "content"
def _require_agent_extra(feature: str):
    """Ensure the optional agent dependencies are importable.

    Raises an ImportError (chained to the original langchain import failure)
    naming *feature* when ``create_chat_model`` could not be imported.
    """
    if create_chat_model is not None:
        return
    raise ImportError(
        f"{feature} requires the optional agent dependencies. Install them via `pip install mindsdb[kb]`."
    ) from _LANGCHAIN_IMPORT_ERROR
class DocumentPreprocessor:
    """Base class for document preprocessing.

    Child classes must set ``self.splitter`` and ``self.config`` and override
    ``process_documents``.
    """

    def __init__(self):
        """Initialize preprocessor"""
        self.splitter = None  # Will be set by child classes
        self.config = None

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Base implementation - should be overridden by child classes

        Args:
            documents: List of documents to process
        """
        raise NotImplementedError("Subclasses must implement process_documents")

    def _split_document(self, doc: Document) -> List[Document]:
        """Split document into chunks while preserving metadata.

        Raises:
            ValueError: if no splitter has been configured by the subclass.
        """
        if self.splitter is None:
            raise ValueError("Splitter not configured")

        metadata = doc.metadata or {}
        # Split and convert back to our Document type; every chunk shares the
        # parent document's metadata mapping.
        split_texts = self.splitter.split_text(doc.content)
        return [Document(content=text, metadata=metadata) for text in split_texts]

    def _get_source(self) -> str:
        """Get the source identifier for this preprocessor (the class name)."""
        return self.__class__.__name__

    def to_dataframe(self, chunks: List[ProcessedChunk]) -> pd.DataFrame:
        """Convert processed chunks to dataframe format (one row per chunk)."""
        return pd.DataFrame([chunk.model_dump() for chunk in chunks])

    @staticmethod
    def dict_to_document(data: Dict[str, Any]) -> Document:
        """Convert a dictionary to a Document object.

        Missing keys fall back to ``None`` (id, embeddings), ``""`` (content)
        or ``{}`` (metadata).
        """
        return Document(
            id=data.get("id"),
            content=data.get("content", ""),
            embeddings=data.get("embeddings"),
            metadata=data.get("metadata", {}),
        )

    def _generate_chunk_id(
        self,
        chunk_index: Optional[int] = None,
        total_chunks: Optional[int] = None,
        start_char: Optional[int] = None,
        end_char: Optional[int] = None,
        provided_id: Optional[str] = None,
        content_column: Optional[str] = None,
    ) -> str:
        """Generate human-readable deterministic ID for a chunk.

        Format: <doc_id>:<content_column>:<chunk_number>of<total_chunks>:<start_char>to<end_char>

        Raises:
            ValueError: if ``provided_id`` or ``content_column`` is missing.
        """
        if provided_id is None:
            raise ValueError("Document ID must be provided for chunk ID generation")

        if content_column is None:
            raise ValueError("Content column must be provided for chunk ID generation")

        # NOTE: positional fields render as the literal string "None" when the
        # caller omits them (e.g. ContextualPreprocessor passes no char offsets).
        chunk_id = f"{provided_id}:{content_column}:{chunk_index + 1}of{total_chunks}:{start_char}to{end_char}"
        logger.debug(f"Generated chunk ID: {chunk_id}")
        return chunk_id

    def _prepare_chunk_metadata(
        self,
        doc_id: Optional[str],
        chunk_index: Optional[int],
        base_metadata: Optional[Dict] = None,
    ) -> Dict:
        """Centralized method for preparing chunk metadata.

        Returns a new dict (the caller's ``base_metadata`` is not mutated)
        containing the original document ID, the chunk index (multi-chunk
        case only) and the preprocessor source marker.
        """
        # Copy so we never mutate the caller-owned mapping in place.
        metadata = dict(base_metadata) if base_metadata else {}

        # Always preserve original document ID
        if doc_id is not None:
            metadata[self.config.doc_id_column_name] = doc_id

        # Add chunk index only for multi-chunk cases
        if chunk_index is not None:
            metadata["_chunk_index"] = chunk_index

        # Always set source
        metadata["_source"] = self._get_source()

        return metadata
class ContextualPreprocessor(DocumentPreprocessor):
    """Contextual preprocessing implementation that enhances document chunks with context.

    Each chunk is sent to an LLM together with the full document; the model's
    short situating context is prepended to (or, with ``summarize``, replaces)
    the chunk content.
    """

    DEFAULT_CONTEXT_TEMPLATE = """
<document>
{WHOLE_DOCUMENT}
</document>
Here is the chunk we want to situate within the whole document
<chunk>
{CHUNK_CONTENT}
</chunk>
Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk. Answer only with the succinct context and nothing else."""

    def __init__(self, config: ContextualConfig):
        """Initialize with contextual configuration.

        Raises:
            ImportError: if the optional agent (langchain) dependencies are missing.
        """
        super().__init__()
        self.config = config
        self.splitter = FileSplitter(
            FileSplitterConfig(chunk_size=config.chunk_size, chunk_overlap=config.chunk_overlap)
        )
        _require_agent_extra("Contextual preprocessing")
        self.llm = create_chat_model(
            {
                "model_name": self.config.llm_config.model_name,
                "provider": self.config.llm_config.provider,
                **self.config.llm_config.params,
            }
        )
        self.context_template = config.context_template or self.DEFAULT_CONTEXT_TEMPLATE
        self.summarize = self.config.summarize

    def _prepare_prompts(self, chunk_contents: list[str], full_documents: list[str]) -> list[str]:
        """Build one prompt per (chunk, document) pair using the configured template.

        Literal <document>/<chunk> tags appearing inside user content are
        HTML-escaped so they cannot break the prompt structure.
        """

        def tag_replacer(match):
            tag = match.group(0)
            if tag.lower() not in ["<document>", "</document>", "<chunk>", "</chunk>"]:
                return tag
            return html.escape(tag)

        tag_pattern = r"</?document>|</?chunk>"
        prompts = []
        for chunk_content, full_document in zip(chunk_contents, full_documents):
            chunk_content = re.sub(tag_pattern, tag_replacer, chunk_content, flags=re.IGNORECASE)
            full_document = re.sub(tag_pattern, tag_replacer, full_document, flags=re.IGNORECASE)
            # BUGFIX: use the configured template (may be user-supplied via
            # config.context_template); previously DEFAULT_CONTEXT_TEMPLATE was
            # always used, silently ignoring the configuration.
            prompts.append(
                self.context_template.format(WHOLE_DOCUMENT=full_document, CHUNK_CONTENT=chunk_content)
            )

        return prompts

    def _generate_context(self, chunk_contents: list[str], full_documents: list[str]) -> list[str]:
        """Generate contextual description for a chunk using LLM"""
        prompts = self._prepare_prompts(chunk_contents, full_documents)

        # Check if LLM supports async
        if hasattr(self.llm, "abatch"):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                response = loop.run_until_complete(self._async_generate(prompts))
            finally:
                loop.close()
        else:
            # Use sync batch for non-async LLMs
            response = [resp.content for resp in self.llm.batch(prompts)]
        return response

    async def _async_generate(self, prompts: list[str]) -> list[str]:
        """Helper for async LLM generation"""
        return [resp.content for resp in await self.llm.abatch(prompts)]

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Split documents, generate an LLM context per chunk, and return processed chunks.

        Raises:
            ValueError: if any document is missing an ID.
        """
        chunks_list = []
        doc_index_list = []
        chunk_index_list = []
        processed_chunks = []

        for doc_index, doc in enumerate(documents):
            # Document ID must be provided by this point
            if doc.id is None:
                raise ValueError("Document ID must be provided before preprocessing")

            # Skip empty or whitespace-only content
            if not doc.content or not doc.content.strip():
                continue

            # Collect every non-empty chunk together with its parent document
            # index so contexts can be batched in a single LLM call below.
            for i, chunk_doc in enumerate(self._split_document(doc)):
                if not chunk_doc.content or not chunk_doc.content.strip():
                    continue
                chunks_list.append(chunk_doc)
                doc_index_list.append(doc_index)
                chunk_index_list.append(i)

        # Generate contexts (batched across all documents)
        doc_contents = [documents[i].content for i in doc_index_list]
        chunk_contents = [chunk_doc.content for chunk_doc in chunks_list]
        contexts = self._generate_context(chunk_contents, doc_contents)

        for context, chunk_doc, chunk_index, doc_index in zip(contexts, chunks_list, chunk_index_list, doc_index_list):
            # With summarize=True the context alone becomes the content,
            # otherwise it is prepended to the original chunk text.
            processed_content = context if self.summarize else f"{context}\n\n{chunk_doc.content}"
            doc = documents[doc_index]

            # Initialize metadata
            metadata = {}
            if doc.metadata:
                metadata.update(doc.metadata)

            # Get content_column from metadata or use default
            content_column = metadata.get("_content_column")
            if content_column is None:
                # If content_column is not in metadata, use the default column name
                content_column = _DEFAULT_CONTENT_COLUMN_NAME
                logger.debug(f"No content_column found in metadata, using default: {_DEFAULT_CONTENT_COLUMN_NAME}")

            # NOTE(review): total_chunks/char offsets are not passed here, so IDs
            # contain literal "None" segments — preserved for ID stability.
            chunk_id = self._generate_chunk_id(
                chunk_index=chunk_index, provided_id=doc.id, content_column=content_column
            )
            processed_chunks.append(
                ProcessedChunk(
                    id=chunk_id,
                    content=processed_content,  # Use the content with context
                    embeddings=doc.embeddings,
                    metadata=self._prepare_chunk_metadata(doc.id, None, metadata),
                )
            )

        return processed_chunks
class TextChunkingPreprocessor(DocumentPreprocessor):
    """Default text chunking preprocessor using TextSplitter."""

    def __init__(self, config: Optional[TextChunkingConfig] = None):
        """Initialize with text chunking configuration (defaults if omitted)."""
        super().__init__()
        self.config = config or TextChunkingConfig()
        self.splitter = TextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=self.config.separators,
        )

    def _split_document(self, doc: Document) -> List[Document]:
        """Split document into chunks while preserving metadata (delegates to base)."""
        return super()._split_document(doc)

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Split each document into chunks and return them with positional metadata.

        Raises:
            ValueError: if any document lacks an ID.
        """
        results = []

        for doc in documents:
            # Documents must already carry an ID at this stage.
            if doc.id is None:
                raise ValueError("Document ID must be provided before preprocessing")

            # Ignore documents with no usable text.
            if not doc.content or not doc.content.strip():
                continue

            pieces = self._split_document(doc)
            total_chunks = len(pieces)

            # Walk the chunks, tracking approximate character offsets.
            cursor = 0
            for idx, piece in enumerate(pieces):
                if not piece.content or not piece.content.strip():
                    continue

                start_char = cursor
                end_char = start_char + len(piece.content)
                cursor = end_char + 1  # account for the separator character

                # Start from the parent document's metadata, then layer on
                # positional information.
                meta = dict(doc.metadata) if doc.metadata else {}
                meta["_start_char"] = start_char
                meta["_end_char"] = end_char

                # Resolve the content column, falling back to the default name.
                content_column = meta.get("_content_column")
                if content_column is None:
                    content_column = _DEFAULT_CONTENT_COLUMN_NAME
                    logger.debug(f"No content_column found in metadata, using default: {_DEFAULT_CONTENT_COLUMN_NAME}")

                chunk_id = self._generate_chunk_id(
                    chunk_index=idx,
                    total_chunks=total_chunks,
                    start_char=start_char,
                    end_char=end_char,
                    provided_id=doc.id,
                    content_column=content_column,
                )

                results.append(
                    ProcessedChunk(
                        id=chunk_id,
                        content=piece.content,
                        embeddings=doc.embeddings,
                        metadata=self._prepare_chunk_metadata(doc.id, idx, meta),
                    )
                )

        return results
class PreprocessorFactory:
    """Factory for creating preprocessors based on configuration."""

    @staticmethod
    def create_preprocessor(
        config: Optional[PreprocessingConfig] = None,
    ) -> DocumentPreprocessor:
        """
        Create appropriate preprocessor based on configuration
        :param config: Preprocessing configuration
        :return: Configured preprocessor instance
        :raises ValueError: If unknown preprocessor type specified
        """
        # No configuration at all -> plain text chunking with defaults.
        if config is None:
            return TextChunkingPreprocessor()

        if config.type == PreprocessorType.TEXT_CHUNKING:
            return TextChunkingPreprocessor(config.text_chunking_config)

        if config.type == PreprocessorType.CONTEXTUAL:
            return ContextualPreprocessor(config.contextual_config)

        if config.type == PreprocessorType.JSON_CHUNKING:
            # Import here to avoid circular imports
            from mindsdb.interfaces.knowledge_base.preprocessing.json_chunker import JSONChunkingPreprocessor

            return JSONChunkingPreprocessor(config.json_chunking_config)

        raise ValueError(f"Unknown preprocessor type: {config.type}")