Coverage for mindsdb / interfaces / knowledge_base / preprocessing / document_preprocessor.py: 56%

179 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import re 

2import html 

3import asyncio 

4from typing import List, Dict, Optional, Any 

5 

6import pandas as pd 

7from mindsdb.interfaces.knowledge_base.preprocessing.text_splitter import TextSplitter 

8 

9from mindsdb.integrations.utilities.rag.splitters.file_splitter import ( 

10 FileSplitter, 

11 FileSplitterConfig, 

12) 

13 

14try: 

15 from mindsdb.interfaces.agents.langchain_agent import create_chat_model 

16except ModuleNotFoundError as exc: # pragma: no cover - optional dependency 

17 if getattr(exc, "name", "") and "langchain" in exc.name: 

18 create_chat_model = None 

19 _LANGCHAIN_IMPORT_ERROR = exc 

20 else: 

21 raise 

22else: 

23 _LANGCHAIN_IMPORT_ERROR = None 

24from mindsdb.interfaces.knowledge_base.preprocessing.models import ( 

25 PreprocessingConfig, 

26 ProcessedChunk, 

27 PreprocessorType, 

28 ContextualConfig, 

29 Document, 

30 TextChunkingConfig, 

31) 

32from mindsdb.utilities import log 

33 

34logger = log.getLogger(__name__) 

35 

36_DEFAULT_CONTENT_COLUMN_NAME = "content" 

37 

38 

def _require_agent_extra(feature: str):
    """Fail fast when the optional langchain agent dependencies are missing.

    ``create_chat_model`` is ``None`` only when the module-level langchain
    import failed; in that case raise an ImportError chained to the original
    import error so the root cause stays visible.
    """
    if create_chat_model is not None:
        return
    raise ImportError(
        f"{feature} requires the optional agent dependencies. Install them via `pip install mindsdb[kb]`."
    ) from _LANGCHAIN_IMPORT_ERROR

44 

45 

class DocumentPreprocessor:
    """Base class for document preprocessing.

    Child classes must set ``self.splitter`` and ``self.config`` and implement
    :meth:`process_documents`.
    """

    def __init__(self):
        """Initialize preprocessor"""
        self.splitter = None  # Will be set by child classes
        self.config = None

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Base implementation - should be overridden by child classes

        Args:
            documents: List of documents to process

        Raises:
            NotImplementedError: Always; subclasses provide the implementation.
        """
        raise NotImplementedError("Subclasses must implement process_documents")

    def _split_document(self, doc: Document) -> List[Document]:
        """Split document into chunks while preserving metadata.

        Raises:
            ValueError: If the child class has not configured a splitter.
        """
        if self.splitter is None:
            raise ValueError("Splitter not configured")

        metadata = doc.metadata or {}
        # Split and convert back to our Document type
        split_texts = self.splitter.split_text(doc.content)
        return [Document(content=text, metadata=metadata) for text in split_texts]

    def _get_source(self) -> str:
        """Get the source identifier for this preprocessor (the class name)."""
        return self.__class__.__name__

    def to_dataframe(self, chunks: List[ProcessedChunk]) -> pd.DataFrame:
        """Convert processed chunks to dataframe format (one row per chunk)."""
        return pd.DataFrame([chunk.model_dump() for chunk in chunks])

    @staticmethod
    def dict_to_document(data: Dict[str, Any]) -> Document:
        """Convert a dictionary to a Document object.

        Missing keys fall back to ``None`` (id/embeddings), ``""`` (content)
        and ``{}`` (metadata).
        """
        return Document(
            id=data.get("id"),
            content=data.get("content", ""),
            embeddings=data.get("embeddings"),
            metadata=data.get("metadata", {}),
        )

    def _generate_chunk_id(
        self,
        chunk_index: Optional[int] = None,
        total_chunks: Optional[int] = None,
        start_char: Optional[int] = None,
        end_char: Optional[int] = None,
        provided_id: Optional[str] = None,
        content_column: Optional[str] = None,
    ) -> str:
        """Generate human-readable deterministic ID for a chunk.

        Format: <doc_id>:<content_column>:<chunk_number>of<total_chunks>:<start_char>to<end_char>

        Raises:
            ValueError: If ``provided_id`` or ``content_column`` is None.
        """
        if provided_id is None:
            raise ValueError("Document ID must be provided for chunk ID generation")

        if content_column is None:
            raise ValueError("Content column must be provided for chunk ID generation")

        # chunk_index is 0-based internally but rendered 1-based for readability.
        chunk_id = f"{provided_id}:{content_column}:{chunk_index + 1}of{total_chunks}:{start_char}to{end_char}"
        logger.debug(f"Generated chunk ID: {chunk_id}")
        return chunk_id

    def _prepare_chunk_metadata(
        self,
        doc_id: Optional[str],
        chunk_index: Optional[int],
        base_metadata: Optional[Dict] = None,
    ) -> Dict:
        """Centralized method for preparing chunk metadata.

        Returns a new dict; the caller's ``base_metadata`` is never mutated.
        """
        # Copy instead of aliasing so the caller's dict is not modified in place.
        metadata = dict(base_metadata) if base_metadata else {}

        # Always preserve original document ID
        if doc_id is not None:
            metadata[self.config.doc_id_column_name] = doc_id

        # Add chunk index only for multi-chunk cases
        if chunk_index is not None:
            metadata["_chunk_index"] = chunk_index

        # Always set source
        metadata["_source"] = self._get_source()

        return metadata

133 

134 

class ContextualPreprocessor(DocumentPreprocessor):
    """Contextual preprocessing implementation that enhances document chunks with context"""

    DEFAULT_CONTEXT_TEMPLATE = """
