Coverage for mindsdb / integrations / utilities / files / file_reader.py: 73%

242 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import csv 

2import json 

3import codecs 

4from io import BytesIO, StringIO, IOBase 

5from typing import List, Generator 

6from pathlib import Path 

7from dataclasses import dataclass, astuple 

8 

9import filetype 

10import pandas as pd 

11from charset_normalizer import from_bytes 

12 

13from mindsdb.interfaces.knowledge_base.preprocessing.text_splitter import TextSplitter 

14from mindsdb.utilities import log 

15 

16logger = log.getLogger(__name__) 

17 

18DEFAULT_CHUNK_SIZE = 500 

19DEFAULT_CHUNK_OVERLAP = 250 

20 

21 

class FileProcessingError(Exception):
    """Raised when a file cannot be decoded, identified or parsed."""

23 

24 

25@dataclass(frozen=True, slots=True) 

26class _SINGLE_PAGE_FORMAT: 

27 CSV: str = "csv" 

28 JSON: str = "json" 

29 TXT: str = "txt" 

30 PDF: str = "pdf" 

31 PARQUET: str = "parquet" 

32 

33 

34SINGLE_PAGE_FORMAT = _SINGLE_PAGE_FORMAT() 

35 

36 

37@dataclass(frozen=True, slots=True) 

38class _MULTI_PAGE_FORMAT: 

39 XLSX: str = "xlsx" 

40 

41 

42MULTI_PAGE_FORMAT = _MULTI_PAGE_FORMAT() 

43 

44 

def decode(file_obj: IOBase) -> StringIO:
    """Decode a binary file object into a text ``StringIO``.

    Strategy:
      1. Handle Microsoft's BOM "special" UTF-8 encoding directly.
      2. Otherwise, detect the encoding from the first 32 KiB via
         charset_normalizer and decode strictly.
      3. On detection failure or a strict-decode error, fall back to
         utf-8 with replacement characters.

    :param file_obj: binary file-like object (will be read from position 0)
    :raises FileProcessingError: if decoding fails entirely
    """
    file_obj.seek(0)
    byte_str = file_obj.read()
    try:
        # Handle Microsoft's BOM "special" UTF-8 encoding
        if byte_str.startswith(codecs.BOM_UTF8):
            return StringIO(byte_str.decode("utf-8-sig"))

        file_encoding_meta = from_bytes(
            byte_str[: 32 * 1024],
            steps=32,  # Number of steps/block to extract from my_byte_str
            chunk_size=1024,  # Set block size of each extraction)
            explain=False,
        )
        best_meta = file_encoding_meta.best()
        if best_meta is not None:
            # Fix: reuse the already-computed best() result instead of calling it again.
            try:
                return StringIO(byte_str.decode(best_meta.encoding, "strict"))
            except UnicodeDecodeError:
                pass  # detected encoding was wrong; fall through to the lenient path

        # Single fallback path (was previously duplicated three times):
        # best-effort utf-8 with replacement characters.
        return StringIO(byte_str.decode("utf-8", "replace"))
    except Exception as e:
        logger.exception("Error during file decode:")
        raise FileProcessingError("Could not load into string") from e

82 

83 

