Coverage for mindsdb/integrations/handlers/file_handler/file_handler.py: 83%
117 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import shutil
3import tempfile
5import pandas as pd
6from mindsdb_sql_parser import parse_sql
7from mindsdb_sql_parser.ast import CreateTable, DropTables, Insert, Select, Identifier
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.api.executor.utilities.sql import query_dfs
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import RESPONSE_TYPE
13from mindsdb.integrations.libs.response import HandlerResponse as Response
14from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse
15from mindsdb.utilities import log
# Logger named after this module.
logger = log.getLogger(__name__)

# Fallback chunking settings; FileHandler reads "chunk_size" and
# "chunk_overlap" from connection_data and uses these when they are absent.
DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_OVERLAP = 500, 250
# String renderings of a cell that mean "no value". A frozenset gives O(1)
# membership tests and makes the markers unique; both the one- and the
# two-space blank forms are included.
_EMPTY_CELL_MARKERS = frozenset(["", " ", "  ", "NaN", "nan", "NA"])


def clean_cell(val):
    """Map placeholder "empty" cell values to ``None``.

    The check is on ``str(val)``, so non-string values such as ``float("nan")``
    (which renders as ``"nan"``) are also treated as empty.

    Args:
        val: any cell value.

    Returns:
        ``None`` if ``str(val)`` is one of the empty-cell markers, otherwise
        ``val`` unchanged.
    """
    if str(val) in _EMPTY_CELL_MARKERS:
        return None
    return val
