Coverage for mindsdb / integrations / handlers / snowflake_handler / snowflake_handler.py: 80%
324 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import psutil
2import pandas
3from pandas import DataFrame
4from pandas.api import types as pd_types
5from snowflake.sqlalchemy import snowdialect
6from snowflake import connector
7from snowflake.connector.errors import NotSupportedError
8from snowflake.connector.cursor import SnowflakeCursor, ResultMetadata
9from typing import Any, Optional, List
11from mindsdb_sql_parser.ast.base import ASTNode
12from mindsdb_sql_parser.ast import Select, Identifier
14from mindsdb.utilities import log
15from mindsdb.integrations.libs.base import MetaDatabaseHandler
16from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
17from mindsdb.integrations.libs.response import (
18 HandlerStatusResponse as StatusResponse,
19 HandlerResponse as Response,
20 RESPONSE_TYPE,
21)
22from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE
24from .auth_types import (
25 PasswordAuthType,
26 KeyPairAuthType,
27)
# pyarrow is an optional dependency (used by snowflake-connector-python for
# fetch_pandas_batches). Keep a handle on its default memory pool so that
# native_query() can release unused memory after large result sets.
try:
    import pyarrow as pa

    memory_pool = pa.default_memory_pool()
except Exception:
    # pyarrow missing or failed to initialise — memory release is simply skipped.
    memory_pool = None


logger = log.getLogger(__name__)
def _map_type(internal_type_name: str) -> MYSQL_DATA_TYPE:
    """Map Snowflake types to MySQL types.

    Args:
        internal_type_name (str): The name of the Snowflake type to map.

    Returns:
        MYSQL_DATA_TYPE: The MySQL type that corresponds to the Snowflake type.
    """
    internal_type_name = internal_type_name.upper()
    # NOTE: every key must be a real tuple. Previously several keys were plain
    # strings (e.g. "INT , INTEGER , BIGINT , ..." or "VARCHAR" without a
    # trailing comma), which turned the `in` membership test below into a
    # substring match — unrelated fragments such as "GER" or "CHAR" matched.
    types_map = {
        ("NUMBER", "DECIMAL", "DEC", "NUMERIC"): MYSQL_DATA_TYPE.DECIMAL,
        ("INT", "INTEGER", "BIGINT", "SMALLINT", "TINYINT", "BYTEINT"): MYSQL_DATA_TYPE.INT,
        ("FLOAT", "FLOAT4", "FLOAT8"): MYSQL_DATA_TYPE.FLOAT,
        ("DOUBLE", "DOUBLE PRECISION", "REAL"): MYSQL_DATA_TYPE.DOUBLE,
        ("VARCHAR",): MYSQL_DATA_TYPE.VARCHAR,
        ("CHAR", "CHARACTER", "NCHAR"): MYSQL_DATA_TYPE.CHAR,
        ("STRING", "TEXT", "NVARCHAR"): MYSQL_DATA_TYPE.TEXT,
        ("NVARCHAR2", "CHAR VARYING", "NCHAR VARYING"): MYSQL_DATA_TYPE.VARCHAR,
        ("BINARY", "VARBINARY"): MYSQL_DATA_TYPE.BINARY,
        ("BOOLEAN",): MYSQL_DATA_TYPE.BOOL,
        ("TIMESTAMP_NTZ", "DATETIME"): MYSQL_DATA_TYPE.DATETIME,
        ("DATE",): MYSQL_DATA_TYPE.DATE,
        ("TIME",): MYSQL_DATA_TYPE.TIME,
        ("TIMESTAMP_LTZ", "TIMESTAMP_TZ"): MYSQL_DATA_TYPE.DATETIME,
        ("VARIANT", "OBJECT", "ARRAY", "MAP", "GEOGRAPHY", "GEOMETRY", "VECTOR"): MYSQL_DATA_TYPE.VARCHAR,
    }

    for db_types_list, mysql_data_type in types_map.items():
        if internal_type_name in db_types_list:
            return mysql_data_type

    logger.debug(f"Snowflake handler type mapping: unknown type: {internal_type_name}, use VARCHAR as fallback.")
    return MYSQL_DATA_TYPE.VARCHAR