class FormatDetector:
    """Detect the format of a file, first by its name extension, then by its content.

    Recognised formats are listed in ``supported_formats``; formats that can
    contain several tables (pages) are listed in ``multipage_formats``.
    """

    supported_formats = astuple(SINGLE_PAGE_FORMAT) + astuple(MULTI_PAGE_FORMAT)
    multipage_formats = astuple(MULTI_PAGE_FORMAT)

    def __init__(
        self,
        path: str | None = None,
        name: str | None = None,
        file: IOBase | None = None,
    ):
        """
        File format detector
        One of these arguments has to be passed: `path` or `file`

        :param path: path to the file
        :param name: name of the file
        :param file: file descriptor (via open(...), of BytesIO(...))
        :raises FileProcessingError: if neither `path` nor `file` is passed
        """
        if path is not None:
            # NOTE(review): the descriptor stays open for the object's lifetime;
            # readers seek/re-read it later, so it is not closed here.
            file = open(path, "rb")

        elif file is not None:
            if name is None:
                # derive a display name from the descriptor when possible
                path = file.name if hasattr(file, "name") else "file"
        else:
            raise FileProcessingError("Wrong arguments: path or file is required")

        if name is None:
            name = Path(path).name

        self.name = name
        self.file_obj = file
        self.format = None  # cached result of get_format()

        # Reader hints discovered during detection (e.g. csv delimiter for .tsv)
        self.parameters = {}

    def get_format(self) -> str:
        """Return the detected format string, caching the result.

        :raises FileProcessingError: if the format is unsupported or undetectable
        """
        if self.format is not None:
            return self.format

        format = self.get_format_by_name()
        if format is not None:
            if format not in self.supported_formats:
                raise FileProcessingError(f"Not supported format: {format}")

        if format is None and self.file_obj is not None:
            format = self.get_format_by_content()
            self.file_obj.seek(0)

        if format is None:
            raise FileProcessingError(f"Unable to detect format: {self.name}")

        self.format = format
        return format

    def get_format_by_name(self):
        """Guess the format from the file-name extension; `.tsv` maps to csv with a tab delimiter."""
        extension = Path(self.name).suffix.strip(".").lower()
        if extension == "tsv":
            extension = "csv"
            self.parameters["delimiter"] = "\t"

        return extension or None

    def get_format_by_content(self):
        """Guess the format by inspecting the content; returns None if nothing matches."""
        if self.is_parquet(self.file_obj):
            return SINGLE_PAGE_FORMAT.PARQUET

        file_type = filetype.guess(self.file_obj)
        if file_type is not None:
            if file_type.mime in {
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                "application/vnd.ms-excel",
            }:
                return MULTI_PAGE_FORMAT.XLSX

            if file_type.mime == "application/pdf":
                return SINGLE_PAGE_FORMAT.PDF

        file_obj = decode(self.file_obj)

        if self.is_json(file_obj):
            return SINGLE_PAGE_FORMAT.JSON

        if self.is_csv(file_obj):
            return SINGLE_PAGE_FORMAT.CSV

    @staticmethod
    def is_json(data_obj: StringIO) -> bool:
        """Return True if the stream contains a parseable JSON document."""
        # see if its JSON
        data_obj.seek(0)
        text = data_obj.read(100).strip()
        data_obj.seek(0)
        if len(text) > 0:
            # it looks like a json, then try to parse it
            if text.startswith("{") or text.startswith("["):
                try:
                    json.loads(data_obj.read())
                    return True
                except Exception:
                    return False
        return False

    @classmethod
    def is_csv(cls, data_obj: StringIO) -> bool:
        """Return True if the first line sniffs as CSV and the stream is not JSON."""
        data_obj.seek(0)
        sample = data_obj.readline()  # trying to get dialect from header
        try:
            data_obj.seek(0)
            csv.Sniffer().sniff(sample)

            # Avoid a false-positive for json files
            if cls.is_json(data_obj):
                return False
            return True
        except Exception:
            return False

    @staticmethod
    def is_parquet(data: IOBase) -> bool:
        """Return True if the stream starts and ends with the parquet magic bytes.

        Check first and last 4 bytes equal to PAR1.
        Refer: https://parquet.apache.org/docs/file-format/
        """
        parquet_sig = b"PAR1"
        # Fix: a stream too small to hold both markers made seek(-4, 2) raise
        # (ValueError on BytesIO) instead of the method returning False.
        data.seek(0, 2)
        if data.tell() < 8:
            data.seek(0)
            return False
        data.seek(0, 0)
        start_meta = data.read(4)
        data.seek(-4, 2)
        end_meta = data.read()
        data.seek(0)
        return start_meta == parquet_sig and end_meta == parquet_sig

217 

218 

def format_column_names(df: pd.DataFrame):
    """Normalise DataFrame column names in place.

    Leading/trailing spaces and tabs are stripped from every column name;
    the result must leave each column with a unique, non-empty name.

    :raises FileProcessingError: on duplicate or empty column names
    """
    cleaned = [name.strip(" \t") for name in df.columns]
    df.columns = cleaned
    has_empty = any(not name for name in cleaned)
    if has_empty or len(set(cleaned)) != len(cleaned):
        raise FileProcessingError("Each column should have a unique and non-empty name.")

223 

224 

