Coverage for mindsdb / interfaces / file / file_controller.py: 50%
132 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import os
3import shutil
4from pathlib import Path
6import pandas as pd
8from mindsdb.interfaces.storage import db
9from mindsdb.interfaces.storage.fs import FsStore
10from mindsdb.utilities import log
11from mindsdb.utilities.config import Config
12from mindsdb.utilities.context import context as ctx
13from sqlalchemy.orm.attributes import flag_modified
15from mindsdb.integrations.utilities.files.file_reader import FileReader
18logger = log.getLogger(__name__)
class FileController:
    """Manage uploaded files: metadata rows in the DB plus their data pages on disk.

    Every file is described by a ``db.File`` record scoped to ``ctx.company_id``.
    Its data lives in a directory named ``file_{company_id}_{record_id}`` under
    ``self.dir``; that directory holds the original uploaded file and one
    ``{page_num}.feather`` file per page. The directory is exchanged with
    ``self.fs_store`` through its ``get``/``put``/``delete`` calls
    (presumably a sync with the configured backing storage - confirm in FsStore).
    """

    def __init__(self):
        self.config = Config()
        self.fs_store = FsStore()
        # Local root directory under which every file gets its own sub-directory.
        self.dir = os.path.join(self.config.paths["content"], "file")

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _store_dir_name(file_id):
        """Return the storage directory name for a file record id of the current company."""
        return f"file_{ctx.company_id}_{file_id}"

    @staticmethod
    def _decode_columns(columns):
        """Return the columns as stored, decoding them first if they are a JSON string."""
        if isinstance(columns, str):
            return json.loads(columns)
        return columns

    def _find_file_record(self, name):
        """Return the current company's ``db.File`` record called ``name``, or None."""
        return db.session.query(db.File).filter_by(company_id=ctx.company_id, name=name).first()

    def _require_file_record(self, name):
        """Same as ``_find_file_record`` but raise FileNotFoundError when the record is missing."""
        file_record = self._find_file_record(name)
        if file_record is None:
            raise FileNotFoundError(f"File '{name}' does not exists")
        return file_record

    @staticmethod
    def _index_pages(tables: dict):
        """Turn ``{page_name: dataframe}`` into the ``(pages_files, pages_index)`` pair.

        A single table becomes page 0 and the index stays empty. Otherwise
        page 0 is a generated dataframe listing the page names in a "Tables"
        column and the real pages are numbered from 1 in iteration order.
        """
        if len(tables) == 1:
            return {0: next(iter(tables.values()))}, {}

        # file has several pages, create a new one with info
        pages_files = {0: pd.DataFrame(list(tables), columns=["Tables"])}
        pages_index = {}
        for num, (page_name, page_df) in enumerate(tables.items(), 1):
            pages_files[num] = page_df
            pages_index[page_name] = num
        return pages_files, pages_index

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def get_files_names(self, lower: bool = False):
        """return list of files names

        Args:
            lower (bool): return names in lowercase if True

        Returns:
            list[str]: list of files names
        """
        names = [record[0] for record in db.session.query(db.File.name).filter_by(company_id=ctx.company_id)]
        if lower:
            names = [name.lower() for name in names]
        return names

    def get_file_meta(self, name):
        """Get metadata of a single file

        Args:
            name (str): name of the file

        Returns:
            dict | None: dict with "name", "columns" and "row_count" keys,
                or None if the current company has no file with that name
        """
        file_record = self._find_file_record(name)
        if file_record is None:
            return None
        return {
            "name": file_record.name,
            "columns": self._decode_columns(file_record.columns),
            "row_count": file_record.row_count,
        }

    def get_files(self):
        """Get list of files

        Returns:
            list[dict]: files metadata, one dict with "name", "row_count" and
                "columns" keys per file of the current company
        """
        file_records = db.session.query(db.File).filter_by(company_id=ctx.company_id).all()
        return [
            {
                "name": record.name,
                "row_count": record.row_count,
                # Decoded the same way as in get_file_meta so both methods
                # report columns in the same shape.
                "columns": self._decode_columns(record.columns),
            }
            for record in file_records
        ]

    def save_file(self, name, file_path, file_name=None):
        """Save the file to our store

        Args:
            name (str): with that name file will be available in sql api
            file_path (str): path to the file; the file is moved into the store
            file_name (str): file name, defaults to the last component of file_path

        Returns:
            int: id of 'file' record in db

        Raises:
            Exception: if a file with that name already exists, or if reading,
                storing or committing fails (the original error is re-raised)
        """
        # Only the names are needed for the duplicate check, not full records.
        if name in self.get_files_names():
            raise Exception(f"File already exists: {name}")

        if file_name is None:
            file_name = Path(file_path).name

        file_dir = None
        try:
            pages_files, pages_index = self.get_file_pages(file_path)

            metadata = {"is_feather": True, "pages": pages_index}
            df = pages_files[0]
            file_record = db.File(
                name=name,
                company_id=ctx.company_id,
                source_file_path=file_name,
                file_path="",
                row_count=len(df),
                columns=list(df.columns),
                metadata_=metadata,
            )
            db.session.add(file_record)
            # Flush so the record gets its id, which is part of the directory name.
            db.session.flush()

            store_file_path = self._store_dir_name(file_record.id)
            file_record.file_path = store_file_path

            file_dir = Path(self.dir).joinpath(store_file_path)
            file_dir.mkdir(parents=True, exist_ok=True)

            self.store_pages_as_feather(file_dir, pages_files)
            # store original file
            shutil.move(file_path, str(file_dir.joinpath(file_name)))

            self.fs_store.put(store_file_path, base_dir=self.dir)
            db.session.commit()

        except Exception:
            logger.exception("An error occurred while saving the file:")
            # Discard the flushed record; otherwise a later commit on this
            # session would persist a file row that has no data behind it.
            db.session.rollback()
            if file_dir is not None:
                # The directory may not exist yet (e.g. mkdir itself failed);
                # a cleanup error must not replace the original exception.
                shutil.rmtree(file_dir, ignore_errors=True)
            raise

        return file_record.id

    def get_file_pages(self, source_path: str):
        """
        Reads file and extract pages from it
        Returned structures:
         - page_files: dict with content, {page_num: dataframe}
         - pages_index: dict, link between page name and num: {page_name: page_num}

        Page 0 is the content itself when the reader returns a single table,
        otherwise it is a generated table listing the page names.
        """
        file_reader = FileReader(path=source_path)
        return self._index_pages(file_reader.get_contents())

    def store_pages_as_feather(self, dest_dir: Path, pages_files: dict):
        """
        Stores pages in file storage dir in feather format

        Each page is written to ``{page_num}.feather`` inside dest_dir.
        """
        for num, df in pages_files.items():
            dest = dest_dir.joinpath(f"{num}.feather")
            df.to_feather(str(dest))

    def delete_file(self, name):
        """Delete the file record and its directory in the file store

        Args:
            name (str): name of the file

        Returns:
            bool: always True

        Raises:
            FileNotFoundError: if the file does not exist
        """
        file_record = self._require_file_record(name)
        # Keep the id: it is needed for the directory name after the record is gone.
        file_id = file_record.id
        db.session.delete(file_record)
        db.session.commit()
        self.fs_store.delete(self._store_dir_name(file_id))
        return True

    def get_file_path(self, name):
        """Get local path of the originally uploaded file

        Args:
            name (str): name of the file

        Returns:
            str: path of the original file inside the file's directory

        Raises:
            FileNotFoundError: if the file does not exist
        """
        file_record = self._require_file_record(name)
        file_dir = self._store_dir_name(file_record.id)
        self.fs_store.get(file_dir, base_dir=self.dir)
        return str(Path(self.dir).joinpath(file_dir).joinpath(Path(file_record.source_file_path).name))

    def get_file_data(self, name: str, page_name: str = None) -> pd.DataFrame:
        """
        Returns file content as dataframe

        Records whose metadata is not flagged "is_feather" are converted on
        first access: the original file is re-read, its pages are written as
        feather files and the metadata is updated.

        Args:
            name (str): name of file
            page_name (str): page name, optional; page 0 is read when omitted

        Returns:
            pd.DataFrame: Page or file content

        Raises:
            FileNotFoundError: if the file does not exist
            KeyError: if page_name is not among the file's pages
        """
        file_record = self._require_file_record(name)

        file_dir = self._store_dir_name(file_record.id)
        self.fs_store.get(file_dir, base_dir=self.dir)
        local_dir = Path(self.dir).joinpath(file_dir)

        metadata = file_record.metadata_ or {}
        if metadata.get("is_feather") is not True:
            # migrate file
            file_path = local_dir.joinpath(Path(file_record.source_file_path).name)

            pages_files, pages_index = self.get_file_pages(str(file_path))

            self.store_pages_as_feather(file_path.parent, pages_files)
            # Push the new feather pages to the store before the record is
            # marked as migrated, as save_file and set_file_data do after
            # writing; otherwise the flag could point at pages the store lacks.
            self.fs_store.put(file_dir, base_dir=self.dir)

            metadata["is_feather"] = True
            metadata["pages"] = pages_index

            file_record.metadata_ = metadata
            # The dict may have been mutated in place, so mark the attribute
            # as changed explicitly.
            flag_modified(file_record, "metadata_")
            db.session.commit()

        if page_name is None:
            num = 0
        else:
            num = metadata.get("pages", {}).get(page_name)
            if num is None:
                raise KeyError(f"Page not found: {page_name}")

        return pd.read_feather(local_dir.joinpath(f"{num}.feather"))

    def set_file_data(self, name: str, df: pd.DataFrame, page_name: str = None):
        """
        Save file content

        Args:
            name (str): name of file
            df (pd.DataFrame): content to store
            page_name (str): name of page, optional

        Raises:
            FileNotFoundError: if the file does not exist
        """
        file_record = self._require_file_record(name)

        file_dir = self._store_dir_name(file_record.id)
        self.fs_store.get(file_dir, base_dir=self.dir)

        # Page 0 is the target when no page is given, when the record has no
        # metadata, or when page_name is not in the pages index.
        # NOTE(review): an unknown page_name silently overwrites page 0 here,
        # while get_file_data raises KeyError - confirm this is intended.
        num = 0
        if page_name is not None and file_record.metadata_ is not None:
            num = file_record.metadata_.get("pages", {}).get(page_name, 0)

        path = Path(self.dir).joinpath(file_dir).joinpath(f"{num}.feather")
        df.to_feather(path)
        # NOTE(review): row_count/columns of the record are not refreshed here.
        self.fs_store.put(file_dir, base_dir=self.dir)