def _make_table_response(result: DataFrame, cursor: SnowflakeCursor) -> Response:
    """Build response from result and cursor.

    NOTE: Snowflake return only 'general' type in description, so look on result's
    DF types and use types from description only if DF type is 'object'

    Args:
        result (DataFrame): result of the query.
        cursor (SnowflakeCursor): cursor object.

    Returns:
        Response: response object.
    """
    description: list[ResultMetadata] = cursor.description
    mysql_types: list[MYSQL_DATA_TYPE] = []
    for column in description:
        column_dtype = result[column.name].dtype
        # Snowflake's logical type name for the column, e.g. "FIXED", "TEXT", "OBJECT".
        description_column_type = connector.constants.FIELD_ID_TO_NAME.get(column.type_code)
        # Semi-structured values arrive serialized, expose them as JSON.
        if description_column_type in ("OBJECT", "ARRAY"):
            mysql_types.append(MYSQL_DATA_TYPE.JSON)
            continue
        if description_column_type == "VECTOR":
            mysql_types.append(MYSQL_DATA_TYPE.VECTOR)
            continue
        # For concrete pandas dtypes (ints/floats/bools/datetimes), trust the
        # DataFrame dtype rather than the cursor description.
        if pd_types.is_integer_dtype(column_dtype):
            column_dtype_name = column_dtype.name
            if column_dtype_name in ("int8", "Int8"):
                mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
            # NOTE(review): the two branches below compare the dtype object
            # itself (not column_dtype_name) against strings; this works via
            # dtype == str equality but is inconsistent with the branch above —
            # confirm intent before changing.
            elif column_dtype in ("int16", "Int16"):
                mysql_types.append(MYSQL_DATA_TYPE.SMALLINT)
            elif column_dtype in ("int32", "Int32"):
                mysql_types.append(MYSQL_DATA_TYPE.MEDIUMINT)
            elif column_dtype in ("int64", "Int64"):
                mysql_types.append(MYSQL_DATA_TYPE.BIGINT)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.INT)
            continue
        if pd_types.is_float_dtype(column_dtype):
            column_dtype_name = column_dtype.name
            if column_dtype_name in ("float16", "Float16"):  # Float16 does not exists so far
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            elif column_dtype_name in ("float32", "Float32"):
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            elif column_dtype_name in ("float64", "Float64"):
                mysql_types.append(MYSQL_DATA_TYPE.DOUBLE)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            continue
        if pd_types.is_bool_dtype(column_dtype):
            mysql_types.append(MYSQL_DATA_TYPE.BOOLEAN)
            continue
        if pd_types.is_datetime64_any_dtype(column_dtype):
            mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
            series = result[column.name]
            # snowflake use pytz.timezone
            # Non-UTC timezone-aware columns are normalised to UTC and made naive.
            if series.dt.tz is not None and getattr(series.dt.tz, "zone", "UTC") != "UTC":
                series = series.dt.tz_convert("UTC")
                result[column.name] = series.dt.tz_localize(None)
            continue

        # Object dtype: fall back to the cursor description to pick the type.
        if pd_types.is_object_dtype(column_dtype):
            if description_column_type == "TEXT":
                # we can also check column.internal_size, if == 16777216 then it is TEXT, else VARCHAR(internal_size)
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
                continue
            elif description_column_type == "BINARY":
                # if column.internal_size == 8388608 then BINARY, else VARBINARY(internal_size)
                mysql_types.append(MYSQL_DATA_TYPE.BINARY)
                continue
            elif description_column_type == "DATE":
                mysql_types.append(MYSQL_DATA_TYPE.DATE)
                continue
            elif description_column_type == "TIME":
                mysql_types.append(MYSQL_DATA_TYPE.TIME)
                continue

        if description_column_type == "FIXED":
            if column.scale == 0:
                mysql_types.append(MYSQL_DATA_TYPE.INT)
            else:
                # It is NUMBER, DECIMAL or NUMERIC with scale > 0
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            continue
        elif description_column_type == "REAL":
            mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            continue

        # Last-resort fallback for anything unrecognised.
        mysql_types.append(MYSQL_DATA_TYPE.TEXT)

    # Rebuild the frame with columns ordered exactly as in the cursor description.
    df = DataFrame(
        result,
        columns=[column.name for column in description],
    )

    return Response(RESPONSE_TYPE.TABLE, data_frame=df, affected_rows=None, mysql_types=mysql_types)
class SnowflakeHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the Snowflake statements.
    """

    # Handler identifier used for registration/lookup within MindsDB.
    name = "snowflake"

    # Supported authentication strategies, keyed by the 'auth_type' connection arg.
    _auth_types = {
        "key_pair": KeyPairAuthType(),
        "password": PasswordAuthType(),
    }
185 def __init__(self, name, **kwargs):
186 super().__init__(name)
187 self.connection_data = kwargs.get("connection_data")
188 self.renderer = SqlalchemyRender(snowdialect.dialect)
190 self.is_connected = False
191 self.connection = None
193 def connect(self):
194 """
195 Establishes a connection to a Snowflake account.
197 Supports two authentication methods:
198 1. User/password authentication (legacy)
199 2. Key pair authentication (recommended)
201 Raises:
202 ValueError: If the required connection parameters are not provided.
203 snowflake.connector.errors.Error: If an error occurs while connecting to the Snowflake account.
205 Returns:
206 snowflake.connector.connection.SnowflakeConnection: A connection object to the Snowflake account.
207 """
209 if self.is_connected is True: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true
210 return self.connection
212 auth_type_key = self.connection_data.get("auth_type")
213 if auth_type_key is None:
214 supported = ", ".join(self._auth_types.keys())
215 raise ValueError(f"auth_type is required. Supported values: {supported}.")
217 auth_type = self._auth_types.get(auth_type_key)
218 if not auth_type:
219 supported = ", ".join(self._auth_types.keys())
220 raise ValueError(f"Invalid auth_type '{auth_type_key}'. Supported values: {supported}.")
222 config = auth_type.get_config(**self.connection_data)
224 try:
225 self.connection = connector.connect(**config)
226 self.connection.telemetry_enabled = False
227 self.is_connected = True
228 return self.connection
229 except connector.errors.Error as e:
230 logger.error(f"Error connecting to Snowflake, {e}!")
231 raise
233 def disconnect(self):
234 """
235 Closes the connection to the Snowflake account if it's currently open.
236 """
238 if self.is_connected is False:
239 return
240 self.connection.close()
241 self.is_connected = False
243 def check_connection(self) -> StatusResponse:
244 """
245 Checks the status of the connection to the Snowflake account.
247 Returns:
248 StatusResponse: An object containing the success status and an error message if an error occurs.
249 """
250 response = StatusResponse(False)
251 need_to_close = not self.is_connected
253 try:
254 connection = self.connect()
256 # Execute a simple query to test the connection
257 with connection.cursor() as cur:
258 cur.execute("select 1;")
259 response.success = True
260 except (connector.errors.Error, ValueError) as e:
261 logger.error(f"Error connecting to Snowflake, {e}!")
262 response.error_message = str(e)
264 if response.success and need_to_close:
265 self.disconnect()
267 elif not response.success and self.is_connected:
268 self.is_connected = False
270 return response
    def native_query(self, query: str) -> Response:
        """
        Executes a SQL query on the Snowflake account and returns the result.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        # Remember whether we opened the connection here so we can close it again.
        need_to_close = self.is_connected is False

        connection = self.connect()
        with connection.cursor(connector.DictCursor) as cur:
            try:
                cur.execute(query)
                try:
                    try:
                        batches_iter = cur.fetch_pandas_batches()
                    except ValueError:
                        # duplicated columns raises ValueError
                        raise NotSupportedError()

                    batches = []
                    memory_estimation_check_done = False

                    for batch_df in batches_iter:
                        batches.append(batch_df)
                        # region check the size of first batch (if it is big enough) to get an estimate of the full
                        # dataset size. If i does not fit in memory - raise an error.
                        # NOTE batch size cannot be set on client side. Also, Snowflake will download
                        # 'CLIENT_PREFETCH_THREADS' count of chunks in parallel (by default 4), therefore this check
                        # can not work in some cases.
                        batches_rowcount = sum([len(x) for x in batches])
                        if memory_estimation_check_done is False and batches_rowcount > 1000:
                            memory_estimation_check_done = True
                            # Available RAM and downloaded size, both in KiB.
                            available_memory_kb = psutil.virtual_memory().available >> 10
                            batches_size_kb = sum(
                                [(x.memory_usage(index=True, deep=True).sum() >> 10) for x in batches]
                            )
                            total_rowcount = cur.rowcount
                            rest_rowcount = total_rowcount - batches_rowcount
                            # Extrapolate the remaining download size from the rows fetched so far.
                            rest_estimated_size_kb = int((rest_rowcount / batches_rowcount) * batches_size_kb)
                            if (available_memory_kb * 0.9) < rest_estimated_size_kb:
                                logger.error(
                                    "Attempt to get too large dataset:\n"
                                    f"batches_rowcount={batches_rowcount}, size_kb={batches_size_kb}\n"
                                    f"total_rowcount={total_rowcount}, estimated_size_kb={rest_estimated_size_kb}\n"
                                    f"available_memory_kb={available_memory_kb}"
                                )
                                raise MemoryError("Not enought memory")
                        # endregion

                    if len(batches) > 0:
                        response = _make_table_response(result=pandas.concat(batches, ignore_index=True), cursor=cur)
                    else:
                        # Empty result set: keep the column names from the cursor description.
                        response = Response(RESPONSE_TYPE.TABLE, DataFrame([], columns=[x[0] for x in cur.description]))
                except NotSupportedError:
                    # Fallback for CREATE/DELETE/UPDATE. These commands returns table with single column,
                    # but it cannot be retrieved as pandas DataFrame.
                    result = cur.fetchall()
                    match result:
                        case (
                            [{"number of rows inserted": affected_rows}]
                            | [{"number of rows deleted": affected_rows}]
                            | [{"number of rows updated": affected_rows, "number of multi-joined rows updated": _}]
                        ):
                            response = Response(RESPONSE_TYPE.OK, affected_rows=affected_rows)
                        case list():
                            response = Response(
                                RESPONSE_TYPE.TABLE, DataFrame(result, columns=[x[0] for x in cur.description])
                            )
                        case _:
                            # Looks like SnowFlake always returns something in response, so this is suspicious
                            logger.warning("Snowflake did not return any data in response.")
                            response = Response(RESPONSE_TYPE.OK)
            except Exception as e:
                logger.error(f"Error running query: {query} on {self.connection_data.get('database')}, {e}!")
                response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e))

        if need_to_close is True:
            self.disconnect()

        if memory_pool is not None and memory_pool.backend_name == "jemalloc":
            # This reduce memory consumption, but will slow down next query slightly.
            # Except pool type 'jemalloc': memory consumption do not change significantly
            # and next query processing time may be even lower.
            memory_pool.release_unused()

        return response