<document>
{WHOLE_DOCUMENT}
</document>
Here is the chunk we want to situate within the whole document
<chunk>
{CHUNK_CONTENT}
</chunk>
Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk. Answer only with the succinct context and nothing else."""

    def __init__(self, config: ContextualConfig):
        """Initialize with contextual configuration.

        Raises:
            ImportError: If the optional agent (langchain) dependencies are missing.
        """
        super().__init__()
        self.config = config
        self.splitter = FileSplitter(
            FileSplitterConfig(chunk_size=config.chunk_size, chunk_overlap=config.chunk_overlap)
        )
        _require_agent_extra("Contextual preprocessing")
        self.llm = create_chat_model(
            {
                "model_name": self.config.llm_config.model_name,
                "provider": self.config.llm_config.provider,
                **self.config.llm_config.params,
            }
        )
        # Fall back to the built-in template when none is configured.
        self.context_template = config.context_template or self.DEFAULT_CONTEXT_TEMPLATE
        self.summarize = self.config.summarize

    def _prepare_prompts(self, chunk_contents: list[str], full_documents: list[str]) -> list[str]:
        """Build one LLM prompt per (chunk, document) pair.

        Literal <document>/<chunk> tags appearing inside the user content are
        HTML-escaped so they cannot be confused with the template's delimiters.
        """

        def tag_replacer(match):
            tag = match.group(0)
            if tag.lower() not in ["<document>", "</document>", "<chunk>", "</chunk>"]:
                return tag
            return html.escape(tag)

        tag_pattern = r"</?document>|</?chunk>"
        prompts = []
        for chunk_content, full_document in zip(chunk_contents, full_documents):
            chunk_content = re.sub(tag_pattern, tag_replacer, chunk_content, flags=re.IGNORECASE)
            full_document = re.sub(tag_pattern, tag_replacer, full_document, flags=re.IGNORECASE)
            prompts.append(
                # Bug fix: use the configured template (set in __init__, which
                # falls back to DEFAULT_CONTEXT_TEMPLATE) instead of always
                # formatting DEFAULT_CONTEXT_TEMPLATE, which silently ignored
                # a user-supplied config.context_template.
                self.context_template.format(WHOLE_DOCUMENT=full_document, CHUNK_CONTENT=chunk_content)
            )

        return prompts

    def _generate_context(self, chunk_contents: list[str], full_documents: list[str]) -> list[str]:
        """Generate contextual description for a chunk using LLM"""
        prompts = self._prepare_prompts(chunk_contents, full_documents)

        # Check if LLM supports async; prefer batched async generation when it does.
        if hasattr(self.llm, "abatch"):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                response = loop.run_until_complete(self._async_generate(prompts))
            finally:
                loop.close()
        else:
            # Use sync batch for non-async LLMs
            response = [resp.content for resp in self.llm.batch(prompts)]
        return response

    async def _async_generate(self, prompts: list[str]) -> list[str]:
        """Helper for async LLM generation"""
        return [resp.content for resp in await self.llm.abatch(prompts)]

    def _split_document(self, doc: Document) -> List[Document]:
        """Split document into chunks while preserving metadata"""
        # Use base class implementation
        return super()._split_document(doc)

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Split documents, generate an LLM context per chunk, and return processed chunks.

        Raises:
            ValueError: If any document is missing an ID.
        """
        chunks_list = []
        doc_index_list = []
        chunk_index_list = []
        processed_chunks = []

        for doc_index, doc in enumerate(documents):
            # Document ID must be provided by this point
            if doc.id is None:
                raise ValueError("Document ID must be provided before preprocessing")

            # Skip empty or whitespace-only content
            if not doc.content or not doc.content.strip():
                continue

            # Collect every non-empty chunk together with the index of its
            # source document and its position within that document.  (The
            # single-chunk case is just i == 0 of this loop.)
            for i, chunk_doc in enumerate(self._split_document(doc)):
                if not chunk_doc.content or not chunk_doc.content.strip():
                    continue
                chunks_list.append(chunk_doc)
                doc_index_list.append(doc_index)
                chunk_index_list.append(i)

        # Generate contexts for all chunks in one batch.
        doc_contents = [documents[i].content for i in doc_index_list]
        chunk_contents = [chunk_doc.content for chunk_doc in chunks_list]
        contexts = self._generate_context(chunk_contents, doc_contents)

        for context, chunk_doc, chunk_index, doc_index in zip(contexts, chunks_list, chunk_index_list, doc_index_list):
            # Either replace the chunk with its generated context (summarize)
            # or prepend the context to the original chunk content.
            processed_content = context if self.summarize else f"{context}\n\n{chunk_doc.content}"
            doc = documents[doc_index]

            # Initialize metadata from the source document
            metadata = {}
            if doc.metadata:
                metadata.update(doc.metadata)

            # Get content_column from metadata or use default
            content_column = metadata.get("_content_column")
            if content_column is None:
                # If content_column is not in metadata, use the default column name
                content_column = _DEFAULT_CONTENT_COLUMN_NAME
                logger.debug(f"No content_column found in metadata, using default: {_DEFAULT_CONTENT_COLUMN_NAME}")

            # NOTE(review): total_chunks/start_char/end_char are not supplied
            # here, so IDs look like "<id>:<col>:<n>ofNone:NonetoNone" —
            # confirm this partial format is intended.
            chunk_id = self._generate_chunk_id(
                chunk_index=chunk_index, provided_id=doc.id, content_column=content_column
            )
            processed_chunks.append(
                ProcessedChunk(
                    id=chunk_id,
                    content=processed_content,  # Use the content with context
                    embeddings=doc.embeddings,
                    metadata=self._prepare_chunk_metadata(doc.id, None, metadata),
                )
            )

        return processed_chunks

