Coverage for mindsdb / integrations / handlers / dropbox_handler / dropbox_handler.py: 0%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import io 

2import pandas as pd 

3import dropbox 

4 

5from dropbox.exceptions import AuthError, ApiError, BadInputError 

6from typing import Dict, Optional, Text 

7 

8from mindsdb_sql_parser.ast.base import ASTNode 

9from mindsdb_sql_parser.ast import Select, Identifier, Insert 

10 

11from mindsdb.utilities import log 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE, 

16) 

17 

18 

19from mindsdb.integrations.libs.api_handler import APIHandler, APIResource 

20 

21 

class ListFilesTable(APIResource):
    """Virtual ``files`` table: one row per supported file found in the Dropbox account."""

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return every supported file reported by ``self.handler._list_files()``.

        ``conditions``, ``limit``, ``sort`` and ``targets`` are accepted for the
        APIResource interface but are not applied here.

        Returns:
            pd.DataFrame: columns ``path``, ``name`` and ``extension``. The columns are
            always present, even when no files are found, so code that indexes
            ``df["path"]`` on the result does not raise ``KeyError`` on an empty listing.
        """
        columns = self.get_columns()
        rows = [
            {column: file[column] for column in columns}
            for file in self.handler._list_files()
        ]
        # Passing ``columns`` keeps the schema stable for an empty result.
        return pd.DataFrame(rows, columns=columns)

    def get_columns(self):
        """Return the column names exposed by the ``files`` table."""
        return ["path", "name", "extension"]

39 

40 

class FileTable(APIResource):
    """Virtual table backed by a single Dropbox file whose path is ``self.table_name``."""

    def _get_file_df(self):
        """Download and parse the file at ``self.table_name``.

        Returns:
            pd.DataFrame | None: the file contents, or ``None`` when the handler could
            not read the file. Failures are logged, not raised.
        """
        try:
            df = self.handler._read_file(self.table_name)
        except Exception as e:
            self.handler.logger.error(e)
            return None
        if df is None:
            self.handler.logger.error(
                f"No such file found for the path: {self.table_name}"
            )
        return df

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return the whole file as a DataFrame (``None`` if it could not be read)."""
        return self._get_file_df()

    def get_columns(self):
        """Return the file's column names, or an empty list if it could not be read."""
        df = self._get_file_df()
        return [] if df is None else df.columns.tolist()

    def insert(self, query: Insert) -> None:
        """Append the INSERT's rows to the file and write it back.

        If the file cannot be read, it is written with only the new rows.

        Raises:
            ValueError: if the INSERT has no column list and the file's existing
                columns cannot be determined.
        """
        df_existing = self._get_file_df()

        if query.columns:
            columns = [col.name for col in query.columns]
        elif df_existing is not None:
            # INSERT without an explicit column list: use the file's own column order.
            columns = df_existing.columns.tolist()
        else:
            raise ValueError(
                f"Column names are required to insert into {self.table_name}"
            )

        data = [dict(zip(columns, row)) for row in query.values]
        df_new = pd.DataFrame(data, columns=columns)

        # NOTE(review): ``None`` also covers read errors other than "file does not exist"
        # (e.g. a transient download failure), in which case the existing file is
        # overwritten with only the new rows — confirm this is acceptable.
        if df_existing is None:
            df_combined = df_new
        else:
            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        self.handler._write_file(self.table_name, df_combined)

67 

68 