class FileReader(FormatDetector):
    """Read a file into pandas DataFrame(s) using the format detected by FormatDetector.

    Single-page formats (csv, json, txt, pdf, parquet) produce one DataFrame;
    multi-page formats (xlsx) produce one DataFrame per sheet ("page").
    """

    def _get_fnc(self):
        """Return the matching `read_<format>` method wrapped with column-name validation.

        :raises FileProcessingError: if there is no reader for the detected format
        """
        format = self.get_format()
        func = getattr(self, f"read_{format}", None)
        if func is None:
            raise FileProcessingError(f"Unsupported format: {format}")

        if format in astuple(MULTI_PAGE_FORMAT):

            def format_multipage(*args, **kwargs):
                for page_number, df in func(*args, **kwargs):
                    # Fix: df is None when the reader runs with only_names=True
                    # (get_pages); validating None raised AttributeError.
                    if df is not None:
                        format_column_names(df)
                    yield page_number, df

            return format_multipage

        def format_singlepage(*args, **kwargs) -> pd.DataFrame:
            """Check that the columns have unique not-empty names"""
            df = func(*args, **kwargs)
            format_column_names(df)
            return df

        return format_singlepage

    def get_pages(self, **kwargs) -> List[str]:
        """
        Get list of tables in file
        """
        format = self.get_format()
        if format not in self.multipage_formats:
            # only one table
            return ["main"]

        func = self._get_fnc()
        self.file_obj.seek(0)

        return [name for name, _ in func(self.file_obj, only_names=True, **kwargs)]

    def get_contents(self, **kwargs) -> dict[str, pd.DataFrame]:
        """
        Get all info(pages with content) from file as dict: {tablename, content}
        """
        func = self._get_fnc()
        self.file_obj.seek(0)

        format = self.get_format()
        if format not in self.multipage_formats:
            # only one table
            return {"main": func(self.file_obj, name=self.name, **kwargs)}

        return {name: df for name, df in func(self.file_obj, **kwargs)}

    def get_page_content(self, page_name: str | None = None, **kwargs) -> pd.DataFrame:
        """
        Get content of a single table
        """
        func = self._get_fnc()
        self.file_obj.seek(0)

        format = self.get_format()
        if format not in self.multipage_formats:
            # only one table
            return func(self.file_obj, name=self.name, **kwargs)

        # multipage reader yields only the requested page; return its DataFrame
        for _, df in func(self.file_obj, name=self.name, page_name=page_name, **kwargs):
            return df

    @staticmethod
    def _get_csv_dialect(buffer, delimiter: str | None = None) -> csv.Dialect | None:
        """Sniff the csv dialect from the first line; returns None when sniffing fails.

        :param buffer: text (or bytes) stream positioned anywhere; reset to 0 after the probe
        :param delimiter: if given, the only delimiter accepted
        :raises FileProcessingError: if the fallback-detected delimiter is not accepted
        """
        sample = buffer.readline()  # trying to get dialect from header
        buffer.seek(0)
        try:
            if isinstance(sample, bytes):
                sample = sample.decode()

            if delimiter is not None:
                accepted_csv_delimiters = [delimiter]
            else:
                accepted_csv_delimiters = [",", "\t", ";"]
            try:
                dialect = csv.Sniffer().sniff(sample, delimiters=accepted_csv_delimiters)
                dialect.doublequote = True  # assume that all csvs have " as string escape
            except Exception:
                dialect = csv.reader(sample).dialect
                if dialect.delimiter not in accepted_csv_delimiters:
                    raise FileProcessingError(f"CSV delimeter '{dialect.delimiter}' is not supported")

        except csv.Error:
            dialect = None
        return dialect

    @classmethod
    def read_csv(cls, file_obj: BytesIO, delimiter: str | None = None, **kwargs) -> pd.DataFrame:
        """Read a csv file into a DataFrame."""
        file_obj = decode(file_obj)
        dialect = cls._get_csv_dialect(file_obj, delimiter=delimiter)
        # Fix: _get_csv_dialect returns None on csv.Error; dereferencing
        # dialect.delimiter then raised AttributeError. Fall back to a comma.
        sep = dialect.delimiter if dialect is not None else ","
        return pd.read_csv(file_obj, sep=sep, index_col=False)

    @staticmethod
    def read_txt(file_obj: BytesIO, name: str | None = None, **kwargs) -> pd.DataFrame:
        """Split a plain-text file into overlapping chunks, one DataFrame row per chunk."""
        file_obj = decode(file_obj)

        text = file_obj.read()

        text_splitter = TextSplitter(chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP)

        docs = text_splitter.split_text(text)
        return pd.DataFrame([{"content": doc, "metadata": {"source_file": name, "file_format": "txt"}} for doc in docs])

    @staticmethod
    def read_pdf(file_obj: BytesIO, name: str | None = None, **kwargs) -> pd.DataFrame:
        """Extract all text from a pdf and split it into chunks, one DataFrame row per chunk."""
        # the libs are heavy, so import it only when needed
        import fitz  # pymupdf

        with fitz.open(stream=file_obj.read()) as pdf:  # open pdf
            # pages joined by form-feed (chr(12)) so page boundaries stay visible
            text = chr(12).join([page.get_text() for page in pdf])

        text_splitter = TextSplitter(chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP)

        split_text = text_splitter.split_text(text)

        return pd.DataFrame(
            {
                "content": split_text,
                "metadata": [{"file_format": "pdf", "source_file": name}] * len(split_text),
            }
        )

    @staticmethod
    def read_json(file_obj: BytesIO, **kwargs) -> pd.DataFrame:
        """Read a json document into a DataFrame (top level only, nested keys not flattened)."""
        file_obj = decode(file_obj)
        file_obj.seek(0)
        json_doc = json.loads(file_obj.read())
        return pd.json_normalize(json_doc, max_level=0)

    @staticmethod
    def read_parquet(file_obj: BytesIO, **kwargs) -> pd.DataFrame:
        """Read a parquet file into a DataFrame."""
        return pd.read_parquet(file_obj)

    @staticmethod
    def read_xlsx(
        file_obj: BytesIO,
        page_name: str | None = None,
        only_names: bool = False,
        **kwargs,
    ) -> Generator[tuple[str, pd.DataFrame | None], None, None]:
        """Yield (sheet_name, DataFrame) pairs from an xlsx file.

        :param page_name: if set, yield only that sheet and stop
        :param only_names: if True, yield (sheet_name, None) without reading content
        """
        with pd.ExcelFile(file_obj) as xls:
            if page_name is not None:
                # return specific page
                yield page_name, pd.read_excel(xls, sheet_name=page_name)
                # Fix: without this return the generator fell through and
                # yielded every remaining sheet as well.
                return

            for page_name in xls.sheet_names:
                if only_names:
                    # extract only pages names
                    df = None
                else:
                    df = pd.read_excel(xls, sheet_name=page_name)
                yield page_name, df