Coverage for mindsdb / api / executor / datahub / datanodes / integration_datanode.py: 58%
179 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import inspect
3import functools
4from dataclasses import astuple
5from typing import Iterable, List
7import numpy as np
8import pandas as pd
9from sqlalchemy.types import Integer, Float
11from mindsdb_sql_parser.ast.base import ASTNode
12from mindsdb_sql_parser.ast import Insert, Identifier, CreateTable, TableColumn, DropTables
14from mindsdb.api.executor.datahub.classes.response import DataHubResponse
15from mindsdb.api.executor.datahub.datanodes.datanode import DataNode
16from mindsdb.api.executor.datahub.classes.tables_row import TablesRow
17from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
18from mindsdb.api.executor.sql_query.result_set import ResultSet
19from mindsdb.integrations.libs.response import HandlerResponse, INF_SCHEMA_COLUMNS_NAMES
20from mindsdb.integrations.utilities.utils import get_class_name
21from mindsdb.metrics import metrics
22from mindsdb.utilities import log
23from mindsdb.utilities.profiler import profiler
24from mindsdb.utilities.exception import QueryError
25from mindsdb.api.executor.datahub.datanodes.system_tables import infer_mysql_type
27logger = log.getLogger(__name__)
class DBHandlerException(Exception):
    """Raised when a call into an integration data handler fails.

    The message is prefixed with ``[ds_type/integration_name]`` so the failing
    data source can be identified; the original error is chained as the cause.
    """