class FileHandler(DatabaseHandler):
    """
    Handler for files.

    Exposes the files managed by ``file_controller`` as SQL tables and supports
    ``SELECT``, ``INSERT``, ``CREATE TABLE`` and ``DROP TABLE``. A table may be
    referenced as ``file_name`` or, for multi-page files, ``file_name.sheet_name``.
    """

    name = "files"

    def __init__(
        self,
        name=None,
        file_storage=None,
        connection_data=None,
        file_controller=None,
        **kwargs,
    ):
        """
        Args:
            name: handler instance name, forwarded to ``DatabaseHandler``.
            file_storage: stored as ``self.fs_store``.
            connection_data: optional dict; ``custom_parser``, ``clean_rows``,
                ``chunk_size`` and ``chunk_overlap`` are read from it, falling
                back to ``None``, ``True``, ``DEFAULT_CHUNK_SIZE`` and
                ``DEFAULT_CHUNK_OVERLAP`` respectively.
            file_controller: object that performs the actual file operations.
            **kwargs: accepted for interface compatibility; ignored.
        """
        super().__init__(name)
        # ``None`` default instead of ``{}`` so no dict is shared between calls.
        if connection_data is None:
            connection_data = {}
        self.parser = parse_sql
        self.fs_store = file_storage
        self.custom_parser = connection_data.get("custom_parser", None)
        self.clean_rows = connection_data.get("clean_rows", True)
        self.chunk_size = connection_data.get("chunk_size", DEFAULT_CHUNK_SIZE)
        self.chunk_overlap = connection_data.get("chunk_overlap", DEFAULT_CHUNK_OVERLAP)
        self.file_controller = file_controller
        self.cache_thread_safe = True

    def connect(self, **kwargs):
        return

    def disconnect(self, **kwargs):
        return

    def check_connection(self) -> StatusResponse:
        return StatusResponse(True)

    def _get_table_page_names(self, table: Identifier):
        """Split a table identifier into ``(file_name, page_name)``.

        ``file.sheet`` gives ``("file", "sheet")``; a single-part name gives
        ``(name, None)``. Longer names use only their last two parts.
        """
        table_name_parts = table.parts

        # Multi-part name, e.g. `file_name.sheet_name`.
        if len(table_name_parts) > 1:
            return table_name_parts[-2], table_name_parts[-1]
        return table_name_parts[-1], None

    def query(self, query: ASTNode) -> Response:
        """Execute a parsed SQL statement against the stored files.

        Args:
            query: parsed statement. ``DropTables``, ``CreateTable``,
                ``Select`` and ``Insert`` are supported.

        Returns:
            Response: ``TABLE`` with the result for ``SELECT``; ``OK`` on success
            for the other statements; ``ERROR`` for reported failures and for
            unsupported statement types.

        Raises:
            RuntimeError: if a ``SELECT`` references no stored file.
        """
        # Exact type match here, unlike the isinstance checks below.
        # NOTE(review): presumably deliberate to exclude DropTables subclasses — confirm.
        if type(query) is DropTables:
            return self._drop_tables(query)
        if isinstance(query, CreateTable):
            return self._create_table(query)
        if isinstance(query, Select):
            return self._select(query)
        if isinstance(query, Insert):
            return self._insert(query)
        return Response(
            RESPONSE_TYPE.ERROR,
            error_message="Only 'select', 'insert', 'create' and 'drop' queries allowed for files",
        )

    def _drop_tables(self, query: DropTables) -> Response:
        """Delete every file named in a ``DROP TABLE`` statement.

        Stops at the first failure and returns an ``ERROR`` response; tables
        deleted before that point stay deleted.
        """
        for table_identifier in query.tables:
            parts = table_identifier.parts
            # A two-part name must be qualified with this handler's own name.
            if len(parts) == 2 and parts[0] != self.name:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=f"Can't delete table from database '{parts[0]}'",
                )
            table_name = parts[-1]
            try:
                self.file_controller.delete_file(table_name)
            except FileNotFoundError as e:
                # DROP TABLE IF EXISTS tolerates a missing file.
                if not query.if_exists:
                    return Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=f"Can't delete table '{table_name}': {e}",
                    )
            except Exception as e:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=f"Can't delete table '{table_name}': {e}",
                )
        return Response(RESPONSE_TYPE.OK)

    def _create_table(self, query: CreateTable) -> Response:
        """Create an empty CSV file whose header row lists the statement's columns.

        With ``CREATE OR REPLACE`` an existing file of the same name is deleted
        first; without it, an existing name is reported as an error.
        """
        # Validate the name before touching storage.
        if len(query.name.parts) != 1:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="Table name cannot contain more than one namespace",
            )
        table_name = query.name.parts[-1]

        if table_name in self.file_controller.get_files_names():
            if not query.is_replace:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=f"Table '{table_name}' already exists",
                )
            self.file_controller.delete_file(table_name)

        temp_dir_path = tempfile.mkdtemp(prefix="mindsdb_file_")
        try:
            # The name comes from user SQL. A separator (or an absolute name,
            # which os.path.join would honor over the temp dir) would put the
            # file outside the directory that is removed below.
            if any(sep and sep in table_name for sep in (os.sep, os.altsep)):
                raise ValueError("table name must not contain path separators")
            temp_file_path = os.path.join(temp_dir_path, f"{table_name}.csv")

            # Header-only CSV: the statement's column names and no rows.
            df = pd.DataFrame(columns=[col.name for col in query.columns])
            df.to_csv(temp_file_path, index=False)

            self.file_controller.save_file(table_name, temp_file_path, file_name=f"{table_name}.csv")
        except Exception as unknown_error:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=f"Error creating table '{table_name}': {unknown_error}",
            )
        finally:
            # Remove the temp dir on every path.
            shutil.rmtree(temp_dir_path, ignore_errors=True)

        return Response(RESPONSE_TYPE.OK)

    def _select(self, query: Select) -> Response:
        """Run a ``SELECT`` over DataFrames loaded from the referenced files.

        Raises:
            RuntimeError: if no referenced table is a stored file.
        """
        from mindsdb.integrations.utilities.query_traversal import query_traversal

        tables = {}

        def find_tables(node, is_table, **kwargs):
            # Load each table identifier that names a stored file and rewrite
            # it in place to a single-part name that is a key of ``tables``.
            # Always returns None.
            if not (is_table and isinstance(node, Identifier)):
                return
            table_name, page_name = self._get_table_page_names(node)
            try:
                df = self.file_controller.get_file_data(table_name, page_name)
            except FileNotFoundError:
                # Not a stored file: leave the identifier untouched.
                return

            if page_name is not None:
                table_name = f"{page_name}_{table_name}"
            node.parts = [table_name]
            tables[table_name] = df

        query_traversal(query, find_tables)

        if not tables:
            raise RuntimeError(f"No tables in query: {query}")

        result_df = query_dfs(tables, query)
        return Response(RESPONSE_TYPE.TABLE, data_frame=result_df)

    def _insert(self, query: Insert) -> Response:
        """Append the statement's ``VALUES`` rows to an existing file (or sheet)."""
        table_name, page_name = self._get_table_page_names(query.table)

        df = self.file_controller.get_file_data(table_name, page_name)

        # With no column list (``INSERT INTO t VALUES ...``), values follow the
        # table's own column order.
        if query.columns:
            columns = [col.name for col in query.columns]
        else:
            columns = list(df.columns)
        # NOTE(review): if ``query.values`` is None, pandas builds an empty
        # frame and nothing is appended — confirm such inserts are handled upstream.
        new_df = pd.DataFrame(query.values, columns=columns)

        df = pd.concat([df, new_df], ignore_index=True)

        self.file_controller.set_file_data(table_name, df, page_name=page_name)

        return Response(RESPONSE_TYPE.OK)

    def native_query(self, query: str) -> Response:
        """Parse a raw SQL string and execute it via ``query``."""
        ast = self.parser(query)
        return self.query(ast)

    def get_tables(self) -> Response:
        """
        List all files
        """
        files_meta = self.file_controller.get_files()
        data = [
            {
                "TABLE_NAME": x["name"],
                "TABLE_ROWS": x["row_count"],
                "TABLE_TYPE": "BASE TABLE",
            }
            for x in files_meta
        ]
        # Explicit columns keep the schema even when there are no files.
        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(data, columns=["TABLE_NAME", "TABLE_ROWS", "TABLE_TYPE"]),
        )

    def get_columns(self, table_name) -> Response:
        """Describe a file's columns; every column is reported with type ``"str"``."""
        file_meta = self.file_controller.get_file_meta(table_name)
        # Column entries are either dicts with a "name" key or plain strings.
        rows = [
            {
                "Field": (x["name"] if isinstance(x, dict) else x).strip(),
                "Type": "str",
            }
            for x in file_meta["columns"]
        ]
        # Explicit columns keep the schema even for a file with no columns.
        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(rows, columns=["Field", "Type"]),
        )