Coverage for mindsdb / interfaces / knowledge_base / preprocessing / json_chunker.py: 72%
209 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
import ast
import json
from typing import List, Dict, Any, Optional

import pandas as pd

from mindsdb.interfaces.knowledge_base.preprocessing.models import Document, ProcessedChunk, JSONChunkingConfig
from mindsdb.interfaces.knowledge_base.preprocessing.document_preprocessor import DocumentPreprocessor
from mindsdb.utilities import log

# Module-level logger, following the project-wide logging convention.
logger = log.getLogger(__name__)
class JSONChunkingPreprocessor(DocumentPreprocessor):
    """JSON chunking preprocessor for handling JSON data structures.

    Splits JSON documents into retrievable chunks either per object (for
    lists / whole dicts) or per field, according to the supplied
    ``JSONChunkingConfig``. Primitive fields can additionally be copied
    into chunk metadata (prefixed with ``field_``) for filtering.
    """

    def __init__(self, config: Optional[JSONChunkingConfig] = None):
        """Initialize with JSON chunking configuration.

        Args:
            config: Chunking options; defaults to ``JSONChunkingConfig()``.
        """
        super().__init__()
        self.config = config or JSONChunkingConfig()
        # No text splitter is needed: chunk boundaries follow JSON structure.

    def process_documents(self, documents: List[Document]) -> List[ProcessedChunk]:
        """Process JSON documents into chunks.

        Args:
            documents: List of documents containing JSON content

        Returns:
            List of processed chunks. Documents that fail to parse or
            process contribute a single error chunk instead of raising.
        """
        all_chunks = []

        for doc in documents:
            try:
                # Parse document content into a Python object
                json_data = self._parse_document_content(doc)
                if json_data is None:
                    # Parsing failed: record an error chunk and move on.
                    error_message = "Content is neither valid JSON nor a valid Python literal."
                    error_chunk = self._create_error_chunk(doc, error_message)
                    all_chunks.append(error_chunk)
                    continue  # Skip to next document

                # Process the JSON data based on its structure
                chunks = self._process_json_data(json_data, doc)
                all_chunks.extend(chunks)
            except Exception as e:
                # Broad catch is deliberate: one bad document must not
                # abort the whole batch.
                logger.exception(f"Error processing document {doc.id}:")
                error_chunk = self._create_error_chunk(doc, str(e))
                all_chunks.append(error_chunk)

        return all_chunks

    def _parse_document_content(self, doc: Document) -> Optional[Any]:
        """Parse document content into a Python object.

        Args:
            doc: Document with content to parse

        Returns:
            Parsed content as a Python object, or None if parsing failed.
        """
        # If content is not a string, assume it's already a Python object
        if not isinstance(doc.content, str):
            return doc.content

        # Try to parse as JSON first
        try:
            return json.loads(doc.content)
        except json.JSONDecodeError:
            # If JSON parsing fails, try as a Python literal (e.g. single quotes).
            try:
                return ast.literal_eval(doc.content)
            except (SyntaxError, ValueError):
                logger.exception(f"Error parsing content for document {doc.id}:")
                # The error chunk is created by process_documents.
                return None

    def _process_json_data(self, json_data: Any, doc: Document) -> List[ProcessedChunk]:
        """Process JSON data based on its structure.

        Args:
            json_data: Parsed JSON data as a Python object
            doc: Original document

        Returns:
            List of processed chunks
        """
        if isinstance(json_data, list):
            # List of objects - chunk by object
            return self._process_json_list(json_data, doc)
        elif isinstance(json_data, dict):
            # Single object - chunk according to config
            if self.config.chunk_by_object:
                return [self._create_chunk_from_dict(json_data, doc, 0, 1)]
            else:
                return self._process_json_dict(json_data, doc)
        else:
            # Primitive value - create a single chunk
            return [self._create_chunk_from_primitive(json_data, doc)]

    def _create_error_chunk(self, doc: Document, error_message: str) -> ProcessedChunk:
        """Create a chunk containing error information.

        Args:
            doc: Original document
            error_message: Error message to include in the chunk

        Returns:
            ProcessedChunk with error information
        """
        return ProcessedChunk(
            id=f"{doc.id}_error",
            content=f"Error processing document: {error_message}",
            metadata=self._prepare_chunk_metadata(doc.id, 0, doc.metadata),
        )

    def _process_json_list(self, json_list: List, doc: Document) -> List[ProcessedChunk]:
        """Process a list of JSON objects into chunks (one chunk per item)."""
        chunks = []
        total_objects = len(json_list)

        for i, item in enumerate(json_list):
            if isinstance(item, dict):
                chunk = self._create_chunk_from_dict(item, doc, i, total_objects)
                chunks.append(chunk)
            elif isinstance(item, list):
                # Handle nested lists by converting to string representation
                chunk = self._create_chunk_from_primitive(
                    json.dumps(item), doc, chunk_index=i, total_chunks=total_objects
                )
                chunks.append(chunk)
            else:
                # Handle primitive values
                chunk = self._create_chunk_from_primitive(item, doc, chunk_index=i, total_chunks=total_objects)
                chunks.append(chunk)

        return chunks

    def _process_json_dict(self, json_dict: Dict, doc: Document) -> List[ProcessedChunk]:
        """Process a single JSON object into chunks, one chunk per field."""
        chunks = []

        # Ensure we're working with a dictionary
        if isinstance(json_dict, str):
            try:
                json_dict = json.loads(json_dict)
            except json.JSONDecodeError:
                logger.exception(f"Error parsing JSON string: {json_dict[:100]}...")
                return [self._create_error_chunk(doc, "Invalid JSON string")]

        # Filter fields based on include/exclude lists
        fields_to_process = {}
        for key, value in json_dict.items():
            if self.config.include_fields and key not in self.config.include_fields:
                continue
            if key in self.config.exclude_fields:
                continue
            fields_to_process[key] = value

        # Create a chunk for each field
        total_fields = len(fields_to_process)
        for i, (key, value) in enumerate(fields_to_process.items()):
            field_content = self._format_field_content(key, value)

            # Create chunk metadata
            metadata = self._prepare_chunk_metadata(doc.id, i, doc.metadata)
            metadata["field_name"] = key

            # Extract fields to metadata for filtering. Note: extraction is
            # from the whole object, so every field chunk carries the same
            # filterable metadata.
            self._extract_fields_to_metadata(json_dict, metadata)

            # Generate chunk ID
            chunk_id = self._generate_chunk_id(
                chunk_index=i,
                total_chunks=total_fields,
                start_char=0,
                end_char=len(field_content),
                provided_id=doc.id,
                content_column=self.config.content_column,
            )

            # Create and add the chunk
            chunk = ProcessedChunk(id=chunk_id, content=field_content, metadata=metadata)
            chunks.append(chunk)

        return chunks

    def _create_chunk_from_dict(
        self, json_dict: Dict, doc: Document, chunk_index: int, total_chunks: int
    ) -> ProcessedChunk:
        """Create a single chunk from a JSON dictionary."""
        # Ensure we're working with a dictionary
        if isinstance(json_dict, str):
            try:
                json_dict = json.loads(json_dict)
            except json.JSONDecodeError:
                logger.exception(f"Error parsing JSON string: {json_dict[:100]}...")
                return self._create_error_chunk(doc, "Invalid JSON string")

        # Format the content
        if self.config.flatten_nested:
            flattened = self._flatten_dict(json_dict, self.config.nested_delimiter)
            filtered_dict = self._filter_fields(flattened)
            content = self._dict_to_text(filtered_dict)
        else:
            filtered_dict = {
                k: v
                for k, v in json_dict.items()
                if (not self.config.include_fields or k in self.config.include_fields)
                and k not in self.config.exclude_fields
            }
            content = json.dumps(filtered_dict, indent=2)

        # Create metadata
        metadata = self._prepare_chunk_metadata(doc.id, chunk_index, doc.metadata)

        # Extract fields to metadata for filtering
        self._extract_fields_to_metadata(json_dict, metadata)

        # Generate chunk ID
        chunk_id = self._generate_chunk_id(
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            start_char=0,
            end_char=len(content),
            provided_id=doc.id,
            content_column=self.config.content_column,
        )

        return ProcessedChunk(id=chunk_id, content=content, metadata=metadata)

    def _filter_fields(self, flattened_dict: Dict) -> Dict:
        """Filter a flattened dict based on include/exclude configuration.

        A field matches if it equals a configured name or is nested under it
        (i.e. starts with ``name + nested_delimiter``).
        """
        # If include_fields is specified, only keep those fields
        if self.config.include_fields:
            filtered_dict = {
                k: v
                for k, v in flattened_dict.items()
                if any(
                    k == field or k.startswith(field + self.config.nested_delimiter)
                    for field in self.config.include_fields
                )
            }
        else:
            filtered_dict = flattened_dict.copy()

        # Apply exclude_fields
        if self.config.exclude_fields:
            for exclude_field in self.config.exclude_fields:
                # Remove exact field match
                if exclude_field in filtered_dict:
                    filtered_dict.pop(exclude_field)

                # Remove any nested fields
                nested_prefix = exclude_field + self.config.nested_delimiter
                keys_to_remove = [k for k in filtered_dict if k.startswith(nested_prefix)]
                for key in keys_to_remove:
                    filtered_dict.pop(key)

        return filtered_dict

    def _create_chunk_from_primitive(
        self, value: Any, doc: Document, chunk_index: int = 0, total_chunks: int = 1
    ) -> ProcessedChunk:
        """Create a chunk from a primitive value."""
        content = str(value)

        # Create metadata
        metadata = self._prepare_chunk_metadata(doc.id, chunk_index, doc.metadata)

        # For primitive values there is no dictionary to extract fields from,
        # but the value itself can be stored as metadata if configured.
        if self.config.extract_all_primitives:
            metadata["field_value"] = value

        # Generate chunk ID
        chunk_id = self._generate_chunk_id(
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            start_char=0,
            end_char=len(content),
            provided_id=doc.id,
            content_column=self.config.content_column,
        )

        return ProcessedChunk(id=chunk_id, content=content, metadata=metadata)

    def _flatten_dict(self, d: Dict, delimiter: str = ".", prefix: str = "") -> Dict:
        """Flatten a nested dictionary structure.

        Nested dicts are joined with ``delimiter``; lists of dicts are
        expanded with ``key[i]`` index markers; everything else is kept
        as a leaf value.
        """
        result = {}
        for k, v in d.items():
            new_key = f"{prefix}{delimiter}{k}" if prefix else k
            if isinstance(v, dict):
                result.update(self._flatten_dict(v, delimiter, new_key))
            elif isinstance(v, list) and v and all(isinstance(item, dict) for item in v):
                # Handle (non-empty) lists of dictionaries. The `v and`
                # guard is required: all() is vacuously true for an empty
                # list, which previously dropped the key entirely.
                for i, item in enumerate(v):
                    result.update(self._flatten_dict(item, delimiter, f"{new_key}[{i}]"))
            else:
                result[new_key] = v
        return result

    def _dict_to_text(self, d: Dict) -> str:
        """Convert a dictionary to a human-readable text format.

        None values and empty lists are omitted.
        """
        lines = []
        for key, value in d.items():
            if value is None:
                continue
            if isinstance(value, list):
                if not value:
                    continue
                if all(isinstance(item, dict) for item in value):
                    # Format list of dictionaries
                    lines.append(f"{key}:")
                    for i, item in enumerate(value):
                        lines.append(f"  Item {i + 1}:")
                        for k, v in item.items():
                            lines.append(f"    {k}: {v}")
                else:
                    # Format list of primitives
                    value_str = ", ".join(str(item) for item in value)
                    lines.append(f"{key}: {value_str}")
            else:
                lines.append(f"{key}: {value}")

        return "\n".join(lines)

    def _format_field_content(self, key: str, value: Any) -> str:
        """Format a single field's content for inclusion in a chunk."""
        if isinstance(value, dict):
            if self.config.flatten_nested:
                flattened = self._flatten_dict(value, self.config.nested_delimiter, key)
                return self._dict_to_text(flattened)
            else:
                return f"{key}: {json.dumps(value, indent=2)}"
        elif isinstance(value, list):
            if all(isinstance(item, dict) for item in value):
                # Format list of dictionaries
                lines = [f"{key}:"]
                for i, item in enumerate(value):
                    lines.append(f"  Item {i + 1}:")
                    for k, v in item.items():
                        lines.append(f"    {k}: {v}")
                return "\n".join(lines)
            else:
                # Format list of primitives
                value_str = ", ".join(str(item) for item in value if item is not None)
                return f"{key}: {value_str}"
        else:
            return f"{key}: {value}"

    def _extract_fields_to_metadata(self, json_dict: Dict, metadata: Dict) -> None:
        """Extract specified fields from JSON into metadata for filtering.

        Extracted keys are prefixed with ``field_``. Mutates ``metadata``
        in place; returns nothing.
        """
        # Ensure we're working with a dictionary
        if isinstance(json_dict, str):
            try:
                json_dict = json.loads(json_dict)
            except json.JSONDecodeError:
                logger.exception(f"Error parsing JSON string: {json_dict[:100]}...")
                return

        # Always flatten the dictionary for metadata extraction
        flattened = self._flatten_dict(json_dict, self.config.nested_delimiter)

        # If extract_all_primitives is True, extract all primitive values
        if self.config.extract_all_primitives:
            for key, value in flattened.items():
                if isinstance(value, (str, int, float, bool)) and value is not None:
                    metadata[f"field_{key}"] = value
            return

        # If metadata_fields is empty and extract_all_primitives is False,
        # assume all fields should be extracted
        if not self.config.metadata_fields:
            # First try to extract top-level primitive fields
            has_primitives = False
            for key, value in json_dict.items():
                if isinstance(value, (str, int, float, bool)) and value is not None:
                    metadata[f"field_{key}"] = value
                    has_primitives = True

            # If no top-level primitives were found, extract all primitives from flattened dict
            if not has_primitives:
                for key, value in flattened.items():
                    if isinstance(value, (str, int, float, bool)) and value is not None:
                        metadata[f"field_{key}"] = value
        else:
            # Extract only the specified fields
            for field in self.config.metadata_fields:
                if field in flattened and flattened[field] is not None:
                    metadata[f"field_{field}"] = flattened[field]
                else:
                    # Try to navigate the nested structure manually
                    parts = field.split(self.config.nested_delimiter)
                    current = json_dict
                    found = True

                    for part in parts:
                        if isinstance(current, dict) and part in current:
                            current = current[part]
                        else:
                            found = False
                            break

                    if found and current is not None:
                        metadata[f"field_{field}"] = current

    def to_dataframe(self, chunks: List[ProcessedChunk]) -> pd.DataFrame:
        """Convert processed chunks to dataframe format (one row per chunk)."""
        return pd.DataFrame([chunk.model_dump() for chunk in chunks])