Coverage for mindsdb / interfaces / knowledge_base / preprocessing / json_chunker.py: 72%

209 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import ast 

2import json 

3from typing import List, Dict, Any, Optional 

4 

5import pandas as pd 

6 

7from mindsdb.interfaces.knowledge_base.preprocessing.models import Document, ProcessedChunk, JSONChunkingConfig 

8from mindsdb.interfaces.knowledge_base.preprocessing.document_preprocessor import DocumentPreprocessor 

9from mindsdb.utilities import log 

10 

11# Set up logger 

12logger = log.getLogger(__name__) 

13 

14 

class JSONChunkingPreprocessor(DocumentPreprocessor):
    """Preprocessor that chunks documents containing JSON data structures."""

    def __init__(self, config: Optional[JSONChunkingConfig] = None):
        """Initialize the preprocessor.

        Args:
            config: Chunking configuration; a default JSONChunkingConfig is
                used when none is supplied.
        """
        super().__init__()
        self.config = config or JSONChunkingConfig()
        # Chunking follows the JSON structure itself, so no text splitter is needed.

23 

24 def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]: 

25 """Process JSON documents into chunks 

26 

27 Args: 

28 documents: List of documents containing JSON content 

29 

30 Returns: 

31 List of processed chunks 

32 """ 

33 all_chunks = [] 

34 

35 for doc in documents: 

36 try: 

37 # Parse document content into a Python object 

38 json_data = self._parse_document_content(doc) 

39 if json_data is None: 

40 # Handle parsing failure 

41 error_message = "Content is neither valid JSON nor a valid Python literal." 

42 error_chunk = self._create_error_chunk(doc, error_message) 

43 all_chunks.append(error_chunk) 

44 continue # Skip to next document 

45 

46 # Process the JSON data based on its structure 

47 chunks = self._process_json_data(json_data, doc) 

48 all_chunks.extend(chunks) 

49 except Exception as e: 

50 logger.exception(f"Error processing document {doc.id}:") 

51 error_chunk = self._create_error_chunk(doc, str(e)) 

52 all_chunks.append(error_chunk) 

53 

54 return all_chunks 

55 

56 def _parse_document_content(self, doc: Document) -> Optional[Any]: 

57 """Parse document content into a Python object 

58 

59 Args: 

60 doc: Document with content to parse 

61 

62 Returns: 

63 Parsed content as a Python object or None if parsing failed 

64 """ 

65 # If content is not a string, assume it's already a Python object 

66 if not isinstance(doc.content, str): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true

67 return doc.content 

68 

69 # Try to parse as JSON first 

70 try: 

71 return json.loads(doc.content) 

72 except json.JSONDecodeError: 

73 # If JSON parsing fails, try as Python literal 

74 try: 

75 return ast.literal_eval(doc.content) 

76 except (SyntaxError, ValueError): 

77 logger.exception(f"Error parsing content for document {doc.id}:") 

78 # We'll create the error chunk in the main process_documents method 

79 return None 

80 

81 def _process_json_data(self, json_data: Any, doc: Document) -> List[ProcessedChunk]: 

82 """Process JSON data based on its structure 

83 

84 Args: 

85 json_data: Parsed JSON data as a Python object 

86 doc: Original document 

87 

88 Returns: 

89 List of processed chunks 

90 """ 

91 if isinstance(json_data, list): 91 ↛ 93line 91 didn't jump to line 93 because the condition on line 91 was never true

92 # List of objects - chunk by object 

93 return self._process_json_list(json_data, doc) 

94 elif isinstance(json_data, dict): 94 ↛ 102line 94 didn't jump to line 102 because the condition on line 94 was always true

95 # Single object - chunk according to config 

96 if self.config.chunk_by_object: 

97 return [self._create_chunk_from_dict(json_data, doc, 0, 1)] 

98 else: 

99 return self._process_json_dict(json_data, doc) 

100 else: 

101 # Primitive value - create a single chunk 

102 return [self._create_chunk_from_primitive(json_data, doc)] 

103 

104 def _create_error_chunk(self, doc: Document, error_message: str) -> ProcessedChunk: 

