Coverage for mindsdb / integrations / handlers / dropbox_handler / dropbox_handler.py: 0%
188 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import io
2import pandas as pd
3import dropbox
5from dropbox.exceptions import AuthError, ApiError, BadInputError
6from typing import Dict, Optional, Text
8from mindsdb_sql_parser.ast.base import ASTNode
9from mindsdb_sql_parser.ast import Select, Identifier, Insert
11from mindsdb.utilities import log
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE,
16)
19from mindsdb.integrations.libs.api_handler import APIHandler, APIResource
class ListFilesTable(APIResource):
    """Virtual `files` table: one row per supported file in the Dropbox account."""

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return a DataFrame with the path, name and extension of every file.

        `conditions`/`limit`/`sort` filtering is delegated to the APIResource
        machinery that wraps this method; they are not applied here.
        """
        rows = [
            {
                "path": entry["path"],
                "name": entry["name"],
                "extension": entry["extension"],
            }
            for entry in self.handler._list_files()
        ]
        return pd.DataFrame(rows)

    def get_columns(self):
        """Column names exposed by the `files` table."""
        return ["path", "name", "extension"]
class FileTable(APIResource):
    """Exposes a single Dropbox file (identified by `table_name`, its path) as a table."""

    def _get_file_df(self) -> Optional[pd.DataFrame]:
        """Download and parse the backing file.

        Returns the parsed DataFrame, or None on failure (the error is logged,
        matching the handler's best-effort error style).
        """
        try:
            df = self.handler._read_file(self.table_name)
            if df is None:
                # Bug fix: this previously interpolated `self.dropbox_path`,
                # an attribute that does not exist on this class, so the
                # intended message was replaced by an AttributeError. The
                # file's path is held in `self.table_name`.
                raise Exception(f"No such file found for the path: {self.table_name}")
            return df
        except Exception as e:
            self.handler.logger.error(e)

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return the file's contents as a DataFrame (None if the read failed)."""
        return self._get_file_df()

    def get_columns(self):
        """Column names of the backing file."""
        df = self.handler._read_file(self.table_name)
        return df.columns.tolist()

    def insert(self, query: Insert) -> None:
        """Append the INSERT's rows to the file and write it back to Dropbox."""
        columns = [col.name for col in query.columns]
        data = [dict(zip(columns, row)) for row in query.values]
        df_new = pd.DataFrame(data)
        df_existing = self._get_file_df()
        df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        self.handler._write_file(self.table_name, df_combined)
class DropboxHandler(APIHandler):
    """
    Integration handler for Dropbox.

    Registers a virtual `files` table listing every supported file in the
    account, and lets individual files (csv/tsv/json/parquet) be SELECTed
    from and INSERTed into as tables addressed by their path.
    """

    name = "dropbox"
    supported_file_formats = ["csv", "tsv", "json", "parquet"]

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """
        Args:
            name: Name of this handler instance.
            connection_data: Connection parameters; must contain `access_token`.
            kwargs: Extra arguments, stored but unused here.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs
        self.logger = log.getLogger(__name__)
        self.dbx = None
        self.is_connected = False
        self._files_table = ListFilesTable(self)
        self._register_table("files", self._files_table)

    def connect(self):
        """Establish a Dropbox client session.

        Errors are logged rather than raised (best-effort style used across
        this handler); on any failure `self.is_connected` remains False.
        """
        try:
            if self.is_connected:
                return
            if "access_token" not in self.connection_data:
                raise ValueError("Access token must be provided.")
            self.dbx = dropbox.Dropbox(self.connection_data["access_token"])
            # Bug fix: validate the token *before* flagging the connection as
            # established. Previously `is_connected = True` was set first, so a
            # bad token raised AuthError afterwards and left the handler marked
            # as connected.
            account = self.dbx.users_get_current_account()
            self.is_connected = True
            self.logger.info(f"Connected to Dropbox as {account.email}")
        except ValueError as e:
            self.logger.error(f"Error connecting to Dropbox: {e}")
        except AuthError as e:
            self.logger.error(f"Authentication error with Dropbox: {e}")
        except BadInputError as e:
            self.logger.error(f"Bad input error with Dropbox: {e}")
        except Exception as e:
            self.logger.error(f"Error with Dropbox: {e}")

    def check_connection(self) -> StatusResponse:
        """Check whether a Dropbox session can be established.

        Bug fix: `connect()` swallows its own exceptions, so the previous
        implementation reported success unconditionally. Success is now
        derived from whether a session was actually established.
        """
        response = StatusResponse(False)
        try:
            self.connect()
            response.success = self.is_connected
            if not response.success:
                response.error_message = "Failed to connect to Dropbox; see logs for details."
        except (ApiError, ValueError) as e:
            self.logger.error(f"Error connecting to Dropbox with Dropbox: {e}")
            response.error_message = str(e)
        except AuthError as e:
            self.logger.error(f"Authentication error with Dropbox: {e}")
            response.error_message = str(e)
        except Exception as e:
            self.logger.error(f"Error with Dropbox Handler: {e}")
            response.error_message = str(e)
        return response

    def disconnect(self):
        """Drop the client session (no remote call is needed)."""
        if not self.is_connected:
            return
        self.dbx = None
        self.is_connected = False
        self.logger.info("Disconnected from Dropbox")

    def _read_as_content(self, file_path) -> Optional[bytes]:
        """Download a file and return its raw bytes.

        Bug fix: the return annotation was `-> None` although the method
        returns the downloaded content. Returns None when the download fails
        (the ApiError is logged).
        """
        try:
            _, res = self.dbx.files_download(file_path)
            return res.content
        except ApiError as e:
            self.logger.error(f"Error when downloading a file from Dropbox: {e}")

    def query(self, query: ASTNode) -> Response:
        """Dispatch a parsed SQL query.

        SELECT on `files` lists files (optionally attaching raw `content`);
        SELECT on any other table reads that file; INSERT appends rows to a
        file. Anything else raises NotImplementedError.
        """
        if isinstance(query, Select):
            table_name = query.from_table.parts[-1]
            if table_name == "files":
                table = self._files_table
                df = table.select(query)
                # Only download file bodies when the query actually asks for
                # the `content` column — downloads are expensive.
                has_content = any(
                    isinstance(target, Identifier)
                    and target.parts[-1].lower() == "content"
                    for target in query.targets
                )
                if has_content:
                    df["content"] = df["path"].apply(self._read_as_content)
            else:
                table = FileTable(self, table_name=table_name)
                df = table.select(query)
            return Response(RESPONSE_TYPE.TABLE, data_frame=df)
        elif isinstance(query, Insert):
            table_name = query.table.parts[-1]
            table = FileTable(self, table_name=table_name)
            table.insert(query)
            return Response(RESPONSE_TYPE.OK)
        else:
            raise NotImplementedError(
                "Only SELECT and INSERT operations are supported."
            )

    def get_tables(self) -> Response:
        """Return the registered table names as a one-column DataFrame."""
        table_names = list(self._tables.keys())
        df = pd.DataFrame(table_names, columns=["table_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def get_columns(self, table_name: str) -> Response:
        """Return the column names of `table_name` as a one-column DataFrame."""
        table = self._get_table(Identifier(table_name))
        columns = table.get_columns()
        df = pd.DataFrame(columns, columns=["column_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def _list_files(self, path=""):
        """Recursively list all supported files under `path` (default: root).

        Follows Dropbox's pagination cursor until `has_more` is exhausted.
        """
        files = []
        result = self.dbx.files_list_folder(path, recursive=True)
        files.extend(self._process_entries(result.entries))
        while result.has_more:
            result = self.dbx.files_list_folder_continue(result.cursor)
            files.extend(self._process_entries(result.entries))
        return files

    def _process_entries(self, entries):
        """Filter listing entries down to files with a supported extension.

        Returns a list of dicts with `path`, `name` and `extension` keys;
        folders and unsupported formats are skipped.
        """
        files = []
        for entry in entries:
            if isinstance(entry, dropbox.files.FileMetadata):
                extension = entry.name.split(".")[-1].lower()
                if extension in self.supported_file_formats:
                    files.append(
                        {
                            "path": entry.path_lower,
                            "name": entry.name,
                            "extension": extension,
                        }
                    )
        return files

    def _read_file(self, path) -> pd.DataFrame:
        """Download `path` and parse it into a DataFrame by extension.

        Returns None when the download or parse fails (errors are logged,
        matching the handler's best-effort style).
        """
        try:
            _, res = self.dbx.files_download(path)
            content = res.content
            extension = path.split(".")[-1].lower()
            if extension == "csv":
                df = pd.read_csv(io.BytesIO(content))
            elif extension == "tsv":
                df = pd.read_csv(io.BytesIO(content), sep="\t")
            elif extension == "json":
                df = pd.read_json(io.BytesIO(content))
            elif extension == "parquet":
                df = pd.read_parquet(io.BytesIO(content))
            else:
                raise ValueError(f"Unsupported file format: {extension}")
            return df
        except ValueError as e:
            self.logger.error(f"Error with file extension: {e}")
        except ApiError as e:
            self.logger.error(f"Error when downloading a file from Dropbox: {e}")
        except Exception as e:
            self.logger.error(f"Error with Dropbox Handler: {e}")

    def _write_file(self, path, df: pd.DataFrame):
        """Serialize `df` by `path`'s extension and upload it, overwriting.

        Errors are logged rather than raised.
        """
        try:
            extension = path.split(".")[-1].lower()
            buffer = io.BytesIO()
            if extension == "csv":
                df.to_csv(buffer, index=False)
            elif extension == "tsv":
                df.to_csv(buffer, index=False, sep="\t")
            elif extension == "json":
                df.to_json(buffer, orient="records")
            elif extension == "parquet":
                df.to_parquet(buffer, index=False)
            else:
                raise ValueError(f"Unsupported file format: {extension}")
            buffer.seek(0)
            self.dbx.files_upload(
                buffer.read(), path, mode=dropbox.files.WriteMode.overwrite
            )
        except ValueError as e:
            self.logger.error(f"Error with file extension: {e}")
        except ApiError as e:
            self.logger.error(f"Error when writing a file to Dropbox: {e}")
        except Exception as e:
            self.logger.error(f"Error with Dropbox Handler: {e}")