Coverage for mindsdb / integrations / handlers / mssql_handler / mssql_handler.py: 83%
284 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Any, Union, TYPE_CHECKING
2import datetime
4import pymssql
5from pymssql import OperationalError
6import pandas as pd
7from pandas.api import types as pd_types
9from mindsdb_sql_parser import parse_sql
10from mindsdb_sql_parser.ast.base import ASTNode
11from mindsdb_sql_parser.ast import Identifier
13from mindsdb.integrations.libs.base import MetaDatabaseHandler
14from mindsdb.integrations.utilities.query_traversal import query_traversal
15from mindsdb.utilities import log
16from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
17from mindsdb.integrations.libs.response import (
18 HandlerStatusResponse as StatusResponse,
19 HandlerResponse as Response,
20 RESPONSE_TYPE,
21)
22from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE
24if TYPE_CHECKING:
25 import pyodbc
27logger = log.getLogger(__name__)
def _map_type(mssql_type_text: str) -> MYSQL_DATA_TYPE:
    """Map MSSQL text types names to MySQL types as enum.

    Args:
        mssql_type_text (str): The name of the MSSQL type to map.

    Returns:
        MYSQL_DATA_TYPE: The MySQL type enum that corresponds to the MSSQL text type name.
    """
    internal_type_name = mssql_type_text.lower()
    # Flat name -> enum mapping gives a direct O(1) lookup instead of scanning
    # tuple keys; the type groups are unchanged from the original mapping.
    types_map = {
        "tinyint": MYSQL_DATA_TYPE.INT,
        "smallint": MYSQL_DATA_TYPE.INT,
        "int": MYSQL_DATA_TYPE.INT,
        "bigint": MYSQL_DATA_TYPE.INT,
        "bit": MYSQL_DATA_TYPE.BOOL,
        "money": MYSQL_DATA_TYPE.FLOAT,
        "smallmoney": MYSQL_DATA_TYPE.FLOAT,
        "float": MYSQL_DATA_TYPE.FLOAT,
        "real": MYSQL_DATA_TYPE.FLOAT,
        "decimal": MYSQL_DATA_TYPE.DECIMAL,
        "numeric": MYSQL_DATA_TYPE.DECIMAL,
        "date": MYSQL_DATA_TYPE.DATE,
        "time": MYSQL_DATA_TYPE.TIME,
        "datetime2": MYSQL_DATA_TYPE.DATETIME,
        "datetimeoffset": MYSQL_DATA_TYPE.DATETIME,
        "datetime": MYSQL_DATA_TYPE.DATETIME,
        "smalldatetime": MYSQL_DATA_TYPE.DATETIME,
        "varchar": MYSQL_DATA_TYPE.VARCHAR,
        "nvarchar": MYSQL_DATA_TYPE.VARCHAR,
        "char": MYSQL_DATA_TYPE.TEXT,
        "text": MYSQL_DATA_TYPE.TEXT,
        "nchar": MYSQL_DATA_TYPE.TEXT,
        "ntext": MYSQL_DATA_TYPE.TEXT,
        "binary": MYSQL_DATA_TYPE.BINARY,
        "varbinary": MYSQL_DATA_TYPE.BINARY,
        "image": MYSQL_DATA_TYPE.BINARY,
    }

    mysql_data_type = types_map.get(internal_type_name)
    if mysql_data_type is not None:
        return mysql_data_type

    logger.debug(f"MSSQL handler type mapping: unknown type: {internal_type_name}, use VARCHAR as fallback.")
    return MYSQL_DATA_TYPE.VARCHAR
