Coverage for mindsdb / api / executor / datahub / datanodes / integration_datanode.py: 58%

179 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import inspect 

3import functools 

4from dataclasses import astuple 

5from typing import Iterable, List 

6 

7import numpy as np 

8import pandas as pd 

9from sqlalchemy.types import Integer, Float 

10 

11from mindsdb_sql_parser.ast.base import ASTNode 

12from mindsdb_sql_parser.ast import Insert, Identifier, CreateTable, TableColumn, DropTables 

13 

14from mindsdb.api.executor.datahub.classes.response import DataHubResponse 

15from mindsdb.api.executor.datahub.datanodes.datanode import DataNode 

16from mindsdb.api.executor.datahub.classes.tables_row import TablesRow 

17from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

18from mindsdb.api.executor.sql_query.result_set import ResultSet 

19from mindsdb.integrations.libs.response import HandlerResponse, INF_SCHEMA_COLUMNS_NAMES 

20from mindsdb.integrations.utilities.utils import get_class_name 

21from mindsdb.metrics import metrics 

22from mindsdb.utilities import log 

23from mindsdb.utilities.profiler import profiler 

24from mindsdb.utilities.exception import QueryError 

25from mindsdb.api.executor.datahub.datanodes.system_tables import infer_mysql_type 

26 

# Module-level logger scoped to this module's dotted path.
logger = log.getLogger(__name__)

28 

29 

class DBHandlerException(Exception):
    """Raised when an integration (data source) handler call fails.

    The message is prefixed with the data source type and integration name so
    the failing connection can be identified from the error text alone.
    """

32 

33 

