Coverage for mindsdb / integrations / utilities / files / file_reader.py: 73%
242 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import csv
2import json
3import codecs
4from io import BytesIO, StringIO, IOBase
5from typing import List, Generator
6from pathlib import Path
7from dataclasses import dataclass, astuple
9import filetype
10import pandas as pd
11from charset_normalizer import from_bytes
13from mindsdb.interfaces.knowledge_base.preprocessing.text_splitter import TextSplitter
14from mindsdb.utilities import log
logger = log.getLogger(__name__)

# Defaults passed to TextSplitter when chunking txt/pdf content into DataFrame rows.
DEFAULT_CHUNK_SIZE = 500
DEFAULT_CHUNK_OVERLAP = 250
class FileProcessingError(Exception):
    """Raised when a file cannot be decoded, its format detected, or its content parsed."""
# Namespaced string constants for formats that contain a single table ("page").
# A frozen+slots dataclass is used (rather than an Enum) so astuple() can
# enumerate the values, as done in FormatDetector.supported_formats.
@dataclass(frozen=True, slots=True)
class _SINGLE_PAGE_FORMAT:
    CSV: str = "csv"
    JSON: str = "json"
    TXT: str = "txt"
    PDF: str = "pdf"
    PARQUET: str = "parquet"


# Singleton instance used for attribute access and astuple() enumeration.
SINGLE_PAGE_FORMAT = _SINGLE_PAGE_FORMAT()
# Namespaced string constants for formats that contain multiple tables ("pages").
@dataclass(frozen=True, slots=True)
class _MULTI_PAGE_FORMAT:
    XLSX: str = "xlsx"


# Singleton instance; astuple() of it enumerates the multi-page formats.
MULTI_PAGE_FORMAT = _MULTI_PAGE_FORMAT()
def decode(file_obj: IOBase) -> StringIO:
    """Decode a binary file object into text, guessing the encoding.

    :param file_obj: binary file-like object; it is rewound before reading
    :return: StringIO with the decoded text
    :raises FileProcessingError: if the bytes cannot be loaded into a string
    """
    file_obj.seek(0)
    byte_str = file_obj.read()
    try:
        # Handle Microsoft's BOM "special" UTF-8 encoding
        if byte_str.startswith(codecs.BOM_UTF8):
            return StringIO(byte_str.decode("utf-8-sig"))

        # Detect the encoding from a 32 KB sample of the file.
        file_encoding_meta = from_bytes(
            byte_str[: 32 * 1024],
            steps=32,  # Number of steps/block to extract from my_byte_str
            chunk_size=1024,  # Set block size of each extraction)
            explain=False,
        )
        best_meta = file_encoding_meta.best()
        if best_meta is not None:
            # Fix: reuse the already-computed best match instead of calling
            # file_encoding_meta.best() a second time.
            try:
                return StringIO(byte_str.decode(best_meta.encoding))
            except UnicodeDecodeError:
                # Detected encoding was wrong: fall through to lenient utf-8.
                pass
        # No confident detection (or it failed): decode as utf-8, replacing
        # undecodable bytes rather than raising.
        return StringIO(byte_str.decode("utf-8", errors="replace"))
    except Exception as e:
        logger.exception("Error during file decode:")
        raise FileProcessingError("Could not load into string") from e