105 """Create a chunk containing error information 

106 

107 Args: 

108 doc: Original document 

109 error_message: Error message to include in the chunk 

110 

111 Returns: 

112 ProcessedChunk with error information 

113 """ 

114 return ProcessedChunk( 

115 id=f"{doc.id}_error", 

116 content=f"Error processing document: {error_message}", 

117 metadata=self._prepare_chunk_metadata(doc.id, 0, doc.metadata), 

118 ) 

119 

120 def _process_json_list(self, json_list: List, doc: Document) -> List[ProcessedChunk]: 

121 """Process a list of JSON objects into chunks""" 

122 chunks = [] 

123 total_objects = len(json_list) 

124 

125 for i, item in enumerate(json_list): 

126 if isinstance(item, dict): 

127 chunk = self._create_chunk_from_dict(item, doc, i, total_objects) 

128 chunks.append(chunk) 

129 elif isinstance(item, list): 

130 # Handle nested lists by converting to string representation 

131 chunk = self._create_chunk_from_primitive( 

132 json.dumps(item), doc, chunk_index=i, total_chunks=total_objects 

133 ) 

134 chunks.append(chunk) 

135 else: 

136 # Handle primitive values 

137 chunk = self._create_chunk_from_primitive(item, doc, chunk_index=i, total_chunks=total_objects) 

138 chunks.append(chunk) 

139 

140 return chunks 

141 

142 def _process_json_dict(self, json_dict: Dict, doc: Document) -> List[ProcessedChunk]: 

143 """Process a single JSON object into chunks by fields""" 

144 chunks = [] 

145 

146 # Ensure we're working with a dictionary 

147 if isinstance(json_dict, str): 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true

148 try: 

149 json_dict = json.loads(json_dict) 

150 except json.JSONDecodeError: 

151 logger.exception(f"Error parsing JSON string: {json_dict[:100]}...") 

152 return [self._create_error_chunk(doc, "Invalid JSON string")] 

153 

154 # Filter fields based on include/exclude lists 

155 fields_to_process = {} 

156 for key, value in json_dict.items(): 

157 if self.config.include_fields and key not in self.config.include_fields: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true

158 continue 

159 if key in self.config.exclude_fields: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true

160 continue 

161 fields_to_process[key] = value 

162 

163 # Create a chunk for each field 

164 total_fields = len(fields_to_process) 

165 for i, (key, value) in enumerate(fields_to_process.items()): 

166 field_content = self._format_field_content(key, value) 

167 

168 # Create chunk metadata 

169 metadata = self._prepare_chunk_metadata(doc.id, i, doc.metadata) 

170 metadata["field_name"] = key 

171 

172 # Extract fields to metadata for filtering 

173 self._extract_fields_to_metadata(json_dict, metadata) 

174 

175 # Generate chunk ID 

176 chunk_id = self._generate_chunk_id( 

177 chunk_index=i, 

178 total_chunks=total_fields, 

179 start_char=0, 

180 end_char=len(field_content), 

181 provided_id=doc.id, 

182 content_column=self.config.content_column, 

183 ) 

184 

185 # Create and add the chunk 

186 chunk = ProcessedChunk(id=chunk_id, content=field_content, metadata=metadata) 

187 chunks.append(chunk) 

188 

189 return chunks 

190 

191 def _create_chunk_from_dict( 

192 self, json_dict: Dict, doc: Document, chunk_index: int, total_chunks: int 

193 ) -> ProcessedChunk: 

194 """Create a chunk from a JSON dictionary""" 

195 # Ensure we're working with a dictionary 

196 if isinstance(json_dict, str): 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true

197 try: 

198 json_dict = json.loads(json_dict) 

199 except json.JSONDecodeError: 

200 logger.exception(f"Error parsing JSON string: {json_dict[:100]}...") 

201 return self._create_error_chunk(doc, "Invalid JSON string") 

202 

203 # Format the content 

204 if self.config.flatten_nested: 

205 flattened = self._flatten_dict(json_dict, self.config.nested_delimiter) 

206 filtered_dict = self._filter_fields(flattened) 

207 content = self._dict_to_text(filtered_dict) 

208 else: 

209 filtered_dict = { 

210 k: v 

211 for k, v in json_dict.items() 

212 if (not self.config.include_fields or k in self.config.include_fields) 

213 and k not in self.config.exclude_fields 

214 } 