def _make_table_response(
    result: list[Union[dict[str, Any], tuple]], cursor: Union[pymssql.Cursor, "pyodbc.Cursor"], use_odbc: bool = False
) -> Response:
    """Build response from result and cursor.

    Args:
        result (list[Union[dict[str, Any], tuple]]): result of the query.
        cursor (Union[pymssql.Cursor, pyodbc.Cursor]): cursor object.
        use_odbc (bool): whether ODBC connection is being used.

    Returns:
        Response: response object.
    """
    # DB-API cursor.description rows: (name, type_code, ...); only the first
    # two elements are used below.
    description: list[tuple[Any]] = cursor.description
    mysql_types: list[MYSQL_DATA_TYPE] = []
    columns = [x[0] for x in cursor.description]

    if not result:
        # Keep the column layout even when the result set is empty.
        data_frame = pd.DataFrame(columns=columns)
    elif use_odbc:
        # from_records() understands tuple-like records (including pyodbc.Row)
        data_frame = pd.DataFrame.from_records(result, columns=columns)
    else:
        # pymssql with as_dict=True returns list of dicts
        data_frame = pd.DataFrame(result)

    for column in description:
        column_name = column[0]
        column_type = column[1]
        column_dtype = data_frame[column_name].dtype

        if use_odbc:
            # For pyodbc, use type inference based on pandas dtype
            if pd_types.is_integer_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.INT)
            elif pd_types.is_float_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            elif pd_types.is_bool_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
            elif pd_types.is_datetime64_any_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
            elif pd_types.is_object_dtype(column_dtype):
                # Temporal values may arrive as plain Python objects; sniff the
                # first row to distinguish them from generic text.
                if len(data_frame) > 0 and isinstance(
                    data_frame[column_name].iloc[0], (datetime.datetime, datetime.date, datetime.time)
                ):
                    mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
                else:
                    mysql_types.append(MYSQL_DATA_TYPE.TEXT)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
        else:
            # pymssql exposes only coarse DB-API type constants, so map those
            # and refine numbers via the pandas dtype.
            match column_type:
                case pymssql.NUMBER:
                    if pd_types.is_integer_dtype(column_dtype):
                        mysql_types.append(MYSQL_DATA_TYPE.INT)
                    elif pd_types.is_float_dtype(column_dtype):
                        mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
                    elif pd_types.is_bool_dtype(column_dtype):
                        mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
                    else:
                        mysql_types.append(MYSQL_DATA_TYPE.DOUBLE)
                case pymssql.DECIMAL:
                    mysql_types.append(MYSQL_DATA_TYPE.DECIMAL)
                case pymssql.STRING:
                    mysql_types.append(MYSQL_DATA_TYPE.TEXT)
                case pymssql.DATETIME:
                    mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
                case pymssql.BINARY:
                    # DATE and TIME types returned as 'BINARY' type, and dataframe type is 'object', so it is not possible
                    # to infer correct mysql type for them
                    if pd_types.is_datetime64_any_dtype(column_dtype):
                        # pymssql return datetimes as 'binary' type
                        # if timezone is present, then it is datetime.timezone
                        series = data_frame[column_name]
                        if (
                            series.dt.tz is not None
                            and isinstance(series.dt.tz, datetime.timezone)
                            and series.dt.tz != datetime.timezone.utc
                        ):
                            # Normalize to UTC before stripping tz info so values stay comparable.
                            series = series.dt.tz_convert("UTC")
                        data_frame[column_name] = series.dt.tz_localize(None)
                        mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
                    else:
                        mysql_types.append(MYSQL_DATA_TYPE.BINARY)
                case _:
                    logger.warning(f"Unknown type: {column_type}, use TEXT as fallback.")
                    mysql_types.append(MYSQL_DATA_TYPE.TEXT)

    return Response(RESPONSE_TYPE.TABLE, data_frame=data_frame, mysql_types=mysql_types)
class SqlServerHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the Microsoft SQL Server statements.
    Supports both native pymssql connections and ODBC connections via pyodbc.

    To use ODBC connection, specify either:
    - 'use_odbc': True in connection parameters, or
    - 'driver': '<ODBC driver name>' in connection parameters
    """

    name = "mssql"

    def __init__(self, name, **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of the handler instance.
            **kwargs: expects 'connection_data' (dict) with connection parameters.
        """
        super().__init__(name)
        self.parser = parse_sql
        # NOTE(review): assumes 'connection_data' is always supplied; the .get()
        # calls below would raise AttributeError if it were None — confirm callers.
        self.connection_args = kwargs.get("connection_data")
        self.dialect = "mssql"
        self.database = self.connection_args.get("database")
        self.schema = self.connection_args.get("schema")
        self.renderer = SqlalchemyRender("mssql")

        # Determine if ODBC should be used
        self.use_odbc = self.connection_args.get("use_odbc", False) or "driver" in self.connection_args

        self.connection = None
        self.is_connected = False