281 

282 

class TextChunkingPreprocessor(DocumentPreprocessor):
    """Default text chunking preprocessor using TextSplitter"""

    def __init__(self, config: Optional[TextChunkingConfig] = None):
        """Initialize with text chunking configuration (defaults when omitted)."""
        super().__init__()
        self.config = config or TextChunkingConfig()
        self.splitter = TextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=self.config.separators,
        )

    def _split_document(self, doc: Document) -> List[Document]:
        """Split document into chunks while preserving metadata."""
        # Delegates to the shared base-class splitting logic.
        return super()._split_document(doc)

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Split each document into chunks and wrap every chunk as a ProcessedChunk."""
        results = []

        for doc in documents:
            # Every document must carry an ID before preprocessing starts.
            if doc.id is None:
                raise ValueError("Document ID must be provided before preprocessing")

            # Ignore documents with no usable text.
            if not doc.content or not doc.content.strip():
                continue

            pieces = self._split_document(doc)
            n_pieces = len(pieces)

            # Running character offset of each chunk inside the source text.
            offset = 0
            for idx, piece in enumerate(pieces):
                if not piece.content or not piece.content.strip():
                    continue

                # Character span of this chunk within the document.
                start = offset
                end = start + len(piece.content)
                offset = end + 1  # skip one separator character

                # Seed metadata from the document, then record the span.
                meta = dict(doc.metadata) if doc.metadata else {}
                meta["_start_char"] = start
                meta["_end_char"] = end

                # Resolve the content column, falling back to the default name.
                column = doc.metadata.get("_content_column") if doc.metadata else None
                if column is None:
                    column = _DEFAULT_CONTENT_COLUMN_NAME
                    logger.debug(f"No content_column found in metadata, using default: {_DEFAULT_CONTENT_COLUMN_NAME}")

                results.append(
                    ProcessedChunk(
                        id=self._generate_chunk_id(
                            chunk_index=idx,
                            total_chunks=n_pieces,
                            start_char=start,
                            end_char=end,
                            provided_id=doc.id,
                            content_column=column,
                        ),
                        content=piece.content,
                        embeddings=doc.embeddings,
                        metadata=self._prepare_chunk_metadata(doc.id, idx, meta),
                    )
                )

        return results

365 

366 

class PreprocessorFactory:
    """Factory for creating preprocessors based on configuration"""

    @staticmethod
    def create_preprocessor(
        config: Optional[PreprocessingConfig] = None,
    ) -> DocumentPreprocessor:
        """
        Create appropriate preprocessor based on configuration
        :param config: Preprocessing configuration
        :return: Configured preprocessor instance
        :raises ValueError: If unknown preprocessor type specified
        """
        # No configuration at all -> plain text chunking with defaults.
        if config is None:
            return TextChunkingPreprocessor()

        preprocessor_type = config.type
        if preprocessor_type == PreprocessorType.TEXT_CHUNKING:
            return TextChunkingPreprocessor(config.text_chunking_config)
        if preprocessor_type == PreprocessorType.CONTEXTUAL:
            return ContextualPreprocessor(config.contextual_config)
        if preprocessor_type == PreprocessorType.JSON_CHUNKING:
            # Deferred import to avoid a circular dependency at module load time.
            from mindsdb.interfaces.knowledge_base.preprocessing.json_chunker import JSONChunkingPreprocessor

            return JSONChunkingPreprocessor(config.json_chunking_config)

        raise ValueError(f"Unknown preprocessor type: {preprocessor_type}")