def collect_metrics(func):
    """Decorator for collecting performance metrics of integration handler queries.

    The decorator measures:
    - Query execution time using high-precision performance counter
    - Response size (number of rows returned)

    Args:
        func: The function to be decorated (integration handler method)

    Returns:
        function: Wrapped function that includes metrics collection and error handling

    Raises:
        DBHandlerException: re-raised in place of any exception from the wrapped
            call, with a '[ds_type/integration_name]:' prefix; the original
            exception is chained as the cause.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            time_before_query = time.perf_counter()
            result = func(self, *args, **kwargs)

            # metrics
            handler_class_name = get_class_name(self.integration_handler)
            elapsed_seconds = time.perf_counter() - time_before_query
            query_time_with_labels = metrics.INTEGRATION_HANDLER_QUERY_TIME.labels(handler_class_name, result.type)
            query_time_with_labels.observe(elapsed_seconds)

            num_rows = 0
            if result.data_frame is not None:
                num_rows = len(result.data_frame.index)
            response_size_with_labels = metrics.INTEGRATION_HANDLER_RESPONSE_SIZE.labels(
                handler_class_name, result.type
            )
            response_size_with_labels.observe(num_rows)
            logger.debug(f"Handler '{handler_class_name}' returned {num_rows} rows in {elapsed_seconds:.3f} seconds")
        except Exception as e:
            # Wrap any failure with context identifying which integration failed.
            # Fall back to the exception class name when the message is empty.
            msg = str(e).strip()
            if msg == "":
                msg = e.__class__.__name__
            msg = f"[{self.ds_type}/{self.integration_name}]: {msg}"
            raise DBHandlerException(msg) from e
        return result

    return wrapper

77 

78 

class IntegrationDataNode(DataNode):
    """DataNode that forwards SQL queries to a concrete integration (data source) handler."""

    type = "integration"

    def __init__(self, integration_name, ds_type, integration_controller):
        """
        Args:
            integration_name (str): name of the integration (database connection)
            ds_type (str): type of the data source; used in error messages
            integration_controller: controller used to resolve the data handler
        """
        self.integration_name = integration_name
        self.ds_type = ds_type
        self.integration_controller = integration_controller
        self.integration_handler = self.integration_controller.get_data_handler(self.integration_name)

    def get_type(self):
        """Return the datanode type identifier ('integration')."""
        return self.type

    def get_tables(self):
        """Get the list of tables available in the integration.

        Returns:
            list[TablesRow]: one row per table reported by the handler

        Raises:
            Exception: if the handler does not return a TABLE response
        """
        response = self.integration_handler.get_tables()
        if response.type == RESPONSE_TYPE.TABLE:
            result_dict = response.data_frame.to_dict(orient="records")
            return [TablesRow.from_dict(row) for row in result_dict]
        else:
            raise Exception(f"Can't get tables: {response.error_message}")

    def get_table_columns_df(self, table_name: str, schema_name: str | None = None) -> pd.DataFrame:
        """Get a DataFrame containing representation of information_schema.columns for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): The name of the schema to get columns from.

        Returns:
            pd.DataFrame: A DataFrame containing representation of information_schema.columns for the specified table.
            The DataFrame has list of columns as in the integrations.libs.response.INF_SCHEMA_COLUMNS_NAMES.
        """
        # Older handlers don't accept `schema_name`, so inspect the signature first.
        if "schema_name" in inspect.signature(self.integration_handler.get_columns).parameters:
            response = self.integration_handler.get_columns(table_name, schema_name)
        else:
            response = self.integration_handler.get_columns(table_name)

        if response.type == RESPONSE_TYPE.COLUMNS_TABLE:
            return response.data_frame

        if response.type != RESPONSE_TYPE.TABLE:
            logger.warning(f"Wrong response type for handler's `get_columns` call: {response.type}")
            return pd.DataFrame([], columns=astuple(INF_SCHEMA_COLUMNS_NAMES))

        # region fallback for old handlers
        df = response.data_frame
        df.columns = [name.upper() for name in df.columns]
        if "FIELD" not in df.columns or "TYPE" not in df.columns:
            logger.warning(
                f"Response from the handler's `get_columns` call does not contain required columns: {list(df.columns)}"
            )
            return pd.DataFrame([], columns=astuple(INF_SCHEMA_COLUMNS_NAMES))

        # Copy the slice: the frame is mutated below (renamed and new columns added),
        # and assigning into a view would raise pandas' SettingWithCopyWarning.
        new_df = df[["FIELD", "TYPE"]].copy()
        new_df.columns = ["COLUMN_NAME", "DATA_TYPE"]

        new_df[INF_SCHEMA_COLUMNS_NAMES.MYSQL_DATA_TYPE] = new_df[INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE].apply(
            lambda x: infer_mysql_type(x).value
        )

        # Fill remaining information_schema columns with None placeholders.
        for column_name in astuple(INF_SCHEMA_COLUMNS_NAMES):
            if column_name in new_df.columns:
                continue
            new_df[column_name] = None
        # endregion

        return new_df

    def get_table_columns_names(self, table_name: str, schema_name: str | None = None) -> list[str]:
        """Get a list of column names for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): The name of the schema to get columns from.

        Returns:
            list[str]: A list of column names for the specified table.
        """
        df = self.get_table_columns_df(table_name, schema_name)
        return df[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME].to_list()

    def drop_table(self, name: Identifier, if_exists=False):
        """Drop a table in the integration.

        Args:
            name (Identifier): table to drop
            if_exists (bool): if True, do not fail when the table does not exist
        """
        drop_ast = DropTables(tables=[name], if_exists=if_exists)
        self.query(drop_ast)

    def create_table(
        self,
        table_name: Identifier,
        result_set: ResultSet = None,
        columns: List[TableColumn] = None,
        is_replace: bool = False,
        is_create: bool = False,
        raise_if_exists: bool = True,
        **kwargs,
    ) -> DataHubResponse:
        """Create a table in the integration and/or insert data into it.

        Args:
            table_name (Identifier): target table name
            result_set (ResultSet): data to insert; if None, only the DDL is executed
            columns (List[TableColumn]): table columns; inferred from result_set if None
            is_replace (bool): drop the table first if it exists
            is_create (bool): create the table (otherwise just insert)
            raise_if_exists (bool): if False, a 'create table' error is ignored

        Returns:
            DataHubResponse: response carrying the number of affected rows
        """
        # is_create - create table
        # if !raise_if_exists: error will be skipped
        # is_replace - drop table if exists
        # is_create==False and is_replace==False: just insert

        table_columns_meta = {}

        if columns is None:
            columns: list[TableColumn] = result_set.get_ast_columns()
            table_columns_meta = {column.name: column.type for column in columns}

        if is_replace:
            # drop
            drop_ast = DropTables(tables=[table_name], if_exists=True)
            self.query(drop_ast)
            is_create = True

        if is_create:
            create_table_ast = CreateTable(name=table_name, columns=columns, is_replace=is_replace)
            try:
                self.query(create_table_ast)
            except Exception:
                if raise_if_exists:
                    raise

        if result_set is None:
            # it is just a 'create table'
            return DataHubResponse()

        # native insert: prefer the handler's own bulk insert when available
        if hasattr(self.integration_handler, "insert"):
            df = result_set.to_df()

            result: HandlerResponse = self.integration_handler.insert(table_name.parts[-1], df)
            if result is not None:
                affected_rows = result.affected_rows
            else:
                affected_rows = None
            return DataHubResponse(affected_rows=affected_rows)

        insert_columns = [Identifier(parts=[x.alias]) for x in result_set.columns]

        # adapt table types
        for col_idx, col in enumerate(result_set.columns):
            column_type = table_columns_meta[col.alias]

            if column_type == Integer:
                type_name = "int"
            elif column_type == Float:
                type_name = "float"
            else:
                continue

            # best effort: a failed type coercion is not fatal for the insert
            try:
                result_set.set_col_type(col_idx, type_name)
            except Exception:
                pass

        values = result_set.to_lists()

        if len(values) == 0:
            # no need to insert
            return DataHubResponse()

        insert_ast = Insert(table=table_name, columns=insert_columns, values=values, is_plain=True)

        try:
            result: DataHubResponse = self.query(insert_ast)
        except Exception as e:
            msg = f"[{self.ds_type}/{self.integration_name}]: {str(e)}"
            raise DBHandlerException(msg) from e

        return DataHubResponse(affected_rows=result.affected_rows)

    def has_support_stream(self) -> bool:
        """Check whether the data handler supports streamed (chunked) query results."""
        # checks if data handler has query_stream method
        return hasattr(self.integration_handler, "query_stream") and callable(self.integration_handler.query_stream)

    @profiler.profile()
    def query_stream(self, query: ASTNode, fetch_size: int = None) -> Iterable:
        """Stream query results from the handler, split into chunks.

        Args:
            query (ASTNode): parsed query to execute
            fetch_size (int): chunk size hint passed through to the handler

        Returns:
            Iterable: generator of result chunks
        """
        return self.integration_handler.query_stream(query, fetch_size=fetch_size)

    @profiler.profile()
    def query(self, query: ASTNode | str = None, session=None) -> DataHubResponse:
        """Execute a query against the integration data source.

        This method processes SQL queries either as ASTNode objects or raw SQL strings

        Args:
            query (ASTNode | str, optional): The query to execute. Can be either:
                - ASTNode: A parsed SQL query object
                - str: Raw SQL query string
            session: Session object (currently unused but kept for compatibility)

        Returns:
            DataHubResponse: Response object

        Raises:
            NotImplementedError: If query is not ASTNode or str type
            QueryError: If the query execution fails with an error response
        """
        if isinstance(query, ASTNode):
            result: HandlerResponse = self.query_integration_handler(query=query)
        elif isinstance(query, str):
            result: HandlerResponse = self.native_query_integration(query=query)
        else:
            raise NotImplementedError("The query argument must be ASTNode or string type")

        if result.type == RESPONSE_TYPE.ERROR:
            if isinstance(query, ASTNode):
                try:
                    query_str = query.to_string()
                except Exception:
                    # most likely it is CreateTable with exotic column types
                    query_str = "can't be dumped"
            else:
                query_str = query

            exception = QueryError(
                db_name=self.integration_handler.name,
                db_type=self.integration_handler.__class__.name,
                db_error_msg=result.error_message,
                failed_query=query_str,
                is_expected=result.is_expected_error,
            )

            if result.exception is None:
                raise exception
            else:
                raise exception from result.exception

        if result.type == RESPONSE_TYPE.OK:
            return DataHubResponse(affected_rows=result.affected_rows)

        df = result.data_frame
        # region clearing df from NaN values
        # recursion error appears in pandas 1.5.3 https://github.com/pandas-dev/pandas/pull/45749
        if isinstance(df, pd.Series):
            df = df.to_frame()

        columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]
        try:
            # replace python's Nan, np.nan and pd.NA to None
            # TODO keep all NAN to the end of processing, because replacing also changes dtypes
            df.replace([np.nan, pd.NA, pd.NaT], None, inplace=True)
        except Exception:
            logger.exception("Issue with clearing DF from NaN values:")
        # endregion

        return DataHubResponse(
            data_frame=df, columns=columns_info, affected_rows=result.affected_rows, mysql_types=result.mysql_types
        )

    @collect_metrics
    def query_integration_handler(self, query: ASTNode) -> HandlerResponse:
        """Execute a parsed (ASTNode) query via the handler; metrics are collected."""
        return self.integration_handler.query(query)

    @collect_metrics
    def native_query_integration(self, query: str) -> HandlerResponse:
        """Execute a raw SQL string via the handler's native_query; metrics are collected."""
        return self.integration_handler.native_query(query)