179 def __del__(self):
180 if self.is_connected is True:
181 self.disconnect()
183 def connect(self):
184 """
185 Establishes a connection to a Microsoft SQL Server database.
186 Uses either pymssql (native) or pyodbc based on configuration.
188 Raises:
189 pymssql._mssql.OperationalError or pyodbc.Error: If an error occurs while connecting to the database.
191 Returns:
192 Union[pymssql.Connection, pyodbc.Connection]: A connection object to the Microsoft SQL Server database.
193 """
195 if self.is_connected is True: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 return self.connection
198 if self.use_odbc:
199 return self._connect_odbc()
200 else:
201 return self._connect_pymssql()
203 def _connect_pymssql(self):
204 """Connect using pymssql (native FreeTDS-based connection)."""
205 # Mandatory connection parameters
206 if not all(key in self.connection_args for key in ["host", "user", "password", "database"]):
207 raise ValueError("Required parameters (host, user, password, database) must be provided.")
209 config = {
210 "host": self.connection_args.get("host"),
211 "user": self.connection_args.get("user"),
212 "password": self.connection_args.get("password"),
213 "database": self.connection_args.get("database"),
214 }
216 # Optional connection parameters
217 if "port" in self.connection_args: 217 ↛ 220line 217 didn't jump to line 220 because the condition on line 217 was always true
218 config["port"] = self.connection_args.get("port")
220 if "server" in self.connection_args:
221 config["server"] = self.connection_args.get("server")
223 try:
224 self.connection = pymssql.connect(**config)
225 self.is_connected = True
226 return self.connection
227 except OperationalError as e:
228 logger.error(f"Error connecting to Microsoft SQL Server {self.database}, {e}!")
229 self.is_connected = False
230 raise
    def _connect_odbc(self):
        """Connect using pyodbc (ODBC connection).

        Builds an ODBC connection string from the connection arguments and opens
        the connection with a 10-second timeout. Sets `self.connection` and
        `self.is_connected` on success.

        Raises:
            ImportError: if pyodbc is not installed.
            ValueError: if a mandatory connection parameter is missing.
            ConnectionError: if the configured ODBC driver cannot be found.
            pyodbc.Error: for any other connection failure.
        """
        # pyodbc is an optional dependency; import lazily so pymssql-only
        # installations keep working.
        try:
            import pyodbc
        except ImportError as e:
            raise ImportError(
                "pyodbc is not installed. Install it with 'pip install pyodbc' or "
                "'pip install mindsdb[mssql-odbc]' to use ODBC connections."
            ) from e

        # Mandatory connection parameters
        if not all(key in self.connection_args for key in ["host", "user", "password", "database"]):
            raise ValueError("Required parameters (host, user, password, database) must be provided.")

        driver = self.connection_args.get("driver", "ODBC Driver 18 for SQL Server")
        host = self.connection_args.get("host")
        port = self.connection_args.get("port", 1433)
        database = self.connection_args.get("database")
        user = self.connection_args.get("user")
        password = self.connection_args.get("password")

        # NOTE(review): values (incl. the password) are interpolated verbatim;
        # characters like ';' or '}' in a credential would break the string —
        # confirm upstream validation.
        conn_str_parts = [
            f"DRIVER={{{driver}}}",
            f"SERVER={host},{port}",
            f"DATABASE={database}",
            f"UID={user}",
            f"PWD={password}",
        ]

        # Add optional parameters
        if "encrypt" in self.connection_args:
            conn_str_parts.append(f"Encrypt={self.connection_args.get('encrypt', 'yes')}")
        if "trust_server_certificate" in self.connection_args:
            conn_str_parts.append(
                f"TrustServerCertificate={self.connection_args.get('trust_server_certificate', 'yes')}"
            )

        # Free-form pass-through for any extra ODBC key=value pairs.
        if "connection_string_args" in self.connection_args:
            conn_str_parts.append(self.connection_args["connection_string_args"])

        conn_str = ";".join(conn_str_parts)

        try:
            self.connection = pyodbc.connect(conn_str, timeout=10)
            self.is_connected = True
            return self.connection
        except pyodbc.Error as e:
            logger.error(f"Error connecting to Microsoft SQL Server {self.database} via ODBC, {e}!")
            self.is_connected = False

            # Check if it's a driver not found error
            error_msg = str(e)
            if "Driver" in error_msg and ("not found" in error_msg or "specified" in error_msg):
                # Re-raise with an actionable hint for a missing driver.
                raise ConnectionError(
                    f"ODBC Driver not found: {driver}. "
                    f"Please install the Microsoft ODBC Driver for SQL Server. "
                    f"Error: {e}"
                ) from e
            raise
        except Exception as e:
            logger.error(f"Error connecting to Microsoft SQL Server {self.database} via ODBC, {e}!")
            self.is_connected = False
            raise
