Coverage for mindsdb / integrations / handlers / box_handler / box_handler.py: 0%
191 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import io
2import pandas as pd
3from box_sdk_gen import BoxClient, BoxDeveloperTokenAuth, CreateFolderParent, UploadFileAttributes, UploadFileAttributesParentField
4from box_sdk_gen.internal import utils
5from typing import Dict, Optional, Text
7from mindsdb_sql_parser.ast.base import ASTNode
8from mindsdb_sql_parser.ast import Select, Identifier, Insert
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE,
15)
16from mindsdb.integrations.libs.api_handler import APIHandler, APIResource
class ListFilesTable(APIResource):
    """Table resource that lists the files reported by the handler.

    Each row describes one file with its ``path``, ``name`` and ``extension``.
    """

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return a DataFrame with one row per file from ``handler._list_files()``.

        ``conditions``, ``limit``, ``sort`` and ``targets`` are accepted for
        interface compatibility but are not used by this method.

        Returns:
            pd.DataFrame: columns ``path``, ``name`` and ``extension``; the
            columns are present even when no files are found.
        """
        rows = [
            {
                "path": entry["path"],
                "name": entry["name"],
                "extension": entry["extension"],
            }
            for entry in self.handler._list_files()
        ]
        # Pin the schema explicitly: pd.DataFrame([]) would otherwise have no
        # columns at all, and any later access such as df["path"] would fail.
        return pd.DataFrame(rows, columns=self.get_columns())

    def get_columns(self):
        """Return the column names exposed by this table."""
        return ["path", "name", "extension"]
class FileTable(APIResource):
    """Table resource backed by a single file.

    ``self.table_name`` is passed to the handler as the path of the file to
    read from or write to.
    """

    def _get_file_df(self):
        """Read the backing file through the handler.

        Returns:
            The DataFrame produced by ``handler._read_file``, or ``None`` when
            the file could not be read. Failures are logged, not raised.
        """
        try:
            df = self.handler._read_file(self.table_name)
        except Exception as e:
            self.handler.logger.error(e)
            return None
        if df is None:
            # The original message referenced a non-existent ``self.box_path``
            # attribute, which masked this error with an AttributeError.
            self.handler.logger.error(
                f"No such file found for the path: {self.table_name}"
            )
        return df

    def list(self, conditions=None, limit=None, sort=None, targets=None, **kwargs):
        """Return the whole file as a DataFrame (``None`` if it cannot be read).

        ``conditions``, ``limit``, ``sort`` and ``targets`` are accepted for
        interface compatibility but are not used by this method.
        """
        return self._get_file_df()

    def get_columns(self):
        """Return the column names of the backing file.

        Raises:
            ValueError: if the file could not be read.
        """
        df = self.handler._read_file(self.table_name)
        if df is None:
            raise ValueError(f"No such file found for the path: {self.table_name}")
        return df.columns.tolist()

    def insert(self, query: Insert) -> None:
        """Append the rows of an INSERT query to the file and write it back.

        When the existing file cannot be read, only the new rows are written.
        """
        columns = [col.name for col in query.columns]
        new_rows = pd.DataFrame([dict(zip(columns, row)) for row in query.values])

        existing = self._get_file_df()
        if existing is None:
            # Same outcome as pd.concat([None, new_rows]), which silently drops
            # None entries; spelled out so the behaviour is visible.
            combined = new_rows
        else:
            combined = pd.concat([existing, new_rows], ignore_index=True)

        self.handler._write_file(self.table_name, combined)
class BoxHandler(APIHandler):
    """API handler that exposes files stored in Box as queryable tables.

    The ``files`` table lists the supported files; any other table name is
    treated as the path of a file to read (SELECT) or write (INSERT).
    """

    name = "box"
    supported_file_formats = ["csv", "tsv", "json", "parquet"]

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """Store the connection settings and register the ``files`` table.

        Args:
            name: handler instance name, forwarded to ``APIHandler``.
            connection_data: connection settings; ``connect`` requires a
                ``"token"`` entry.
            **kwargs: kept on ``self.kwargs``; not otherwise used here.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs
        self.logger = log.getLogger(__name__)
        self.client = None
        self.is_connected = False
        self._files_table = ListFilesTable(self)
        self._register_table("files", self._files_table)

    def connect(self):
        """Create the Box client and verify it by listing the root folder.

        Does nothing when already connected.

        Raises:
            ValueError: if no developer token is configured.
            Exception: any error raised while creating or probing the client.
                The error is logged and re-raised so that callers such as
                ``check_connection`` can detect the failure.
        """
        if self.is_connected:
            return
        try:
            if not self.connection_data or "token" not in self.connection_data:
                raise ValueError("Developer token must be provided.")
            auth = BoxDeveloperTokenAuth(token=self.connection_data["token"])
            client = BoxClient(auth=auth)
            # Cheap request against the root folder to validate the token.
            client.folders.get_folder_items("0", limit=1)
        except Exception as e:
            self.logger.error(f"Error connecting to Box: {e}")
            raise
        self.client = client
        self.is_connected = True
        self.logger.info("Connected to Box")

    def check_connection(self) -> StatusResponse:
        """Try to connect and report the outcome.

        Returns:
            StatusResponse: ``success`` is True when ``connect`` succeeded;
            otherwise ``error_message`` holds the failure text.
        """
        response = StatusResponse(False)
        try:
            self.connect()
            response.success = True
        except Exception as e:
            self.logger.error(f"Error with Box Handler while establish connection: {e}")
            response.error_message = str(e)
        return response

    def disconnect(self):
        """Drop the client and mark the handler as disconnected."""
        if not self.is_connected:
            return
        self.client = None
        self.is_connected = False
        self.logger.info("Disconnected from Box")

    def _read_as_content(self, file_path) -> Optional[utils.Buffer]:
        """Download the file at ``file_path`` and return its bytes.

        The path is resolved one component at a time, starting from the root
        folder, by matching entry names.

        Returns:
            The downloaded content, or ``None`` if any step failed (the error
            is logged, not raised).
        """
        try:
            self.connect()
            current_id = "0"  # Root folder id. The value is always "0".

            for part in file_path.strip("/").split("/"):
                # NOTE(review): only the entries returned by a single
                # get_folder_items call are examined; presumably large folders
                # are paginated by the Box API -- confirm.
                entries = self.client.folders.get_folder_items(current_id).entries
                child_id = next(
                    (entry.id for entry in entries if entry.name == part), None
                )
                if not child_id:
                    raise ValueError(f"{part} not found. Specify the correct path.")
                current_id = child_id

            stream = self.client.downloads.download_file(current_id)
            return utils.read_byte_stream(stream)
        except Exception as e:
            self.logger.error(f"Error when downloading a file from Box: {e}")
            return None

    def query(self, query: ASTNode) -> Response:
        """Execute a SELECT or INSERT query.

        SELECT on ``files`` returns the file listing and, when a ``content``
        column is requested, adds the downloaded content of every listed file.
        SELECT on any other table reads that file; INSERT appends to it.

        Raises:
            NotImplementedError: for any other query type.
        """
        if isinstance(query, Select):
            table_name = query.from_table.parts[-1]
            if table_name == "files":
                df = self._files_table.select(query)
                wants_content = any(
                    isinstance(target, Identifier)
                    and target.parts[-1].lower() == "content"
                    for target in query.targets
                )
                if wants_content:
                    df["content"] = df["path"].apply(self._read_as_content)
            else:
                table = FileTable(self, table_name=table_name)
                df = table.select(query)

            return Response(RESPONSE_TYPE.TABLE, data_frame=df)

        if isinstance(query, Insert):
            table_name = query.table.parts[-1]
            table = FileTable(self, table_name=table_name)
            table.insert(query)
            return Response(RESPONSE_TYPE.OK)

        raise NotImplementedError(
            "Only SELECT and INSERT operations are supported."
        )

    def get_tables(self) -> Response:
        """Return the names of the registered tables."""
        table_names = list(self._tables.keys())
        df = pd.DataFrame(table_names, columns=["table_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def get_columns(self, table_name: str) -> Response:
        """Return the column names of ``table_name``."""
        table = self._get_table(Identifier(table_name))
        columns = table.get_columns()
        df = pd.DataFrame(columns, columns=["column_name"])
        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def list_files_in_folder(self, folder_id, files=None):
        """Recursively collect the supported files under ``folder_id``.

        Args:
            folder_id: id of the folder to walk.
            files: optional list to append results to. A new list is created
                when omitted, so results never leak between calls.

        Returns:
            list[dict]: one ``{"path", "name", "extension"}`` dict per file
            whose extension is in ``supported_file_formats``.
        """
        if files is None:
            files = []
        self.connect()

        for item in self.client.folders.get_folder_items(folder_id).entries:
            if item.type == "folder":
                self.list_files_in_folder(item.id, files)
            elif item.type == "file":
                extension = item.name.split(".")[-1].lower()
                if extension not in self.supported_file_formats:
                    continue
                file = self.client.files.get_file_by_id(item.id)
                # Skip the first path entry (presumably the root folder) so
                # that the path is relative to the account root.
                full_path = [p.name for p in file.path_collection.entries][1:]
                full_path.append(item.name)
                files.append(
                    {
                        "path": "/".join(full_path),
                        "name": item.name,
                        "extension": extension,
                    }
                )

        return files

    def _list_files(self, path=""):
        """Return all supported files, starting from the root folder.

        ``path`` is accepted but not used.
        """
        return self.list_files_in_folder("0")

    def _read_file(self, path) -> Optional[pd.DataFrame]:
        """Download ``path`` and parse it according to its extension.

        Returns:
            The parsed DataFrame, or ``None`` if the download or the parsing
            failed or the extension is unsupported (errors are logged).
        """
        try:
            buffer = self._read_as_content(path)
            extension = path.split(".")[-1].lower()
            if extension not in self.supported_file_formats:
                raise ValueError(f"Unsupported file format: {extension}")
            if buffer is None:
                # The download failure has already been logged; do not feed an
                # empty stream to the parser and log a misleading second error.
                return None

            source = io.BytesIO(buffer)
            if extension == "csv":
                return pd.read_csv(source)
            if extension == "tsv":
                return pd.read_csv(source, sep="\t")
            if extension == "json":
                return pd.read_json(source)
            return pd.read_parquet(source)
        except Exception as e:
            self.logger.error(f"Error with Box Handler while reading file: {e}")
            return None

    def upload_file(self, path, buffer):
        """Upload ``buffer`` to ``path``, creating missing folders on the way.

        Args:
            path: slash-separated destination; the last component is the
                file name, the preceding ones are folder names.
            buffer: content passed to the Box upload call.
        """
        self.connect()
        parent_folder_id = "0"

        directories = path.strip("/").split("/")

        for direc in directories[:-1]:
            sub_folder_id = None
            for item in self.client.folders.get_folder_items(parent_folder_id).entries:
                if item.type == "folder" and item.name == direc:
                    sub_folder_id = item.id
                    break
            if sub_folder_id:
                parent_folder_id = sub_folder_id
            else:
                self.logger.debug(f"folder {direc} not available in path. Creating {direc}...")
                new_sub_folder = self.client.folders.create_folder(
                    direc, CreateFolderParent(id=parent_folder_id)
                )
                parent_folder_id = new_sub_folder.id

        # NOTE(review): uploading under a name that already exists in the
        # folder presumably fails with a conflict -- confirm against the Box
        # API; replacing an existing file may need a new-version upload.
        self.client.uploads.upload_file(
            UploadFileAttributes(
                name=directories[-1],
                parent=UploadFileAttributesParentField(id=parent_folder_id),
            ),
            buffer,
        )

    def _write_file(self, path, df: pd.DataFrame):
        """Serialize ``df`` according to the extension of ``path`` and upload it.

        Raises:
            ValueError: if the extension is not csv, tsv, json or parquet.
        """
        extension = path.split(".")[-1].lower()
        buffer = io.BytesIO()
        if extension == "csv":
            df.to_csv(buffer, index=False)
        elif extension == "tsv":
            df.to_csv(buffer, index=False, sep="\t")
        elif extension == "json":
            df.to_json(buffer, orient="records")
        elif extension == "parquet":
            df.to_parquet(buffer, index=False)
        else:
            raise ValueError(f"Unsupported file format: {extension}")
        # Rewind so the upload reads the serialized data from the start.
        buffer.seek(0)
        self.upload_file(path, buffer)