215 content = json.dumps(filtered_dict, indent=2) 

216 

217 # Create metadata 

218 metadata = self._prepare_chunk_metadata(doc.id, chunk_index, doc.metadata) 

219 

220 # Extract fields to metadata for filtering 

221 self._extract_fields_to_metadata(json_dict, metadata) 

222 

223 # Generate chunk ID 

224 chunk_id = self._generate_chunk_id( 

225 chunk_index=chunk_index, 

226 total_chunks=total_chunks, 

227 start_char=0, 

228 end_char=len(content), 

229 provided_id=doc.id, 

230 content_column=self.config.content_column, 

231 ) 

232 

233 return ProcessedChunk(id=chunk_id, content=content, metadata=metadata) 

234 

235 def _filter_fields(self, flattened_dict: Dict) -> Dict: 

236 """Filter fields based on include/exclude configuration""" 

237 # If include_fields is specified, only keep those fields 

238 if self.config.include_fields: 

239 filtered_dict = { 

240 k: v 

241 for k, v in flattened_dict.items() 

242 if any( 

243 k == field or k.startswith(field + self.config.nested_delimiter) 

244 for field in self.config.include_fields 

245 ) 

246 } 

247 else: 

248 filtered_dict = flattened_dict.copy() 

249 

250 # Apply exclude_fields 

251 if self.config.exclude_fields: 

252 for exclude_field in self.config.exclude_fields: 

253 # Remove exact field match 

254 if exclude_field in filtered_dict: 

255 filtered_dict.pop(exclude_field) 

256 

257 # Remove any nested fields 

258 nested_prefix = exclude_field + self.config.nested_delimiter 

259 keys_to_remove = [k for k in filtered_dict if k.startswith(nested_prefix)] 

260 for key in keys_to_remove: 

261 filtered_dict.pop(key) 

262 

263 return filtered_dict 

264 

265 def _create_chunk_from_primitive( 

266 self, value: Any, doc: Document, chunk_index: int = 0, total_chunks: int = 1 

267 ) -> ProcessedChunk: 

268 """Create a chunk from a primitive value""" 

269 content = str(value) 

270 

271 # Create metadata 

272 metadata = self._prepare_chunk_metadata(doc.id, chunk_index, doc.metadata) 

273 

274 # For primitive values, we don't have a JSON dictionary to extract fields from 

275 # But we can add the value itself as a metadata field if configured 

276 if self.config.extract_all_primitives: 

277 metadata["field_value"] = value 

278 

279 # Generate chunk ID 

280 chunk_id = self._generate_chunk_id( 

281 chunk_index=chunk_index, 

282 total_chunks=total_chunks, 

283 start_char=0, 

284 end_char=len(content), 

285 provided_id=doc.id, 

286 content_column=self.config.content_column, 

287 ) 

288 

289 return ProcessedChunk(id=chunk_id, content=content, metadata=metadata) 

290 

291 def _flatten_dict(self, d: Dict, delimiter: str = ".", prefix: str = "") -> Dict: 

292 """Flatten a nested dictionary structure""" 

293 result = {} 

294 for k, v in d.items(): 

295 new_key = f"{prefix}{delimiter}{k}" if prefix else k 

296 if isinstance(v, dict): 

297 result.update(self._flatten_dict(v, delimiter, new_key)) 

298 elif isinstance(v, list) and all(isinstance(item, dict) for item in v): 

299 # Handle lists of dictionaries 

300 for i, item in enumerate(v): 

301 result.update(self._flatten_dict(item, delimiter, f"{new_key}[{i}]")) 

302 else: 

303 result[new_key] = v 

304 return result 

305 

306 def _dict_to_text(self, d: Dict) -> str: 

307 """Convert a dictionary to a human-readable text format""" 

308 lines = [] 

309 for key, value in d.items(): 

310 if value is None: 

311 continue 

312 if isinstance(value, list): 

313 if not value: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true

314 continue 

315 if all(isinstance(item, dict) for item in value): 315 ↛ 317line 315 didn't jump to line 317 because the condition on line 315 was never true

316 # Format list of dictionaries 