296 def disconnect(self):
297 """
298 Closes the connection to the Microsoft SQL Server database if it's currently open.
299 """
301 if not self.is_connected:
302 return
303 if self.connection is not None:
304 try:
305 self.connection.close()
306 except Exception:
307 logger.exception("Failed to close connection:")
308 pass
309 self.connection = None
310 self.is_connected = False
312 def check_connection(self) -> StatusResponse:
313 """
314 Checks the status of the connection to the Microsoft SQL Server database.
316 Returns:
317 StatusResponse: An object containing the success status and an error message if an error occurs.
318 """
320 response = StatusResponse(False)
321 need_to_close = self.is_connected is False
323 try:
324 connection = self.connect()
325 with connection.cursor() as cur:
326 # Execute a simple query to test the connection
327 cur.execute("select 1;")
328 response.success = True
329 except OperationalError as e:
330 logger.error(f"Error connecting to Microsoft SQL Server {self.database}, {e}!")
331 response.error_message = str(e)
333 if response.success and need_to_close:
334 self.disconnect()
335 elif not response.success and self.is_connected: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 self.is_connected = False
338 return response
340 def native_query(self, query: str) -> Response:
341 """
342 Executes a SQL query on the Microsoft SQL Server database and returns the result.
344 Args:
345 query (str): The SQL query to be executed.
347 Returns:
348 Response: A response object containing the result of the query or an error message.
349 """
351 need_to_close = self.is_connected is False
353 connection = self.connect()
355 if self.use_odbc:
356 with connection.cursor() as cur:
357 try:
358 cur.execute(query)
359 if cur.description: 359 ↛ 363line 359 didn't jump to line 363 because the condition on line 359 was always true
360 result = cur.fetchall()
361 response = _make_table_response(result, cur, use_odbc=True)
362 else:
363 response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount)
364 connection.commit()
365 except Exception as e:
366 logger.exception(f"Error running query: {query} on {self.database}, {e}!")
367 response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e))
368 connection.rollback()
369 else:
370 with connection.cursor(as_dict=True) as cur:
371 try:
372 cur.execute(query)
373 if cur.description:
374 result = cur.fetchall()
375 response = _make_table_response(result, cur, use_odbc=False)
376 else:
377 response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount)
378 connection.commit()
379 except Exception as e:
380 logger.exception(f"Error running query: {query} on {self.database}, {e}!")
381 response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e))
382 connection.rollback()
384 if need_to_close is True:
385 self.disconnect()
387 return response
389 def _add_schema_to_tables(self, node, is_table=False, **kwargs):
390 """
391 Callback for query_traversal that adds schema prefix to table identifiers.
393 Args:
394 node: The AST node being visited
395 is_table: True if this node represents a table reference
396 **kwargs: Other arguments from query_traversal (parent_query, callstack, etc.)
398 Returns:
399 None to keep traversing, or a replacement node
400 Note: This is mostly a workaround for Minds but it should still work for FQE
401 """
402 if is_table and isinstance(node, Identifier):
403 # Only add schema if the identifier doesn't already have one (single part)
404 if len(node.parts) == 1:
405 node.parts.insert(0, self.schema)
406 node.is_quoted.insert(0, False)
407 return None
409 def query(self, query: ASTNode) -> Response:
410 """
411 Executes a SQL query represented by an ASTNode and retrieves the data.
413 Args:
414 query (ASTNode): An ASTNode representing the SQL query to be executed.
416 Returns:
417 Response: The response from the `native_query` method, containing the result of the SQL query execution.
418 """
419 # Add schema prefix to table identifiers if schema is configured
420 if self.schema: 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true
421 query_traversal(query, self._add_schema_to_tables)
423 query_str = self.renderer.get_string(query, with_failback=True)
424 logger.debug(f"Executing SQL query: {query_str}")
425 return self.native_query(query_str)
    def get_tables(self) -> Response:
        """
        Retrieves a list of all non-system tables and views in the current schema of the Microsoft SQL Server database.

        Returns:
            Response: A response object containing the list of tables and views, formatted as per the `Response` class.
        """
        # INFORMATION_SCHEMA.TABLES covers both tables and views; filter to
        # user-facing object types only.
        query = f"""
            SELECT
                table_schema,
                table_name,
                table_type
            FROM {self.database}.INFORMATION_SCHEMA.TABLES
            WHERE TABLE_TYPE in ('BASE TABLE', 'VIEW')
        """
        # Narrow to the configured schema when one is set.
        if self.schema:
            query += f" AND table_schema = '{self.schema}'"

        return self.native_query(query)
