Coverage for mindsdb / integrations / handlers / box_handler / box_handler.py: 0%

191 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import io 

2import pandas as pd 

3from box_sdk_gen import BoxClient, BoxDeveloperTokenAuth, CreateFolderParent, UploadFileAttributes, UploadFileAttributesParentField 

4from box_sdk_gen.internal import utils 

5from typing import Dict, Optional, Text 

6 

7from mindsdb_sql_parser.ast.base import ASTNode 

8from mindsdb_sql_parser.ast import Select, Identifier, Insert 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE, 

15) 

16from mindsdb.integrations.libs.api_handler import APIHandler, APIResource 

17 

18 

class ListFilesTable(APIResource):
    """Virtual `files` table listing the supported files stored in Box."""

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return a DataFrame with one row per supported file in Box.

        Returns:
            pd.DataFrame: Columns `path`, `name`, `extension`.
        """
        files = self.handler._list_files()
        # Comprehension instead of the manual append loop; keep only the
        # documented columns even if the handler adds more keys later.
        data = [
            {
                "path": file["path"],
                "name": file["name"],
                "extension": file["extension"],
            }
            for file in files
        ]
        # Pass explicit columns so an empty listing still yields the expected
        # schema (the original returned a column-less DataFrame, which broke
        # column projection downstream).
        return pd.DataFrame(data, columns=self.get_columns())

    def get_columns(self):
        """Column names exposed by the virtual `files` table."""
        return ["path", "name", "extension"]

36 

37 

class FileTable(APIResource):
    """Virtual table backed by a single file stored in Box."""

    def _get_file_df(self):
        """Read the backing file into a DataFrame.

        Returns:
            pd.DataFrame | None: The parsed file, or None (after logging)
            when the file cannot be read.
        """
        try:
            df = self.handler._read_file(self.table_name)
            if df is None:
                # Bug fix: the original formatted `self.box_path`, an
                # attribute that does not exist, so an AttributeError was
                # raised instead of the intended error message.
                raise Exception(f"No such file found for the path: {self.table_name}")
            return df
        except Exception as e:
            self.handler.logger.error(e)

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return the file contents as a DataFrame (None on read failure)."""
        return self._get_file_df()

    def get_columns(self):
        """Column names of the backing file ([] when it cannot be read)."""
        # Route through _get_file_df so read failures are logged consistently
        # and a failed read no longer raises AttributeError on `df.columns`.
        df = self._get_file_df()
        return [] if df is None else df.columns.tolist()

    def insert(self, query: Insert) -> None:
        """Append the INSERT rows to the backing file and re-upload it.

        Args:
            query: Parsed INSERT statement; column names come from
                `query.columns`, rows from `query.values`.
        """
        columns = [col.name for col in query.columns]
        data = [dict(zip(columns, row)) for row in query.values]
        df_new = pd.DataFrame(data)
        df_existing = self._get_file_df()
        # Robustness: when the existing file cannot be read (e.g. it does not
        # exist yet) write only the new rows instead of crashing in pd.concat
        # with a None operand.
        if df_existing is None:
            df_combined = df_new
        else:
            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        self.handler._write_file(self.table_name, df_combined)

64 

65 