317 lines.append(f"{key}:") 

318 for i, item in enumerate(value): 

319 lines.append(f" Item {i + 1}:") 

320 for k, v in item.items(): 

321 lines.append(f" {k}: {v}") 

322 else: 

323 # Format list of primitives 

324 value_str = ", ".join(str(item) for item in value) 

325 lines.append(f"{key}: {value_str}") 

326 else: 

327 lines.append(f"{key}: {value}") 

328 

329 return "\n".join(lines) 

330 

331 def _format_field_content(self, key: str, value: Any) -> str: 

332 """Format a field's content for inclusion in a chunk""" 

333 if isinstance(value, dict): 

334 if self.config.flatten_nested: 334 ↛ 338line 334 didn't jump to line 338 because the condition on line 334 was always true

335 flattened = self._flatten_dict(value, self.config.nested_delimiter, key) 

336 return self._dict_to_text(flattened) 

337 else: 

338 return f"{key}: {json.dumps(value, indent=2)}" 

339 elif isinstance(value, list): 

340 if all(isinstance(item, dict) for item in value): 

341 # Format list of dictionaries 

342 lines = [f"{key}:"] 

343 for i, item in enumerate(value): 

344 lines.append(f" Item {i + 1}:") 

345 for k, v in item.items(): 

346 lines.append(f" {k}: {v}") 

347 return "\n".join(lines) 

348 else: 

349 # Format list of primitives 

350 value_str = ", ".join(str(item) for item in value if item is not None) 

351 return f"{key}: {value_str}" 

352 else: 

353 return f"{key}: {value}" 

354 

355 def _extract_fields_to_metadata(self, json_dict: Dict, metadata: Dict) -> None: 

356 """Extract specified fields from JSON to metadata for filtering""" 

357 # Ensure we're working with a dictionary 

358 if isinstance(json_dict, str): 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 try: 

360 json_dict = json.loads(json_dict) 

361 except json.JSONDecodeError: 

362 logger.exception(f"Error parsing JSON string: {json_dict[:100]}...") 

363 return 

364 

365 # Always flatten the dictionary for metadata extraction 

366 flattened = self._flatten_dict(json_dict, self.config.nested_delimiter) 

367 

368 # If extract_all_primitives is True, extract all primitive values 

369 if self.config.extract_all_primitives: 

370 for key, value in flattened.items(): 

371 if isinstance(value, (str, int, float, bool)) and value is not None: 371 ↛ 370line 371 didn't jump to line 370 because the condition on line 371 was always true

372 metadata[f"field_{key}"] = value 

373 return 

374 

375 # If metadata_fields is empty and extract_all_primitives is False, 

376 # assume all fields should be extracted 

377 if not self.config.metadata_fields: 

378 # First try to extract top-level primitive fields 

379 has_primitives = False 

380 for key, value in json_dict.items(): 

381 if isinstance(value, (str, int, float, bool)) and value is not None: 

382 metadata[f"field_{key}"] = value 

383 has_primitives = True 

384 

385 # If no top-level primitives were found, extract all primitives from flattened dict 

386 if not has_primitives: 

387 for key, value in flattened.items(): 

388 if isinstance(value, (str, int, float, bool)) and value is not None: 

389 metadata[f"field_{key}"] = value 

390 else: 

391 # Extract only the specified fields 

392 for field in self.config.metadata_fields: 

393 if field in flattened and flattened[field] is not None: 393 ↛ 397line 393 didn't jump to line 397 because the condition on line 393 was always true

394 metadata[f"field_{field}"] = flattened[field] 

395 else: 

396 # Try to navigate the nested structure manually 

397 parts = field.split(self.config.nested_delimiter) 

398 current = json_dict 

399 found = True 

400 

401 for part in parts: 

402 if isinstance(current, dict) and part in current: 

403 current = current[part] 

404 else: 

405 found = False 

406 break 

407 

408 if found and current is not None: 

409 metadata[f"field_{field}"] = current 

410 

411 def to_dataframe(self, chunks: List[ProcessedChunk]) -> pd.DataFrame: 

412 """Convert processed chunks to dataframe format""" 

413 return pd.DataFrame([chunk.model_dump() for chunk in chunks])