Coverage for mindsdb / interfaces / file / file_controller.py: 50%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import os 

3import shutil 

4from pathlib import Path 

5 

6import pandas as pd 

7 

8from mindsdb.interfaces.storage import db 

9from mindsdb.interfaces.storage.fs import FsStore 

10from mindsdb.utilities import log 

11from mindsdb.utilities.config import Config 

12from mindsdb.utilities.context import context as ctx 

13from sqlalchemy.orm.attributes import flag_modified 

14 

15from mindsdb.integrations.utilities.files.file_reader import FileReader 

16 

17 

# Module-level logger named after this module, obtained from the project's log utility.
logger = log.getLogger(__name__)

19 

20 

class FileController:
    """Manage uploaded files: their ``db.File`` records and their stored pages.

    Every file belongs to the company in ``ctx.company_id`` and lives in a
    directory named ``file_<company_id>_<file_id>`` below ``self.dir``. The
    directory holds the original file plus one ``<num>.feather`` file per
    page; page ``0`` is always the default page.
    """

    def __init__(self):
        self.config = Config()
        self.fs_store = FsStore()
        # Local base directory; each file gets its own sub-directory in it.
        self.dir = os.path.join(self.config.paths["content"], "file")

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _find_record(self, name):
        """Return the file record called *name* for the current company, or None."""
        return db.session.query(db.File).filter_by(company_id=ctx.company_id, name=name).first()

    def _require_record(self, name):
        """Return the file record called *name* or raise FileNotFoundError."""
        record = self._find_record(name)
        if record is None:
            raise FileNotFoundError(f"File '{name}' does not exists")
        return record

    def _storage_name(self, file_id):
        """Return the storage directory name used for the file with *file_id*."""
        return f"file_{ctx.company_id}_{file_id}"

    @staticmethod
    def _split_into_pages(tables):
        """Turn ``{page_name: dataframe}`` into ``(pages_files, pages_index)``.

        A single table becomes page 0 and the index stays empty. Otherwise
        page 0 is a generated dataframe listing the table names in a
        ``Tables`` column and the tables themselves are numbered from 1.
        """
        pages_files = {}
        pages_index = {}
        if len(tables) == 1:
            pages_files[0] = next(iter(tables.values()))
        else:
            # file has several pages, create a new one with info
            pages_files[0] = pd.DataFrame(tables.keys(), columns=["Tables"])
            for num, page_name in enumerate(tables.keys(), 1):
                pages_files[num] = tables[page_name]
                pages_index[page_name] = num
        return pages_files, pages_index

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def get_files_names(self, lower: bool = False):
        """Return the names of all files of the current company.

        Args:
            lower (bool): return names in lowercase if True

        Returns:
            list[str]: list of files names
        """
        query = db.session.query(db.File.name).filter_by(company_id=ctx.company_id)
        names = [record[0] for record in query]
        if lower:
            names = [name.lower() for name in names]
        return names

    def get_file_meta(self, name):
        """Return metadata of the file called *name*.

        Args:
            name (str): name of the file

        Returns:
            dict | None: ``name``, ``columns`` and ``row_count`` of the file,
                or None if no such file exists.
        """
        file_record = self._find_record(name)
        if file_record is None:
            return None
        columns = file_record.columns
        # Columns may arrive as a JSON-encoded string; decode them in that case.
        if isinstance(columns, str):
            columns = json.loads(columns)
        return {
            "name": file_record.name,
            "columns": columns,
            "row_count": file_record.row_count,
        }

    def get_files(self):
        """Get list of files of the current company.

        Returns:
            list[dict]: files metadata (``name``, ``row_count``, ``columns``)
        """
        file_records = db.session.query(db.File).filter_by(company_id=ctx.company_id).all()
        return [
            {
                "name": record.name,
                "row_count": record.row_count,
                "columns": record.columns,
            }
            for record in file_records
        ]

    def save_file(self, name, file_path, file_name=None):
        """Save the file to our store.

        The source file is moved (not copied) into the file's storage
        directory next to the feather pages extracted from it.

        Args:
            name (str): with that name file will be available in sql api
            file_path (str): path to the file
            file_name (str): file name; defaults to the last component of
                ``file_path``

        Returns:
            int: id of 'file' record in db

        Raises:
            Exception: if a file called ``name`` already exists, or if
                reading/storing the file fails (the original error is
                re-raised after cleanup).
        """
        if any(meta["name"] == name for meta in self.get_files()):
            raise Exception(f"File already exists: {name}")

        if file_name is None:
            file_name = Path(file_path).name

        file_dir = None
        try:
            pages_files, pages_index = self.get_file_pages(file_path)

            metadata = {"is_feather": True, "pages": pages_index}
            first_page = pages_files[0]
            file_record = db.File(
                name=name,
                company_id=ctx.company_id,
                source_file_path=file_name,
                file_path="",
                row_count=len(first_page),
                columns=list(first_page.columns),
                metadata_=metadata,
            )
            db.session.add(file_record)
            # Flush before commit because the record id is needed for the
            # storage directory name below.
            db.session.flush()

            store_file_path = self._storage_name(file_record.id)
            file_record.file_path = store_file_path

            file_dir = Path(self.dir).joinpath(store_file_path)
            file_dir.mkdir(parents=True, exist_ok=True)

            self.store_pages_as_feather(file_dir, pages_files)
            # store original file
            shutil.move(file_path, str(file_dir.joinpath(file_name)))

            self.fs_store.put(store_file_path, base_dir=self.dir)
            db.session.commit()

        except Exception:
            logger.exception("An error occurred while saving the file:")
            if file_dir is not None:
                # The directory may not exist (e.g. mkdir itself failed);
                # cleanup must never replace the error being reported.
                shutil.rmtree(file_dir, ignore_errors=True)
            # Drop the pending record so the session is left clean and the
            # half-saved file cannot be committed by later, unrelated work.
            db.session.rollback()
            raise

        return file_record.id

    def get_file_pages(self, source_path: str):
        """
        Reads file and extract pages from it
        Returned structures:
        - page_files: dict with content, {page_num: dataframe}
        - pages_index: dict, link between page name and num: {page_name: page_num}
        """
        file_reader = FileReader(path=source_path)
        tables = file_reader.get_contents()
        return self._split_into_pages(tables)

    def store_pages_as_feather(self, dest_dir: Path, pages_files: dict):
        """
        Stores pages in file storage dir in feather format

        :param dest_dir: directory to write the ``<num>.feather`` files into
        :param pages_files: pages content, {page_num: dataframe}
        """
        for num, df in pages_files.items():
            dest = dest_dir.joinpath(f"{num}.feather")
            df.to_feather(str(dest))

    def delete_file(self, name):
        """Delete the file called *name* from the db and from the file store.

        :param name: name of file
        :return: True once the file is deleted
        :raises FileNotFoundError: if the file does not exist
        """
        file_record = self._require_record(name)
        # Remember the id: it is needed after the record has been deleted.
        file_id = file_record.id
        db.session.delete(file_record)
        db.session.commit()
        self.fs_store.delete(self._storage_name(file_id))
        return True

    def get_file_path(self, name):
        """Return the local path of the original file stored for *name*.

        :param name: name of file
        :return: path to the original file as a string
        :raises FileNotFoundError: if the file does not exist
        """
        file_record = self._require_record(name)
        file_dir = self._storage_name(file_record.id)
        self.fs_store.get(file_dir, base_dir=self.dir)
        source_name = Path(file_record.source_file_path).name
        return str(Path(self.dir).joinpath(file_dir).joinpath(source_name))

    def get_file_data(self, name: str, page_name: str = None) -> pd.DataFrame:
        """
        Returns file content as dataframe

        Files whose metadata is not flagged ``is_feather`` are converted to
        feather pages on first access.

        :param name: name of file
        :param page_name: page name, optional
        :return: Page or file content
        :raises FileNotFoundError: if the file does not exist
        :raises KeyError: if ``page_name`` is given but not known for the file
        """
        file_record = self._require_record(name)

        file_dir = self._storage_name(file_record.id)
        self.fs_store.get(file_dir, base_dir=self.dir)
        local_dir = Path(self.dir).joinpath(file_dir)

        metadata = file_record.metadata_ or {}
        if metadata.get("is_feather") is not True:
            # migrate file
            source_path = local_dir.joinpath(Path(file_record.source_file_path).name)
            pages_files, pages_index = self.get_file_pages(str(source_path))
            self.store_pages_as_feather(source_path.parent, pages_files)

            # Push the new pages to the store before flagging the record, the
            # same way save_file and set_file_data do after writing pages.
            self.fs_store.put(file_dir, base_dir=self.dir)

            metadata["is_feather"] = True
            metadata["pages"] = pages_index

            file_record.metadata_ = metadata
            # The dict may have been changed in place, so mark it explicitly.
            flag_modified(file_record, "metadata_")
            db.session.commit()

        if page_name is None:
            num = 0
        else:
            num = metadata.get("pages", {}).get(page_name)
            if num is None:
                raise KeyError(f"Page not found: {page_name}")

        return pd.read_feather(local_dir.joinpath(f"{num}.feather"))

    def set_file_data(self, name: str, df: pd.DataFrame, page_name: str = None):
        """
        Save file content

        When ``page_name`` is missing or unknown for the file, page 0 is
        overwritten.

        :param name: name of file
        :param df: content to store
        :param page_name: name of page, optional
        :raises FileNotFoundError: if the file does not exist
        """
        file_record = self._require_record(name)

        file_dir = self._storage_name(file_record.id)
        self.fs_store.get(file_dir, base_dir=self.dir)

        num = 0
        if page_name is not None and file_record.metadata_ is not None:
            num = file_record.metadata_.get("pages", {}).get(page_name, 0)

        path = Path(self.dir).joinpath(file_dir).joinpath(f"{num}.feather")
        df.to_feather(path)
        self.fs_store.put(file_dir, base_dir=self.dir)

241 self.fs_store.put(file_dir, base_dir=self.dir)