def collect_metrics(func):
    """Decorator collecting performance metrics of an integration handler query.

    The decorator measures:
        - Query execution time using a high-precision performance counter
        - Response size (number of rows returned)

    Handler errors are re-raised as DBHandlerException with the integration
    name attached. Failures inside the metrics/logging code itself are only
    logged, so they are never misattributed to the data source.

    Args:
        func: The function to be decorated (integration handler method
            returning a HandlerResponse).

    Returns:
        function: Wrapped function that includes metrics collection and
            error handling.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        time_before_query = time.perf_counter()
        try:
            result = func(self, *args, **kwargs)
        except Exception as e:
            msg = str(e).strip()
            if msg == "":
                msg = e.__class__.__name__
            # Prefix with the data source identity so the user can tell which
            # integration failed.
            msg = f"[{self.ds_type}/{self.integration_name}]: {msg}"
            raise DBHandlerException(msg) from e

        # Metrics collection must never turn a successful handler call into a
        # DBHandlerException, so it gets its own best-effort try block.
        try:
            handler_class_name = get_class_name(self.integration_handler)
            elapsed_seconds = time.perf_counter() - time_before_query
            query_time_with_labels = metrics.INTEGRATION_HANDLER_QUERY_TIME.labels(handler_class_name, result.type)
            query_time_with_labels.observe(elapsed_seconds)

            num_rows = 0
            if result.data_frame is not None:
                num_rows = len(result.data_frame.index)
            response_size_with_labels = metrics.INTEGRATION_HANDLER_RESPONSE_SIZE.labels(
                handler_class_name, result.type
            )
            response_size_with_labels.observe(num_rows)
            logger.debug(f"Handler '{handler_class_name}' returned {num_rows} rows in {elapsed_seconds:.3f} seconds")
        except Exception:
            logger.exception("Failed to collect integration handler metrics:")

        return result

    return wrapper
class IntegrationDataNode(DataNode):
    """DataNode backed by an external integration (data source) handler."""

    type = "integration"

    def __init__(self, integration_name, ds_type, integration_controller):
        """
        Args:
            integration_name: name of the integration this node represents
            ds_type: type of the data source (used in error messages)
            integration_controller: controller used to resolve the data handler
        """
        self.integration_name = integration_name
        self.ds_type = ds_type
        self.integration_controller = integration_controller
        # Resolve the concrete handler for this integration once, up front.
        self.integration_handler = self.integration_controller.get_data_handler(self.integration_name)
88 def get_type(self):
89 return self.type
91 def get_tables(self):
92 response = self.integration_handler.get_tables()
93 if response.type == RESPONSE_TYPE.TABLE: 93 ↛ 97line 93 didn't jump to line 97 because the condition on line 93 was always true
94 result_dict = response.data_frame.to_dict(orient="records")
95 return [TablesRow.from_dict(row) for row in result_dict]
96 else:
97 raise Exception(f"Can't get tables: {response.error_message}")
99 def get_table_columns_df(self, table_name: str, schema_name: str | None = None) -> pd.DataFrame:
100 """Get a DataFrame containing representation of information_schema.columns for the specified table.
102 Args:
103 table_name (str): The name of the table to get columns from.
104 schema_name (str | None): The name of the schema to get columns from.
106 Returns:
107 pd.DataFrame: A DataFrame containing representation of information_schema.columns for the specified table.
108 The DataFrame has list of columns as in the integrations.libs.response.INF_SCHEMA_COLUMNS_NAMES.
109 """
110 if "schema_name" in inspect.signature(self.integration_handler.get_columns).parameters:
111 response = self.integration_handler.get_columns(table_name, schema_name)
112 else:
113 response = self.integration_handler.get_columns(table_name)
115 if response.type == RESPONSE_TYPE.COLUMNS_TABLE:
116 return response.data_frame
118 if response.type != RESPONSE_TYPE.TABLE:
119 logger.warning(f"Wrong response type for handler's `get_columns` call: {response.type}")
120 return pd.DataFrame([], columns=astuple(INF_SCHEMA_COLUMNS_NAMES))
122 # region fallback for old handlers
123 df = response.data_frame
124 df.columns = [name.upper() for name in df.columns]
125 if "FIELD" not in df.columns or "TYPE" not in df.columns:
126 logger.warning(
127 f"Response from the handler's `get_columns` call does not contain required columns: {list(df.columns)}"
128 )
129 return pd.DataFrame([], columns=astuple(INF_SCHEMA_COLUMNS_NAMES))
131 new_df = df[["FIELD", "TYPE"]]
132 new_df.columns = ["COLUMN_NAME", "DATA_TYPE"]
134 new_df[INF_SCHEMA_COLUMNS_NAMES.MYSQL_DATA_TYPE] = new_df[INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE].apply(
135 lambda x: infer_mysql_type(x).value
136 )
138 for column_name in astuple(INF_SCHEMA_COLUMNS_NAMES):
139 if column_name in new_df.columns:
140 continue
141 new_df[column_name] = None
142 # endregion
144 return new_df
146 def get_table_columns_names(self, table_name: str, schema_name: str | None = None) -> list[str]:
147 """Get a list of column names for the specified table.
149 Args:
150 table_name (str): The name of the table to get columns from.
151 schema_name (str | None): The name of the schema to get columns from.
153 Returns:
154 list[str]: A list of column names for the specified table.
155 """
156 df = self.get_table_columns_df(table_name, schema_name)
157 return df[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME].to_list()
159 def drop_table(self, name: Identifier, if_exists=False):
160 drop_ast = DropTables(tables=[name], if_exists=if_exists)
161 self.query(drop_ast)
163 def create_table(
164 self,
165 table_name: Identifier,
166 result_set: ResultSet = None,
167 columns: List[TableColumn] = None,
168 is_replace: bool = False,
169 is_create: bool = False,
170 raise_if_exists: bool = True,
171 **kwargs,
172 ) -> DataHubResponse:
173 # is_create - create table
174 # if !raise_if_exists: error will be skipped
175 # is_replace - drop table if exists
176 # is_create==False and is_replace==False: just insert
178 table_columns_meta = {}
180 if columns is None: 180 ↛ 184line 180 didn't jump to line 184 because the condition on line 180 was always true
181 columns: list[TableColumn] = result_set.get_ast_columns()
182 table_columns_meta = {column.name: column.type for column in columns}
184 if is_replace:
185 # drop
186 drop_ast = DropTables(tables=[table_name], if_exists=True)
187 self.query(drop_ast)
188 is_create = True
190 if is_create: 190 ↛ 198line 190 didn't jump to line 198 because the condition on line 190 was always true
191 create_table_ast = CreateTable(name=table_name, columns=columns, is_replace=is_replace)
192 try:
193 self.query(create_table_ast)
194 except Exception as e:
195 if raise_if_exists:
196 raise e
198 if result_set is None: 198 ↛ 200line 198 didn't jump to line 200 because the condition on line 198 was never true
199 # it is just a 'create table'
200 return DataHubResponse()
202 # native insert
203 if hasattr(self.integration_handler, "insert"): 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true
204 df = result_set.to_df()
206 result: HandlerResponse = self.integration_handler.insert(table_name.parts[-1], df)
207 if result is not None:
208 affected_rows = result.affected_rows
209 else:
210 affected_rows = None
211 return DataHubResponse(affected_rows=affected_rows)
213 insert_columns = [Identifier(parts=[x.alias]) for x in result_set.columns]
215 # adapt table types
216 for col_idx, col in enumerate(result_set.columns):
217 column_type = table_columns_meta[col.alias]
219 if column_type == Integer: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true
220 type_name = "int"
221 elif column_type == Float: 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true
222 type_name = "float"
223 else:
224 continue
226 try:
227 result_set.set_col_type(col_idx, type_name)
228 except Exception:
229 pass
231 values = result_set.to_lists()
233 if len(values) == 0: 233 ↛ 235line 233 didn't jump to line 235 because the condition on line 233 was never true
234 # not need to insert
235 return DataHubResponse()
237 insert_ast = Insert(table=table_name, columns=insert_columns, values=values, is_plain=True)
239 try:
240 result: DataHubResponse = self.query(insert_ast)
241 except Exception as e:
242 msg = f"[{self.ds_type}/{self.integration_name}]: {str(e)}"
243 raise DBHandlerException(msg) from e
245 return DataHubResponse(affected_rows=result.affected_rows)
247 def has_support_stream(self) -> bool:
248 # checks if data handler has query_stream method
249 return hasattr(self.integration_handler, "query_stream") and callable(self.integration_handler.query_stream)
251 @profiler.profile()
252 def query_stream(self, query: ASTNode, fetch_size: int = None) -> Iterable:
253 # returns generator of results from handler (split by chunks)
254 return self.integration_handler.query_stream(query, fetch_size=fetch_size)
256 @profiler.profile()
257 def query(self, query: ASTNode | str = None, session=None) -> DataHubResponse:
258 """Execute a query against the integration data source.
260 This method processes SQL queries either as ASTNode objects or raw SQL strings
262 Args:
263 query (ASTNode | str, optional): The query to execute. Can be either:
264 - ASTNode: A parsed SQL query object
265 - str: Raw SQL query string
266 session: Session object (currently unused but kept for compatibility)
268 Returns:
269 DataHubResponse: Response object
271 Raises:
272 NotImplementedError: If query is not ASTNode or str type
273 Exception: If the query execution fails with an error response
274 """
275 if isinstance(query, ASTNode): 275 ↛ 277line 275 didn't jump to line 277 because the condition on line 275 was always true
276 result: HandlerResponse = self.query_integration_handler(query=query)
277 elif isinstance(query, str):
278 result: HandlerResponse = self.native_query_integration(query=query)
279 else:
280 raise NotImplementedError("Thew query argument must be ASTNode or string type")
282 if result.type == RESPONSE_TYPE.ERROR:
283 if isinstance(query, ASTNode): 283 ↛ 290line 283 didn't jump to line 290 because the condition on line 283 was always true
284 try:
285 query_str = query.to_string()
286 except Exception:
287 # most likely it is CreateTable with exotic column types
288 query_str = "can't be dump"
289 else:
290 query_str = query
292 exception = QueryError(
293 db_name=self.integration_handler.name,
294 db_type=self.integration_handler.__class__.name,
295 db_error_msg=result.error_message,
296 failed_query=query_str,
297 is_expected=result.is_expected_error,
298 )
300 if result.exception is None: 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 raise exception
302 else:
303 raise exception from result.exception
305 if result.type == RESPONSE_TYPE.OK:
306 return DataHubResponse(affected_rows=result.affected_rows)
308 df = result.data_frame
309 # region clearing df from NaN values
310 # recursion error appears in pandas 1.5.3 https://github.com/pandas-dev/pandas/pull/45749
311 if isinstance(df, pd.Series): 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 df = df.to_frame()
314 columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]
315 try:
316 # replace python's Nan, np.nan and pd.NA to None
317 # TODO keep all NAN to the end of processing, bacause replacing also changes dtypes
318 df.replace([np.nan, pd.NA, pd.NaT], None, inplace=True)
319 except Exception:
320 logger.exception("Issue with clearing DF from NaN values:")
321 # endregion
323 return DataHubResponse(
324 data_frame=df, columns=columns_info, affected_rows=result.affected_rows, mysql_types=result.mysql_types
325 )
327 @collect_metrics
328 def query_integration_handler(self, query: ASTNode) -> HandlerResponse:
329 return self.integration_handler.query(query)
331 @collect_metrics
332 def native_query_integration(self, query: str) -> HandlerResponse:
333 return self.integration_handler.native_query(query)