class DropboxHandler(APIHandler):
    """Expose CSV/TSV/JSON/Parquet files stored in Dropbox as queryable tables.

    The ``files`` table lists every supported file; any other table name is treated
    as a Dropbox file path that can be selected from or inserted into.
    """

    name = "dropbox"
    supported_file_formats = ["csv", "tsv", "json", "parquet"]

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """
        Args:
            name: name of this handler instance.
            connection_data: connection parameters; ``connect`` requires an
                ``access_token`` key. ``None`` is treated as an empty dict.
            **kwargs: stored unchanged on ``self.kwargs``.
        """
        super().__init__(name)
        # Normalise None so ``connect`` reports a missing token instead of a TypeError.
        self.connection_data = connection_data or {}
        self.kwargs = kwargs
        self.logger = log.getLogger(__name__)
        self.dbx = None
        self.is_connected = False
        self._files_table = ListFilesTable(self)
        self._register_table("files", self._files_table)

    def connect(self):
        """Create the Dropbox client and verify the access token.

        Returns:
            dropbox.Dropbox: the connected client (reused if already connected).

        Raises:
            ValueError: if ``access_token`` is missing from the connection data.
            AuthError, BadInputError, Exception: errors raised by the Dropbox SDK while
                validating the token. Every error is logged, then re-raised so that
                ``check_connection`` can report it.
        """
        if self.is_connected:
            return self.dbx
        try:
            if "access_token" not in self.connection_data:
                raise ValueError("Access token must be provided.")
            dbx = dropbox.Dropbox(self.connection_data["access_token"])
            # This API call validates the token; only mark the handler connected after it succeeds.
            account = dbx.users_get_current_account()
        except ValueError as e:
            self.logger.error(f"Error connecting to Dropbox: {e}")
            raise
        except AuthError as e:
            self.logger.error(f"Authentication error with Dropbox: {e}")
            raise
        except BadInputError as e:
            self.logger.error(f"Bad input error with Dropbox: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Error with Dropbox: {e}")
            raise
        self.dbx = dbx
        self.is_connected = True
        self.logger.info(f"Connected to Dropbox as {account.email}")
        return self.dbx

    def check_connection(self) -> StatusResponse:
        """Try to connect and report the outcome.

        Returns:
            StatusResponse: ``success`` is True only if ``connect`` finished without
            raising; otherwise ``error_message`` holds the exception text.
        """
        response = StatusResponse(False)
        try:
            self.connect()
            response.success = True
        except (ApiError, ValueError) as e:
            self.logger.error(f"Error connecting to Dropbox with Dropbox: {e}")
            response.error_message = str(e)
        except AuthError as e:
            self.logger.error(f"Authentication error with Dropbox: {e}")
            response.error_message = str(e)
        except Exception as e:
            self.logger.error(f"Error with Dropbox Handler: {e}")
            response.error_message = str(e)
        return response

    def disconnect(self):
        """Drop the Dropbox client; a no-op when not connected."""
        if not self.is_connected:
            return
        self.dbx = None
        self.is_connected = False
        self.logger.info("Disconnected from Dropbox")

    def _read_as_content(self, file_path) -> Optional[bytes]:
        """Download ``file_path`` and return its raw bytes.

        Returns ``None`` (after logging) if the download raises ``ApiError``.
        """
        try:
            _, res = self.dbx.files_download(file_path)
        except ApiError as e:
            self.logger.error(f"Error when downloading a file from Dropbox: {e}")
            return None
        return res.content

    def query(self, query: ASTNode) -> Response:
        """Execute a SELECT or INSERT against the ``files`` table or a file path.

        Raises:
            NotImplementedError: for any statement other than SELECT or INSERT.
        """
        if not isinstance(query, (Select, Insert)):
            raise NotImplementedError(
                "Only SELECT and INSERT operations are supported."
            )
        # ``self.dbx`` is only set by ``connect``, so make sure it exists before any I/O.
        self.connect()

        if isinstance(query, Select):
            table_name = query.from_table.parts[-1]
            if table_name == "files":
                df = self._files_table.select(query)
                # "content" is not a column of the files table; when it is requested,
                # download each listed file and attach its raw bytes.
                wants_content = any(
                    isinstance(target, Identifier)
                    and target.parts[-1].lower() == "content"
                    for target in query.targets
                )
                if wants_content:
                    df["content"] = df["path"].apply(self._read_as_content)
            else:
                df = FileTable(self, table_name=table_name).select(query)
            return Response(RESPONSE_TYPE.TABLE, data_frame=df)

        table_name = query.table.parts[-1]
        FileTable(self, table_name=table_name).insert(query)
        return Response(RESPONSE_TYPE.OK)

    def get_tables(self) -> Response:
        """Return the names of the registered tables as a one-column DataFrame."""
        table_names = list(self._tables.keys())
        df = pd.DataFrame(table_names, columns=["table_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def get_columns(self, table_name: str) -> Response:
        """Return the column names of a registered table or of a Dropbox file path."""
        if table_name in self._tables:
            table = self._get_table(Identifier(table_name))
        else:
            # As in ``query``, any non-registered name is treated as a file path.
            self.connect()
            table = FileTable(self, table_name=table_name)
        columns = table.get_columns()
        df = pd.DataFrame(columns, columns=["column_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    @staticmethod
    def _get_extension(path):
        """Return the lower-cased extension of the last path segment, or "" if it has none."""
        file_name = path.rsplit("/", 1)[-1]
        if "." not in file_name:
            return ""
        return file_name.rsplit(".", 1)[-1].lower()

    def _list_files(self, path=""):
        """Recursively list supported files under ``path``, following pagination cursors."""
        result = self.dbx.files_list_folder(path, recursive=True)
        files = self._process_entries(result.entries)
        while result.has_more:
            result = self.dbx.files_list_folder_continue(result.cursor)
            files.extend(self._process_entries(result.entries))
        return files

    def _process_entries(self, entries):
        """Keep file entries with a supported extension, as path/name/extension dicts."""
        files = []
        for entry in entries:
            if not isinstance(entry, dropbox.files.FileMetadata):
                continue
            extension = self._get_extension(entry.name)
            if extension in self.supported_file_formats:
                files.append(
                    {
                        "path": entry.path_lower,
                        "name": entry.name,
                        "extension": extension,
                    }
                )
        return files

    def _read_file(self, path) -> Optional[pd.DataFrame]:
        """Download ``path`` and parse it according to its extension.

        Returns:
            pd.DataFrame | None: the parsed file, or ``None`` (after logging) when the
            extension is unsupported, the download fails, or parsing fails.
        """
        extension = self._get_extension(path)
        readers = {
            "csv": pd.read_csv,
            "tsv": lambda buffer: pd.read_csv(buffer, sep="\t"),
            "json": pd.read_json,
            "parquet": pd.read_parquet,
        }
        reader = readers.get(extension)
        if reader is None:
            # Checked before downloading so unsupported files are never fetched.
            self.logger.error(
                f"Error with file extension: Unsupported file format: {extension}"
            )
            return None
        try:
            _, res = self.dbx.files_download(path)
            return reader(io.BytesIO(res.content))
        except ApiError as e:
            self.logger.error(f"Error when downloading a file from Dropbox: {e}")
        except Exception as e:
            self.logger.error(f"Error with Dropbox Handler: {e}")
        return None

    def _write_file(self, path, df: pd.DataFrame):
        """Serialise ``df`` according to the extension of ``path`` and upload it, overwriting.

        Raises:
            ValueError: if the extension is unsupported.
            ApiError, Exception: serialisation or upload failures. These are logged and
                re-raised so that an INSERT does not report success when nothing was written.
        """
        extension = self._get_extension(path)
        writers = {
            "csv": lambda buffer: df.to_csv(buffer, index=False),
            "tsv": lambda buffer: df.to_csv(buffer, index=False, sep="\t"),
            "json": lambda buffer: df.to_json(buffer, orient="records"),
            "parquet": lambda buffer: df.to_parquet(buffer, index=False),
        }
        writer = writers.get(extension)
        if writer is None:
            error = ValueError(f"Unsupported file format: {extension}")
            self.logger.error(f"Error with file extension: {error}")
            raise error
        try:
            buffer = io.BytesIO()
            writer(buffer)
            self.dbx.files_upload(
                buffer.getvalue(), path, mode=dropbox.files.WriteMode.overwrite
            )
        except ApiError as e:
            self.logger.error(f"Error when writing a file to Dropbox: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Error with Dropbox Handler: {e}")
            raise