class FormatDetector:
    """Detect a file's format from its name (extension) or, failing that, its content."""

    supported_formats = astuple(SINGLE_PAGE_FORMAT) + astuple(MULTI_PAGE_FORMAT)
    multipage_formats = astuple(MULTI_PAGE_FORMAT)

    def __init__(
        self,
        path: str | None = None,
        name: str | None = None,
        file: IOBase | None = None,
    ):
        """
        File format detector
        One of these arguments has to be passed: `path` or `file`

        :param path: path to the file
        :param name: name of the file
        :param file: file descriptor (via open(...), of BytesIO(...))
        :raises FileProcessingError: if neither `path` nor `file` is given
        """
        if path is not None:
            file = open(path, "rb")
        elif file is not None:
            if name is None:
                # open() descriptors carry a .name; raw BytesIO objects do not,
                # so fall back to a placeholder.
                path = file.name if hasattr(file, "name") else "file"
        else:
            raise FileProcessingError("Wrong arguments: path or file is required")

        if name is None:
            name = Path(path).name

        self.name = name
        self.file_obj = file
        self.format = None  # cached result of get_format()
        self.parameters = {}  # reader hints found during detection (e.g. delimiter)

    def get_format(self) -> str:
        """Return the detected format string, caching the result.

        :raises FileProcessingError: if the format is unsupported or undetectable
        """
        if self.format is not None:
            return self.format

        fmt = self.get_format_by_name()
        if fmt is not None and fmt not in self.supported_formats:
            raise FileProcessingError(f"Not supported format: {fmt}")

        if fmt is None and self.file_obj is not None:
            fmt = self.get_format_by_content()
            self.file_obj.seek(0)

        if fmt is None:
            raise FileProcessingError(f"Unable to detect format: {self.name}")

        self.format = fmt
        return fmt

    def get_format_by_name(self):
        """Guess the format from the file extension; returns None if there is none."""
        extension = Path(self.name).suffix.strip(".").lower()
        if extension == "tsv":
            # TSV is handled by the CSV reader with a tab delimiter.
            extension = "csv"
            self.parameters["delimiter"] = "\t"
        return extension or None

    def get_format_by_content(self):
        """Guess the format by inspecting the file bytes; returns None if unknown."""
        if self.is_parquet(self.file_obj):
            return SINGLE_PAGE_FORMAT.PARQUET

        file_type = filetype.guess(self.file_obj)
        if file_type is not None:
            if file_type.mime in {
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                "application/vnd.ms-excel",
            }:
                return MULTI_PAGE_FORMAT.XLSX
            if file_type.mime == "application/pdf":
                return SINGLE_PAGE_FORMAT.PDF

        # Text-based formats need the bytes decoded first.
        file_obj = decode(self.file_obj)
        if self.is_json(file_obj):
            return SINGLE_PAGE_FORMAT.JSON
        if self.is_csv(file_obj):
            return SINGLE_PAGE_FORMAT.CSV

    @staticmethod
    def is_json(data_obj: StringIO) -> bool:
        """Cheap JSON sniff: if the text starts like JSON, try to parse all of it."""
        data_obj.seek(0)
        text = data_obj.read(100).strip()
        data_obj.seek(0)
        if len(text) > 0:
            # it looks like a json, then try to parse it
            if text.startswith("{") or text.startswith("["):
                try:
                    json.loads(data_obj.read())
                    return True
                except Exception:
                    return False
        return False

    @classmethod
    def is_csv(cls, data_obj: StringIO) -> bool:
        """CSV sniff on the first line; JSON is excluded to avoid false positives."""
        data_obj.seek(0)
        sample = data_obj.readline()  # trying to get dialect from header
        try:
            data_obj.seek(0)
            csv.Sniffer().sniff(sample)
            # Avoid a false-positive for json files
            if cls.is_json(data_obj):
                return False
            return True
        except Exception:
            return False

    @staticmethod
    def is_parquet(data: IOBase) -> bool:
        # Check first and last 4 bytes equal to PAR1.
        # Refer: https://parquet.apache.org/docs/file-format/
        parquet_sig = b"PAR1"
        data.seek(0, 0)
        start_meta = data.read(4)
        if start_meta != parquet_sig:
            # Fix: returning early here also protects the seek-from-end below,
            # which raised ValueError/OSError on files shorter than 4 bytes.
            data.seek(0)
            return False
        data.seek(-4, 2)
        end_meta = data.read()
        data.seek(0)
        return end_meta == parquet_sig
def format_column_names(df: pd.DataFrame):
    """Normalize a DataFrame's column names in place and validate them.

    Surrounding spaces and tabs are stripped from every column name.

    :raises FileProcessingError: if any name is empty or duplicated after stripping
    """
    cleaned = [name.strip(" \t") for name in df.columns]
    df.columns = cleaned
    has_empty = any(not name for name in cleaned)
    has_duplicates = len(set(cleaned)) != len(cleaned)
    if has_empty or has_duplicates:
        raise FileProcessingError("Each column should have a unique and non-empty name.")