363 def query(self, query: ASTNode) -> Response:
364 """
365 Executes a SQL query represented by an ASTNode and retrieves the data.
367 Args:
368 query (ASTNode): An ASTNode representing the SQL query to be executed.
370 Returns:
371 Response: The response from the `native_query` method, containing the result of the SQL query execution.
372 """
374 query_str = self.renderer.get_string(query, with_failback=True)
375 logger.debug(f"Executing SQL query: {query_str}")
376 result = self.native_query(query_str)
377 return self.lowercase_columns(result, query)
379 def lowercase_columns(self, result, query):
380 if not isinstance(query, Select) or result.data_frame is None: 380 ↛ 383line 380 didn't jump to line 383 because the condition on line 380 was always true
381 return result
383 quoted_columns = []
384 if query.targets is not None:
385 for column in query.targets:
386 if hasattr(column, "alias") and column.alias is not None:
387 if column.alias.is_quoted[-1]:
388 quoted_columns.append(column.alias.parts[-1])
389 elif isinstance(column, Identifier):
390 if column.is_quoted[-1]:
391 quoted_columns.append(column.parts[-1])
393 rename_columns = {}
394 for col in result.data_frame.columns:
395 if col.isupper() and col not in quoted_columns:
396 rename_columns[col] = col.lower()
397 if rename_columns:
398 result.data_frame = result.data_frame.rename(columns=rename_columns)
399 return result
401 def get_tables(self) -> Response:
402 """
403 Retrieves a list of all non-system tables and views in the current schema of the Snowflake account.
405 Returns:
406 Response: A response object containing the list of tables and views, formatted as per the `Response` class.
407 """
409 query = """
410 SELECT TABLE_NAME, TABLE_SCHEMA, TABLE_TYPE
411 FROM INFORMATION_SCHEMA.TABLES
412 WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
413 AND TABLE_SCHEMA = current_schema()
414 """
415 return self.native_query(query)
417 def get_columns(self, table_name) -> Response:
418 """
419 Retrieves column details for a specified table in the Snowflake account.
421 Args:
422 table_name (str): The name of the table for which to retrieve column information.
424 Returns:
425 Response: A response object containing the column details, formatted as per the `Response` class.
427 Raises:
428 ValueError: If the 'table_name' is not a valid string.
429 """
431 if not table_name or not isinstance(table_name, str): 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true
432 raise ValueError("Invalid table name provided.")
434 query = f"""
435 SELECT
436 COLUMN_NAME,
437 DATA_TYPE,
438 ORDINAL_POSITION,
439 COLUMN_DEFAULT,
440 IS_NULLABLE,
441 CHARACTER_MAXIMUM_LENGTH,
442 CHARACTER_OCTET_LENGTH,
443 NUMERIC_PRECISION,
444 NUMERIC_SCALE,
445 DATETIME_PRECISION,
446 CHARACTER_SET_NAME,
447 COLLATION_NAME
448 FROM INFORMATION_SCHEMA.COLUMNS
449 WHERE TABLE_NAME = '{table_name}'
450 AND TABLE_SCHEMA = current_schema()
451 """
452 result = self.native_query(query)
453 result.to_columns_table_response(map_type_fn=_map_type)
455 return result
457 def meta_get_tables(self, table_names: Optional[List[str]] = None) -> Response:
458 """
459 Retrieves metadata information about the tables in the Snowflake database to be stored in the data catalog.
461 Args:
462 table_names (list): A list of table names for which to retrieve metadata information.
464 Returns:
465 Response: A response object containing the metadata information, formatted as per the `Response` class.
466 """
467 query = """
468 SELECT
469 TABLE_CATALOG,
470 TABLE_SCHEMA,
471 TABLE_NAME,
472 TABLE_TYPE,
473 COMMENT AS TABLE_DESCRIPTION,
474 ROW_COUNT,
475 CREATED,
476 LAST_ALTERED
477 FROM INFORMATION_SCHEMA.TABLES
478 WHERE TABLE_SCHEMA = current_schema()
479 AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
480 """
482 if table_names is not None and len(table_names) > 0: 482 ↛ 486line 482 didn't jump to line 486 because the condition on line 482 was always true
483 table_names_str = ", ".join([f"'{t.upper()}'" for t in table_names])
484 query += f" AND TABLE_NAME IN ({table_names_str})"
486 result = self.native_query(query)
487 result.data_frame["ROW_COUNT"] = result.data_frame["ROW_COUNT"].astype(int)
489 return result
491 def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response:
492 """
493 Retrieves column metadata for the specified tables (or all tables if no list is provided).
495 Args:
496 table_names (list): A list of table names for which to retrieve column metadata.
498 Returns:
499 Response: A response object containing the column metadata.
500 """
501 query = """
502 SELECT
503 TABLE_NAME,
504 COLUMN_NAME,
505 DATA_TYPE,
506 COMMENT AS COLUMN_DESCRIPTION,
507 COLUMN_DEFAULT,
508 (IS_NULLABLE = 'YES') AS IS_NULLABLE,
509 CHARACTER_MAXIMUM_LENGTH,
510 CHARACTER_OCTET_LENGTH,
511 NUMERIC_PRECISION,
512 NUMERIC_SCALE,
513 DATETIME_PRECISION,
514 CHARACTER_SET_NAME,
515 COLLATION_NAME
516 FROM INFORMATION_SCHEMA.COLUMNS
517 WHERE TABLE_SCHEMA = current_schema()
518 """
520 if table_names is not None and len(table_names) > 0: 520 ↛ 524line 520 didn't jump to line 524 because the condition on line 520 was always true
521 table_names_str = ", ".join([f"'{t.upper()}'" for t in table_names])
522 query += f" AND TABLE_NAME IN ({table_names_str})"
524 result = self.native_query(query)
525 return result
527 def meta_get_column_statistics(self, table_names: Optional[List[str]] = None) -> Response:
528 """
529 Retrieves basic column statistics: null %, distinct count.
530 Due to Snowflake limitations, this runs per-table not per-column.
531 TODO: Add most_common_values and most_common_frequencies
532 """
533 columns_query = """
534 SELECT TABLE_NAME, COLUMN_NAME
535 FROM INFORMATION_SCHEMA.COLUMNS
536 WHERE TABLE_SCHEMA = current_schema()
537 """
538 if table_names: 538 ↛ 542line 538 didn't jump to line 542 because the condition on line 538 was always true
539 table_names_str = ", ".join([f"'{t.upper()}'" for t in table_names])
540 columns_query += f" AND TABLE_NAME IN ({table_names_str})"
542 columns_result = self.native_query(columns_query)
543 if (
544 columns_result.type == RESPONSE_TYPE.ERROR
545 or columns_result.data_frame is None
546 or columns_result.data_frame.empty
547 ):
548 return Response(RESPONSE_TYPE.ERROR, error_message="No columns found.")
550 columns_df = columns_result.data_frame
551 grouped = columns_df.groupby("TABLE_NAME")
552 all_stats = []
554 for table_name, group in grouped:
555 select_parts = []
556 for _, row in group.iterrows():
557 col = row["COLUMN_NAME"]
558 # Ensure column names in the query are properly quoted if they contain special characters or are case-sensitive
559 quoted_col = f'"{col}"'
560 select_parts.extend(
561 [
562 f'COUNT_IF({quoted_col} IS NULL) AS "nulls_{col}"',
563 f'APPROX_COUNT_DISTINCT({quoted_col}) AS "distincts_{col}"',
564 f'MIN({quoted_col}) AS "min_{col}"',
565 f'MAX({quoted_col}) AS "max_{col}"',
566 ]
567 )
569 quoted_table_name = f'"{table_name}"'
570 stats_query = f"""
571 SELECT COUNT(*) AS "total_rows", {", ".join(select_parts)}
572 FROM {quoted_table_name}
573 """
574 try:
575 stats_res = self.native_query(stats_query)
576 if stats_res.type != RESPONSE_TYPE.TABLE or stats_res.data_frame is None or stats_res.data_frame.empty: 576 ↛ 577line 576 didn't jump to line 577 because the condition on line 576 was never true
577 logger.warning(
578 f"Could not retrieve stats for table {table_name}. Query returned no data or an error: {stats_res.error_message if stats_res.type == RESPONSE_TYPE.ERROR else 'No data'}"
579 )
580 # Add placeholder stats if query fails or returns empty
581 for _, row in group.iterrows():
582 all_stats.append(
583 {
584 "table_name": table_name,
585 "column_name": row["COLUMN_NAME"],
586 "null_percentage": None,
587 "distinct_values_count": None,
588 "most_common_values": [],
589 "most_common_frequencies": [],
590 "minimum_value": None,
591 "maximum_value": None,
592 }
593 )
594 continue
596 stats_data = stats_res.data_frame.iloc[0]
597 total_rows = stats_data.get("total_rows", 0)
599 for _, row in group.iterrows():
600 col = row["COLUMN_NAME"]
601 # Keys for stats_data should match the aliases in stats_query (e.g., "nulls_COLNAME")
602 nulls = stats_data.get(f"nulls_{col}", 0)
603 distincts = stats_data.get(f"distincts_{col}", None)
604 min_val = stats_data.get(f"min_{col}", None)
605 max_val = stats_data.get(f"max_{col}", None)
606 null_pct = (nulls / total_rows) * 100 if total_rows is not None and total_rows > 0 else None
608 all_stats.append(
609 {
610 "table_name": table_name,
611 "column_name": col,
612 "null_percentage": null_pct,
613 "distinct_values_count": distincts,
614 "most_common_values": [],
615 "most_common_frequencies": [],
616 "minimum_value": min_val,
617 "maximum_value": max_val,
618 }
619 )
620 except Exception as e:
621 logger.error(f"Exception while fetching statistics for table {table_name}: {e}")
622 for _, row in group.iterrows():
623 all_stats.append(
624 {
625 "table_name": table_name,
626 "column_name": row["COLUMN_NAME"],
627 "null_percentage": None,
628 "distinct_values_count": None,
629 "most_common_values": [],
630 "most_common_frequencies": [],
631 "minimum_value": None,
632 "maximum_value": None,
633 }
634 )
636 if not all_stats: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true
637 return Response(RESPONSE_TYPE.TABLE, data_frame=pandas.DataFrame())
639 return Response(RESPONSE_TYPE.TABLE, data_frame=pandas.DataFrame(all_stats))
641 def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Response:
642 """
643 Retrieves primary key information for the specified tables (or all tables if no list is provided).
645 Args:
646 table_names (list): A list of table names for which to retrieve primary key information.
648 Returns:
649 Response: A response object containing the primary key information.
650 """
651 try:
652 query = """
653 SHOW PRIMARY KEYS IN TABLE;
654 """
656 response = self.native_query(query)
657 if response.type == RESPONSE_TYPE.ERROR and response.error_message: 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true
658 logger.error(f"Query error in meta_get_primary_keys: {response.error_message}\nQuery:\n{query}")
660 df = response.data_frame
661 if not df.empty: 661 ↛ 668line 661 didn't jump to line 668 because the condition on line 661 was always true
662 if table_names: 662 ↛ 665line 662 didn't jump to line 665 because the condition on line 662 was always true
663 df = df[df["table_name"].isin(table_names)]
665 df = df[["table_name", "column_name", "key_sequence", "constraint_name"]]
666 df = df.rename(columns={"key_sequence": "ordinal_position"})
668 response.data_frame = df
670 return response
672 except Exception as e:
673 logger.error(f"Exception in meta_get_primary_keys: {e!r}")
674 return Response(RESPONSE_TYPE.ERROR, error_message=f"Exception querying primary keys: {e!r}")
676 def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None) -> Response:
677 """
678 Retrieves foreign key information for the specified tables (or all tables if no list is provided).
680 Args:
681 table_names (list): A list of table names for which to retrieve foreign key information.
683 Returns:
684 Response: A response object containing the foreign key information.
685 """
686 try:
687 query = """
688 SHOW IMPORTED KEYS IN TABLE;
689 """
691 response = self.native_query(query)
692 if response.type == RESPONSE_TYPE.ERROR and response.error_message: 692 ↛ 693line 692 didn't jump to line 693 because the condition on line 692 was never true
693 logger.error(f"Query error in meta_get_primary_keys: {response.error_message}\nQuery:\n{query}")
695 df = response.data_frame
696 if not df.empty: 696 ↛ 710line 696 didn't jump to line 710 because the condition on line 696 was always true
697 if table_names: 697 ↛ 700line 697 didn't jump to line 700 because the condition on line 697 was always true
698 df = df[df["pk_table_name"].isin(table_names) & df["fk_table_name"].isin(table_names)]
700 df = df[["pk_table_name", "pk_column_name", "fk_table_name", "fk_column_name"]]
701 df = df.rename(
702 columns={
703 "pk_table_name": "child_table_name",
704 "pk_column_name": "child_column_name",
705 "fk_table_name": "parent_table_name",
706 "fk_column_name": "parent_column_name",
707 }
708 )
710 response.data_frame = df
712 return response
714 except Exception as e:
715 logger.error(f"Exception in meta_get_primary_keys: {e!r}")
716 return Response(RESPONSE_TYPE.ERROR, error_message=f"Exception querying primary keys: {e!r}")
718 def meta_get_handler_info(self, **kwargs: Any) -> str:
719 """
720 Retrieves information about the design and implementation of the database handler.
721 This should include, but not be limited to, the following:
722 - The type of SQL queries and operations that the handler supports.
723 - etc.
725 Args:
726 kwargs: Additional keyword arguments that may be used in generating the handler information.
728 Returns:
729 str: A string containing information about the database handler's design and implementation.
730 """
731 return (
732 "To query columns that contain special characters, use ticks around the column name, e.g. `column name`.\n"
733 "DO NOT use double quotes for this purpose."
734 )