class BoxHandler(APIHandler):
    """MindsDB handler that exposes files stored in Box as queryable tables.

    SELECT reads a csv/tsv/json/parquet file into a DataFrame; INSERT appends
    rows to a file and re-uploads it. The virtual `files` table lists every
    supported file in the account.
    """

    name = "box"
    supported_file_formats = ["csv", "tsv", "json", "parquet"]

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """
        Args:
            name: Handler instance name.
            connection_data: Connection parameters; must include `token`
                (a Box developer token).
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs
        self.logger = log.getLogger(__name__)
        self.client = None
        self.is_connected = False
        self._files_table = ListFilesTable(self)
        self._register_table("files", self._files_table)

    def connect(self):
        """Authenticate against Box with the configured developer token.

        Raises:
            Exception: Propagated after logging so `check_connection` can
                report the failure. (The original swallowed every error,
                which made `check_connection` return success even when the
                token was missing or invalid.)
        """
        if self.is_connected:
            return
        try:
            if "token" not in self.connection_data:
                raise ValueError("Developer token must be provided.")
            auth = BoxDeveloperTokenAuth(token=self.connection_data["token"])
            self.client = BoxClient(auth=auth)
            # Cheap request against the root folder to validate credentials.
            self.client.folders.get_folder_items("0", limit=1)
            self.is_connected = True
            self.logger.info("Connected to Box")
        except Exception as e:
            self.logger.error(f"Error connecting to Box: {e}")
            raise

    def check_connection(self) -> StatusResponse:
        """Try to connect and report the outcome as a StatusResponse."""
        response = StatusResponse(False)
        try:
            self.connect()
            response.success = True
        except Exception as e:
            self.logger.error(f"Error with Box Handler while establish connection: {e}")
            response.error_message = str(e)
        return response

    def disconnect(self):
        """Drop the Box client reference; no remote call is required."""
        if not self.is_connected:
            return
        self.client = None
        self.is_connected = False
        self.logger.info("Disconnected from Box")

    def _read_as_content(self, file_path) -> utils.Buffer:
        """Download `file_path` (slash-separated, rooted at folder "0") as bytes.

        Walks the path one component at a time because the Box API addresses
        items by id, not by path.

        Returns:
            utils.Buffer | None: Raw file bytes, or None (after logging) on
            failure.
        """
        try:
            id = "0"  # Root folder id. The value is always "0".

            items = file_path.strip('/').split('/')

            for item in items:
                item_id = None
                for entry in self.client.folders.get_folder_items(id).entries:
                    if entry.name == item:
                        item_id = entry.id
                        break
                if item_id:
                    id = item_id
                else:
                    raise ValueError(f"{item} not found. Specify the correct path.")

            downloaded_file_content = self.client.downloads.download_file(id)
            buffer = utils.read_byte_stream(downloaded_file_content)
            return buffer
        except Exception as e:
            self.logger.error(f"Error when downloading a file from Box: {e}")

    def query(self, query: ASTNode) -> Response:
        """Execute a SELECT or INSERT AST against Box-backed tables.

        Raises:
            NotImplementedError: For any statement other than SELECT/INSERT.
        """
        if isinstance(query, Select):
            table_name = query.from_table.parts[-1]
            if table_name == "files":
                table = self._files_table
                df = table.select(query)
                # Only download file bodies when the query projects `content`.
                has_content = any(
                    isinstance(target, Identifier)
                    and target.parts[-1].lower() == "content"
                    for target in query.targets
                )
                if has_content:
                    df["content"] = df["path"].apply(self._read_as_content)
            else:
                table = FileTable(self, table_name=table_name)
                df = table.select(query)
            return Response(RESPONSE_TYPE.TABLE, data_frame=df)
        elif isinstance(query, Insert):
            table_name = query.table.parts[-1]
            table = FileTable(self, table_name=table_name)
            table.insert(query)
            return Response(RESPONSE_TYPE.OK)
        else:
            raise NotImplementedError(
                "Only SELECT and INSERT operations are supported."
            )

    def get_tables(self) -> Response:
        """List the registered virtual tables."""
        table_names = list(self._tables.keys())
        df = pd.DataFrame(table_names, columns=["table_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def get_columns(self, table_name: str) -> Response:
        """List the column names of `table_name`."""
        table = self._get_table(Identifier(table_name))
        columns = table.get_columns()
        df = pd.DataFrame(columns, columns=["column_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def list_files_in_folder(self, folder_id, files=None):
        """Recursively collect supported files under `folder_id`.

        Args:
            folder_id: Box folder id to scan ("0" is the root).
            files: Accumulator list used by the recursion. Bug fix: the
                original declared a mutable default (`files=[]`), so results
                accumulated across separate calls; `None` is now the sentinel
                for a fresh list.

        Returns:
            list[dict]: Entries with `path`, `name` and `extension` keys.
        """
        if files is None:
            files = []
        items = self.client.folders.get_folder_items(folder_id).entries
        for item in items:
            if item.type == 'folder':
                self.list_files_in_folder(item.id, files)
            elif item.type == 'file':
                extension = item.name.split(".")[-1].lower()
                if extension in self.supported_file_formats:
                    file = self.client.files.get_file_by_id(item.id)
                    # Drop the first path_collection entry (the account root)
                    # so the stored path is relative to folder "0".
                    full_path = [path.name for path in file.path_collection.entries][1:]
                    full_path.append(item.name)
                    files.append(
                        {
                            "path": "/".join(full_path),
                            "name": item.name,
                            "extension": extension,
                        }
                    )

        return files

    def _list_files(self, path=""):
        """List every supported file in the account (`path` is ignored)."""
        return self.list_files_in_folder("0")

    def _read_file(self, path) -> pd.DataFrame:
        """Download `path` and parse it by extension into a DataFrame.

        Returns:
            pd.DataFrame | None: Parsed data, or None (after logging) when
            the download or parse fails.
        """
        try:
            buffer = self._read_as_content(path)
            extension = path.split(".")[-1].lower()
            if extension == "csv":
                df = pd.read_csv(io.BytesIO(buffer))
            elif extension == "tsv":
                df = pd.read_csv(io.BytesIO(buffer), sep="\t")
            elif extension == "json":
                df = pd.read_json(io.BytesIO(buffer))
            elif extension == "parquet":
                df = pd.read_parquet(io.BytesIO(buffer))
            else:
                raise ValueError(f"Unsupported file format: {extension}")
            return df
        except Exception as e:
            self.logger.error(f"Error with Box Handler while reading file: {e}")

    def upload_file(self, path, buffer):
        """Upload `buffer` to `path`, creating missing parent folders.

        Args:
            path: Slash-separated destination path; the last component is the
                file name, earlier components are folders under the root.
            buffer: Binary stream with the file contents.
        """
        parent_folder_id = "0"

        directories = path.strip('/').split('/')

        for direc in directories[:-1]:
            sub_folder_id = None
            for item in self.client.folders.get_folder_items(parent_folder_id).entries:
                if item.type == "folder" and item.name == direc:
                    sub_folder_id = item.id
                    break
            if sub_folder_id:
                parent_folder_id = sub_folder_id
            else:
                new_sub_folder = self.client.folders.create_folder(direc, CreateFolderParent(id=parent_folder_id))
                parent_folder_id = new_sub_folder.id
                self.logger.debug(f"folder {direc} not available in path. Creating {direc}...")

        self.client.uploads.upload_file(
            UploadFileAttributes(
                name=directories[-1],
                parent=UploadFileAttributesParentField(id=parent_folder_id),
            ),
            buffer,
        )

    def _write_file(self, path, df: pd.DataFrame):
        """Serialize `df` according to the extension of `path` and upload it.

        Raises:
            ValueError: If the extension is not a supported format.
        """
        extension = path.split(".")[-1].lower()
        buffer = io.BytesIO()
        if extension == "csv":
            df.to_csv(buffer, index=False)
        elif extension == "tsv":
            df.to_csv(buffer, index=False, sep="\t")
        elif extension == "json":
            df.to_json(buffer, orient="records")
        elif extension == "parquet":
            df.to_parquet(buffer, index=False)
        else:
            raise ValueError(f"Unsupported file format: {extension}")
        buffer.seek(0)
        self.upload_file(path, buffer)