class FileReader(FormatDetector):
    """Read files into pandas DataFrames based on the detected format.

    Single-page formats (csv, json, txt, pdf, parquet) expose a single table
    named "main"; multi-page formats (xlsx) expose one table per sheet.
    """

    def _get_fnc(self):
        """Return the matching `read_<format>` method wrapped with column-name validation.

        :raises FileProcessingError: if no reader exists for the detected format
        """
        fmt = self.get_format()
        func = getattr(self, f"read_{fmt}", None)
        if func is None:
            raise FileProcessingError(f"Unsupported format: {fmt}")

        if fmt in astuple(MULTI_PAGE_FORMAT):

            def format_multipage(*args, **kwargs):
                for page_number, df in func(*args, **kwargs):
                    # Fix: df is None when only page names were requested
                    # (only_names=True, as in get_pages); validating None
                    # crashed with AttributeError.
                    if df is not None:
                        format_column_names(df)
                    yield page_number, df

            return format_multipage

        def format_singlepage(*args, **kwargs) -> pd.DataFrame:
            """Check that the columns have unique not-empty names"""
            df = func(*args, **kwargs)
            format_column_names(df)
            return df

        return format_singlepage

    def get_pages(self, **kwargs) -> List[str]:
        """
        Get list of tables in file
        """
        fmt = self.get_format()
        if fmt not in self.multipage_formats:
            # only one table
            return ["main"]

        func = self._get_fnc()
        self.file_obj.seek(0)
        return [name for name, _ in func(self.file_obj, only_names=True, **kwargs)]

    def get_contents(self, **kwargs) -> dict[str, pd.DataFrame]:
        """
        Get all info(pages with content) from file as dict: {tablename, content}
        """
        func = self._get_fnc()
        self.file_obj.seek(0)

        fmt = self.get_format()
        if fmt not in self.multipage_formats:
            # only one table
            return {"main": func(self.file_obj, name=self.name, **kwargs)}

        return {name: df for name, df in func(self.file_obj, **kwargs)}

    def get_page_content(self, page_name: str | None = None, **kwargs) -> pd.DataFrame:
        """
        Get content of a single table

        :param page_name: sheet name for multi-page formats; ignored otherwise
        """
        func = self._get_fnc()
        self.file_obj.seek(0)

        fmt = self.get_format()
        if fmt not in self.multipage_formats:
            # only one table
            return func(self.file_obj, name=self.name, **kwargs)

        # The multi-page reader yields the requested page first.
        for _, df in func(self.file_obj, name=self.name, page_name=page_name, **kwargs):
            return df

    @staticmethod
    def _get_csv_dialect(buffer, delimiter: str | None = None) -> csv.Dialect | None:
        """Sniff the CSV dialect from the first line; returns None when sniffing fails.

        :param delimiter: if given, it is the only delimiter considered
        :raises FileProcessingError: if a detected delimiter is unsupported
        """
        sample = buffer.readline()  # trying to get dialect from header
        buffer.seek(0)
        try:
            if isinstance(sample, bytes):
                sample = sample.decode()

            if delimiter is not None:
                accepted_csv_delimiters = [delimiter]
            else:
                accepted_csv_delimiters = [",", "\t", ";"]
            try:
                dialect = csv.Sniffer().sniff(sample, delimiters=accepted_csv_delimiters)
                dialect.doublequote = True  # assume that all csvs have " as string escape
            except Exception:
                dialect = csv.reader(sample).dialect
                if dialect.delimiter not in accepted_csv_delimiters:
                    # Fix: corrected "delimeter" typo in the error message.
                    raise FileProcessingError(f"CSV delimiter '{dialect.delimiter}' is not supported")
        except csv.Error:
            dialect = None
        return dialect

    @classmethod
    def read_csv(cls, file_obj: BytesIO, delimiter: str | None = None, **kwargs) -> pd.DataFrame:
        """Read CSV bytes into a DataFrame, sniffing the dialect first."""
        file_obj = decode(file_obj)
        dialect = cls._get_csv_dialect(file_obj, delimiter=delimiter)
        # Fix: dialect may be None when sniffing fails; fall back to a comma
        # separator instead of raising AttributeError on dialect.delimiter.
        sep = dialect.delimiter if dialect is not None else ","
        return pd.read_csv(file_obj, sep=sep, index_col=False)

    @staticmethod
    def read_txt(file_obj: BytesIO, name: str | None = None, **kwargs) -> pd.DataFrame:
        """Split plain text into overlapping chunks; one DataFrame row per chunk."""
        file_obj = decode(file_obj)
        text = file_obj.read()

        text_splitter = TextSplitter(chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP)
        docs = text_splitter.split_text(text)
        return pd.DataFrame([{"content": doc, "metadata": {"source_file": name, "file_format": "txt"}} for doc in docs])

    @staticmethod
    def read_pdf(file_obj: BytesIO, name: str | None = None, **kwargs) -> pd.DataFrame:
        """Extract text from every PDF page and split it into chunk rows."""
        # the libs are heavy, so import it only when needed
        import fitz  # pymupdf

        with fitz.open(stream=file_obj.read()) as pdf:  # open pdf
            # Pages are joined with a form-feed character (chr(12)).
            text = chr(12).join([page.get_text() for page in pdf])

        text_splitter = TextSplitter(chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP)
        split_text = text_splitter.split_text(text)
        return pd.DataFrame(
            {
                "content": split_text,
                "metadata": [{"file_format": "pdf", "source_file": name}] * len(split_text),
            }
        )

    @staticmethod
    def read_json(file_obj: BytesIO, **kwargs) -> pd.DataFrame:
        """Load a JSON document and flatten only its top level into columns."""
        file_obj = decode(file_obj)
        file_obj.seek(0)
        json_doc = json.loads(file_obj.read())
        return pd.json_normalize(json_doc, max_level=0)

    @staticmethod
    def read_parquet(file_obj: BytesIO, **kwargs) -> pd.DataFrame:
        """Read a parquet file into a DataFrame."""
        return pd.read_parquet(file_obj)

    @staticmethod
    def read_xlsx(
        file_obj: BytesIO,
        page_name: str | None = None,
        only_names: bool = False,
        **kwargs,
    ) -> Generator[tuple[str, pd.DataFrame | None], None, None]:
        """Yield (sheet_name, DataFrame) pairs from an Excel workbook.

        :param page_name: if given, yield only that sheet
        :param only_names: if True, yield (sheet_name, None) for every sheet
        """
        with pd.ExcelFile(file_obj) as xls:
            if page_name is not None:
                # return specific page
                yield page_name, pd.read_excel(xls, sheet_name=page_name)
                # Fix: previously missing, so the requested sheet was yielded
                # and then every other sheet was yielded as well.
                return

            for page_name in xls.sheet_names:
                if only_names:
                    # extract only pages names
                    df = None
                else:
                    df = pd.read_excel(xls, sheet_name=page_name)
                yield page_name, df