448 def get_columns(self, table_name) -> Response:
449 """
450 Retrieves column details for a specified table in the Microsoft SQL Server database.
452 Args:
453 table_name (str): The name of the table for which to retrieve column information.
455 Returns:
456 Response: A response object containing the column details, formatted as per the `Response` class.
457 Raises:
458 ValueError: If the 'table_name' is not a valid string.
459 """
461 query = f"""
462 SELECT
463 COLUMN_NAME,
464 DATA_TYPE,
465 ORDINAL_POSITION,
466 COLUMN_DEFAULT,
467 IS_NULLABLE,
468 CHARACTER_MAXIMUM_LENGTH,
469 CHARACTER_OCTET_LENGTH,
470 NUMERIC_PRECISION,
471 NUMERIC_SCALE,
472 DATETIME_PRECISION,
473 CHARACTER_SET_NAME,
474 COLLATION_NAME
475 FROM
476 information_schema.columns
477 WHERE
478 table_name = '{table_name}'
479 """
481 if self.schema: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 query += f" AND table_schema = '{self.schema}'"
484 result = self.native_query(query)
485 result.to_columns_table_response(map_type_fn=_map_type)
486 return result
    def meta_get_tables(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves metadata information about the tables in the Microsoft SQL Server database
        to be stored in the data catalog.

        Args:
            table_names (list): A list of table names for which to retrieve metadata information.

        Returns:
            Response: A response object containing the metadata information, formatted as per the `Response` class.
        """
        # Joins sys.* catalog views onto INFORMATION_SCHEMA to pick up the
        # MS_Description extended property and partition-based row counts.
        query = f"""
            SELECT
                t.TABLE_NAME as table_name,
                t.TABLE_SCHEMA as table_schema,
                t.TABLE_TYPE as table_type,
                CAST(ep.value AS NVARCHAR(MAX)) as table_description,
                SUM(p.rows) as row_count
            FROM {self.database}.INFORMATION_SCHEMA.TABLES t
            LEFT JOIN {self.database}.sys.tables st
                ON t.TABLE_NAME = st.name
            LEFT JOIN {self.database}.sys.schemas s
                ON st.schema_id = s.schema_id AND t.TABLE_SCHEMA = s.name
            LEFT JOIN {self.database}.sys.extended_properties ep
                ON st.object_id = ep.major_id
                AND ep.minor_id = 0
                AND ep.class = 1
                AND ep.name = 'MS_Description'
            LEFT JOIN {self.database}.sys.partitions p
                ON st.object_id = p.object_id
                AND p.index_id IN (0, 1)
            WHERE t.TABLE_TYPE IN ('BASE TABLE', 'VIEW')
            AND t.TABLE_SCHEMA NOT IN ('sys', 'INFORMATION_SCHEMA')
        """

        if self.schema:
            query += f" AND t.TABLE_SCHEMA = '{self.schema}'"

        # Aggregation is needed because sys.partitions yields one row per partition.
        query += " GROUP BY t.TABLE_NAME, t.TABLE_SCHEMA, t.TABLE_TYPE, ep.value"

        if table_names is not None and len(table_names) > 0:
            # NOTE(review): names are interpolated directly into the SQL text;
            # assumes they come from a trusted catalog source — confirm upstream.
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" HAVING t.TABLE_NAME IN ({','.join(quoted_names)})"

        result = self.native_query(query)
        return result
    def meta_get_columns(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves column metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve column metadata.

        Returns:
            Response: A response object containing the column metadata.
        """
        # Joins sys.columns/sys.extended_properties so per-column MS_Description
        # values can be surfaced alongside INFORMATION_SCHEMA data.
        query = f"""
            SELECT
                c.TABLE_NAME as table_name,
                c.COLUMN_NAME as column_name,
                c.DATA_TYPE as data_type,
                CAST(ep.value AS NVARCHAR(MAX)) as column_description,
                c.COLUMN_DEFAULT as column_default,
                CASE WHEN c.IS_NULLABLE = 'YES' THEN 1 ELSE 0 END as is_nullable
            FROM {self.database}.INFORMATION_SCHEMA.COLUMNS c
            LEFT JOIN {self.database}.sys.tables st
                ON c.TABLE_NAME = st.name
            LEFT JOIN {self.database}.sys.schemas s
                ON st.schema_id = s.schema_id AND c.TABLE_SCHEMA = s.name
            LEFT JOIN {self.database}.sys.columns sc
                ON st.object_id = sc.object_id AND c.COLUMN_NAME = sc.name
            LEFT JOIN {self.database}.sys.extended_properties ep
                ON st.object_id = ep.major_id
                AND sc.column_id = ep.minor_id
                AND ep.name = 'MS_Description'
            WHERE c.TABLE_SCHEMA NOT IN ('sys', 'INFORMATION_SCHEMA')
        """

        if self.schema:
            query += f" AND c.TABLE_SCHEMA = '{self.schema}'"

        if table_names is not None and len(table_names) > 0:
            # NOTE(review): names are interpolated directly into the SQL text;
            # assumes they come from a trusted catalog source — confirm upstream.
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" AND c.TABLE_NAME IN ({','.join(quoted_names)})"

        result = self.native_query(query)
        return result
    def meta_get_column_statistics(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves column statistics (e.g., null percentage, distinct value count, min/max values)
        for the specified tables or all tables if no list is provided.

        Note: Uses SQL Server's sys.dm_db_stats_properties and sys.dm_db_stats_histogram
        (similar to PostgreSQL's pg_stats). Statistics are only available for columns that
        have statistics objects created by SQL Server (typically indexed columns or columns
        used in queries after AUTO_CREATE_STATISTICS).

        Args:
            table_names (list): A list of table names for which to retrieve column statistics.

        Returns:
            Response: A response object containing the column statistics.
        """
        table_filter = ""
        if table_names is not None and len(table_names) > 0:
            # NOTE(review): names are interpolated directly into the SQL text;
            # assumes they come from a trusted catalog source — confirm upstream.
            quoted_names = [f"'{t}'" for t in table_names]
            table_filter = f" AND t.name IN ({','.join(quoted_names)})"

        schema_filter = ""
        if self.schema:
            schema_filter = f" AND s.name = '{self.schema}'"

        # Using OUTER APPLY to handle table-valued functions properly
        # This is equivalent to PostgreSQL's pg_stats view approach
        # Includes all statistics: auto-created, user-created, and index-based
        # dm_db_stats_histogram columns: range_high_key, range_rows, equal_rows,
        # distinct_range_rows, average_range_rows
        query = f"""
            SELECT DISTINCT
                t.name AS TABLE_NAME,
                c.name AS COLUMN_NAME,
                CAST(NULL AS DECIMAL(10,2)) AS NULL_PERCENTAGE,
                CAST(h.distinct_count AS BIGINT) AS DISTINCT_VALUES_COUNT,
                NULL AS MOST_COMMON_VALUES,
                NULL AS MOST_COMMON_FREQUENCIES,
                CAST(h.min_value AS NVARCHAR(MAX)) AS MINIMUM_VALUE,
                CAST(h.max_value AS NVARCHAR(MAX)) AS MAXIMUM_VALUE
            FROM {self.database}.sys.tables t
            INNER JOIN {self.database}.sys.schemas s
                ON t.schema_id = s.schema_id
            INNER JOIN {self.database}.sys.columns c
                ON t.object_id = c.object_id
            LEFT JOIN {self.database}.sys.stats st
                ON st.object_id = t.object_id
            LEFT JOIN {self.database}.sys.stats_columns sc
                ON sc.object_id = st.object_id
                AND sc.stats_id = st.stats_id
                AND sc.column_id = c.column_id
                AND sc.stats_column_id = 1  -- Only leading column in multi-column stats
            OUTER APPLY (
                SELECT
                    MIN(CAST(range_high_key AS NVARCHAR(MAX))) AS min_value,
                    MAX(CAST(range_high_key AS NVARCHAR(MAX))) AS max_value,
                    SUM(CAST(distinct_range_rows AS BIGINT)) + COUNT(*) AS distinct_count
                FROM {self.database}.sys.dm_db_stats_histogram(st.object_id, st.stats_id)
                WHERE st.object_id IS NOT NULL
            ) h
            WHERE s.name NOT IN ('sys', 'INFORMATION_SCHEMA')
            {schema_filter}
            {table_filter}
            ORDER BY t.name, c.name
        """

        result = self.native_query(query)
        return result
    def meta_get_primary_keys(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves primary key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve primary key information.

        Returns:
            Response: A response object containing the primary key information.
        """
        # Join TABLE_CONSTRAINTS with KEY_COLUMN_USAGE to expand each PK
        # constraint into its ordered member columns.
        query = f"""
            SELECT
                tc.TABLE_NAME as table_name,
                kcu.COLUMN_NAME as column_name,
                kcu.ORDINAL_POSITION as ordinal_position,
                tc.CONSTRAINT_NAME as constraint_name
            FROM {self.database}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
            INNER JOIN {self.database}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
                AND tc.TABLE_NAME = kcu.TABLE_NAME
            WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
        """

        if self.schema:
            query += f" AND tc.TABLE_SCHEMA = '{self.schema}'"

        if table_names is not None and len(table_names) > 0:
            # NOTE(review): names are interpolated directly into the SQL text;
            # assumes they come from a trusted catalog source — confirm upstream.
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" AND tc.TABLE_NAME IN ({','.join(quoted_names)})"

        query += " ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION"

        result = self.native_query(query)
        return result
    def meta_get_foreign_keys(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves foreign key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve foreign key information.

        Returns:
            Response: A response object containing the foreign key information.
        """
        # sys.foreign_key_columns yields one row per column pair; OBJECT_NAME /
        # COL_NAME resolve the ids to readable names.
        query = f"""
            SELECT
                OBJECT_NAME(fk.referenced_object_id) as parent_table_name,
                COL_NAME(fkc.referenced_object_id, fkc.referenced_column_id) as parent_column_name,
                OBJECT_NAME(fk.parent_object_id) as child_table_name,
                COL_NAME(fkc.parent_object_id, fkc.parent_column_id) as child_column_name,
                fk.name as constraint_name
            FROM {self.database}.sys.foreign_keys fk
            INNER JOIN {self.database}.sys.foreign_key_columns fkc
                ON fk.object_id = fkc.constraint_object_id
            INNER JOIN {self.database}.sys.tables t
                ON fk.parent_object_id = t.object_id
            INNER JOIN {self.database}.sys.schemas s
                ON t.schema_id = s.schema_id
            WHERE s.name NOT IN ('sys', 'INFORMATION_SCHEMA')
        """

        if self.schema:
            query += f" AND s.name = '{self.schema}'"

        if table_names is not None and len(table_names) > 0:
            # NOTE(review): names are interpolated directly into the SQL text;
            # assumes they come from a trusted catalog source — confirm upstream.
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" AND OBJECT_NAME(fk.parent_object_id) IN ({','.join(quoted_names)})"

        query += " ORDER BY child_table_name, constraint_name"

        result = self.native_query(query)
        return result