Coverage for mindsdb / integrations / handlers / mssql_handler / mssql_handler.py: 83%

284 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Any, Union, TYPE_CHECKING 

2import datetime 

3 

4import pymssql 

5from pymssql import OperationalError 

6import pandas as pd 

7from pandas.api import types as pd_types 

8 

9from mindsdb_sql_parser import parse_sql 

10from mindsdb_sql_parser.ast.base import ASTNode 

11from mindsdb_sql_parser.ast import Identifier 

12 

13from mindsdb.integrations.libs.base import MetaDatabaseHandler 

14from mindsdb.integrations.utilities.query_traversal import query_traversal 

15from mindsdb.utilities import log 

16from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

17from mindsdb.integrations.libs.response import ( 

18 HandlerStatusResponse as StatusResponse, 

19 HandlerResponse as Response, 

20 RESPONSE_TYPE, 

21) 

22from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE 

23 

24if TYPE_CHECKING: 

25 import pyodbc 

26 

27logger = log.getLogger(__name__) 

28 

29 

def _map_type(mssql_type_text: str) -> MYSQL_DATA_TYPE:
    """Translate an MSSQL type name into the corresponding MySQL type enum.

    Args:
        mssql_type_text (str): The name of the MSSQL type to map.

    Returns:
        MYSQL_DATA_TYPE: The MySQL type enum that corresponds to the MSSQL
            type name; VARCHAR is used as the fallback for unknown types.
    """
    internal_type_name = mssql_type_text.lower()

    # Flat name -> enum lookup; every MSSQL type name maps to exactly one entry.
    direct_map = {
        "tinyint": MYSQL_DATA_TYPE.INT,
        "smallint": MYSQL_DATA_TYPE.INT,
        "int": MYSQL_DATA_TYPE.INT,
        "bigint": MYSQL_DATA_TYPE.INT,
        "bit": MYSQL_DATA_TYPE.BOOL,
        "money": MYSQL_DATA_TYPE.FLOAT,
        "smallmoney": MYSQL_DATA_TYPE.FLOAT,
        "float": MYSQL_DATA_TYPE.FLOAT,
        "real": MYSQL_DATA_TYPE.FLOAT,
        "decimal": MYSQL_DATA_TYPE.DECIMAL,
        "numeric": MYSQL_DATA_TYPE.DECIMAL,
        "date": MYSQL_DATA_TYPE.DATE,
        "time": MYSQL_DATA_TYPE.TIME,
        "datetime2": MYSQL_DATA_TYPE.DATETIME,
        "datetimeoffset": MYSQL_DATA_TYPE.DATETIME,
        "datetime": MYSQL_DATA_TYPE.DATETIME,
        "smalldatetime": MYSQL_DATA_TYPE.DATETIME,
        "varchar": MYSQL_DATA_TYPE.VARCHAR,
        "nvarchar": MYSQL_DATA_TYPE.VARCHAR,
        "char": MYSQL_DATA_TYPE.TEXT,
        "text": MYSQL_DATA_TYPE.TEXT,
        "nchar": MYSQL_DATA_TYPE.TEXT,
        "ntext": MYSQL_DATA_TYPE.TEXT,
        "binary": MYSQL_DATA_TYPE.BINARY,
        "varbinary": MYSQL_DATA_TYPE.BINARY,
        "image": MYSQL_DATA_TYPE.BINARY,
    }

    mapped = direct_map.get(internal_type_name)
    if mapped is not None:
        return mapped

    logger.debug(f"MSSQL handler type mapping: unknown type: {internal_type_name}, use VARCHAR as fallback.")
    return MYSQL_DATA_TYPE.VARCHAR

59 

60 

def _make_table_response(
    result: list[Union[dict[str, Any], tuple]], cursor: Union[pymssql.Cursor, "pyodbc.Cursor"], use_odbc: bool = False
) -> Response:
    """Build a TABLE response (DataFrame + MySQL column types) from a cursor result.

    Args:
        result (list[Union[dict[str, Any], tuple]]): result of the query.
            pymssql (as_dict=True) yields dicts; pyodbc yields tuple-like rows.
        cursor (Union[pymssql.Cursor, pyodbc.Cursor]): cursor object whose
            `description` provides column names and (for pymssql) DB-API type codes.
        use_odbc (bool): whether ODBC connection is being used.

    Returns:
        Response: response object of RESPONSE_TYPE.TABLE.
    """
    description: list[tuple[Any]] = cursor.description
    mysql_types: list[MYSQL_DATA_TYPE] = []
    # description[i][0] is the column name per the DB-API cursor contract.
    columns = [x[0] for x in cursor.description]

    if not result:
        # Empty result: preserve the column names even with zero rows.
        data_frame = pd.DataFrame(columns=columns)
    elif use_odbc:
        # from_records() understands tuple-like records (including pyodbc.Row)
        data_frame = pd.DataFrame.from_records(result, columns=columns)
    else:
        # pymssql with as_dict=True returns list of dicts
        data_frame = pd.DataFrame(result)

    for column in description:
        column_name = column[0]
        # column[1] is the DB-API type code (meaningful for pymssql only).
        column_type = column[1]
        column_dtype = data_frame[column_name].dtype

        if use_odbc:
            # For pyodbc, use type inference based on pandas dtype
            if pd_types.is_integer_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.INT)
            elif pd_types.is_float_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            elif pd_types.is_bool_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
            elif pd_types.is_datetime64_any_dtype(column_dtype):
                mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
            elif pd_types.is_object_dtype(column_dtype):
                # Object dtype is ambiguous: peek at the first value to detect
                # temporal columns that pandas did not coerce to datetime64.
                if len(data_frame) > 0 and isinstance(
                    data_frame[column_name].iloc[0], (datetime.datetime, datetime.date, datetime.time)
                ):
                    mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
                else:
                    mysql_types.append(MYSQL_DATA_TYPE.TEXT)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
        else:
            # pymssql exposes coarse DB-API type codes; refine NUMBER/BINARY
            # via the pandas dtype of the materialized column.
            match column_type:
                case pymssql.NUMBER:
                    if pd_types.is_integer_dtype(column_dtype):
                        mysql_types.append(MYSQL_DATA_TYPE.INT)
                    elif pd_types.is_float_dtype(column_dtype):
                        mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
                    elif pd_types.is_bool_dtype(column_dtype):
                        mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
                    else:
                        mysql_types.append(MYSQL_DATA_TYPE.DOUBLE)
                case pymssql.DECIMAL:
                    mysql_types.append(MYSQL_DATA_TYPE.DECIMAL)
                case pymssql.STRING:
                    mysql_types.append(MYSQL_DATA_TYPE.TEXT)
                case pymssql.DATETIME:
                    mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
                case pymssql.BINARY:
                    # DATE and TIME types returned as 'BINARY' type, and dataframe type is 'object', so it is not possible
                    # to infer correct mysql type for them
                    if pd_types.is_datetime64_any_dtype(column_dtype):
                        # pymssql return datetimes as 'binary' type
                        # if timezone is present, then it is datetime.timezone
                        series = data_frame[column_name]
                        if (
                            series.dt.tz is not None
                            and isinstance(series.dt.tz, datetime.timezone)
                            and series.dt.tz != datetime.timezone.utc
                        ):
                            # Normalize non-UTC offsets to UTC before dropping tzinfo.
                            series = series.dt.tz_convert("UTC")
                        data_frame[column_name] = series.dt.tz_localize(None)
                        mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
                    else:
                        mysql_types.append(MYSQL_DATA_TYPE.BINARY)
                case _:
                    logger.warning(f"Unknown type: {column_type}, use TEXT as fallback.")
                    mysql_types.append(MYSQL_DATA_TYPE.TEXT)

    return Response(RESPONSE_TYPE.TABLE, data_frame=data_frame, mysql_types=mysql_types)

150 

151 

class SqlServerHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the Microsoft SQL Server statements.
    Supports both native pymssql connections and ODBC connections via pyodbc.

    To use ODBC connection, specify either:
    - 'use_odbc': True in connection parameters, or
    - 'driver': '<ODBC driver name>' in connection parameters
    """

    # Integration name under which this handler is registered.
    name = "mssql"

163 

164 def __init__(self, name, **kwargs): 

165 super().__init__(name) 

166 self.parser = parse_sql 

167 self.connection_args = kwargs.get("connection_data") 

168 self.dialect = "mssql" 

169 self.database = self.connection_args.get("database") 

170 self.schema = self.connection_args.get("schema") 

171 self.renderer = SqlalchemyRender("mssql") 

172 

173 # Determine if ODBC should be used 

174 self.use_odbc = self.connection_args.get("use_odbc", False) or "driver" in self.connection_args 

175 

176 self.connection = None 

177 self.is_connected = False 

178 

179 def __del__(self): 

180 if self.is_connected is True: 

181 self.disconnect() 

182 

183 def connect(self): 

184 """ 

185 Establishes a connection to a Microsoft SQL Server database. 

186 Uses either pymssql (native) or pyodbc based on configuration. 

187 

188 Raises: 

189 pymssql._mssql.OperationalError or pyodbc.Error: If an error occurs while connecting to the database. 

190 

191 Returns: 

192 Union[pymssql.Connection, pyodbc.Connection]: A connection object to the Microsoft SQL Server database. 

193 """ 

194 

195 if self.is_connected is True: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 return self.connection 

197 

198 if self.use_odbc: 

199 return self._connect_odbc() 

200 else: 

201 return self._connect_pymssql() 

202 

203 def _connect_pymssql(self): 

204 """Connect using pymssql (native FreeTDS-based connection).""" 

205 # Mandatory connection parameters 

206 if not all(key in self.connection_args for key in ["host", "user", "password", "database"]): 

207 raise ValueError("Required parameters (host, user, password, database) must be provided.") 

208 

209 config = { 

210 "host": self.connection_args.get("host"), 

211 "user": self.connection_args.get("user"), 

212 "password": self.connection_args.get("password"), 

213 "database": self.connection_args.get("database"), 

214 } 

215 

216 # Optional connection parameters 

217 if "port" in self.connection_args: 217 ↛ 220line 217 didn't jump to line 220 because the condition on line 217 was always true

218 config["port"] = self.connection_args.get("port") 

219 

220 if "server" in self.connection_args: 

221 config["server"] = self.connection_args.get("server") 

222 

223 try: 

224 self.connection = pymssql.connect(**config) 

225 self.is_connected = True 

226 return self.connection 

227 except OperationalError as e: 

228 logger.error(f"Error connecting to Microsoft SQL Server {self.database}, {e}!") 

229 self.is_connected = False 

230 raise 

231 

    def _connect_odbc(self):
        """Connect using pyodbc (ODBC connection).

        Builds an ODBC connection string from the connection args and opens a
        pyodbc connection with a 10-second login timeout.

        Raises:
            ImportError: if pyodbc is not installed.
            ValueError: if a mandatory connection parameter is missing.
            ConnectionError: if the configured ODBC driver cannot be found.
            pyodbc.Error: for other ODBC-level connection failures.
        """
        # pyodbc is an optional dependency: import lazily so pymssql-only
        # installs keep working.
        try:
            import pyodbc
        except ImportError as e:
            raise ImportError(
                "pyodbc is not installed. Install it with 'pip install pyodbc' or "
                "'pip install mindsdb[mssql-odbc]' to use ODBC connections."
            ) from e

        # Mandatory connection parameters
        if not all(key in self.connection_args for key in ["host", "user", "password", "database"]):
            raise ValueError("Required parameters (host, user, password, database) must be provided.")

        driver = self.connection_args.get("driver", "ODBC Driver 18 for SQL Server")
        host = self.connection_args.get("host")
        port = self.connection_args.get("port", 1433)
        database = self.connection_args.get("database")
        user = self.connection_args.get("user")
        password = self.connection_args.get("password")

        # Standard ODBC key=value pairs; driver name must be brace-wrapped.
        conn_str_parts = [
            f"DRIVER={{{driver}}}",
            f"SERVER={host},{port}",
            f"DATABASE={database}",
            f"UID={user}",
            f"PWD={password}",
        ]

        # Add optional parameters
        if "encrypt" in self.connection_args:
            conn_str_parts.append(f"Encrypt={self.connection_args.get('encrypt', 'yes')}")
        if "trust_server_certificate" in self.connection_args:
            conn_str_parts.append(
                f"TrustServerCertificate={self.connection_args.get('trust_server_certificate', 'yes')}"
            )

        # Free-form extras are appended verbatim to the connection string.
        if "connection_string_args" in self.connection_args:
            conn_str_parts.append(self.connection_args["connection_string_args"])

        conn_str = ";".join(conn_str_parts)

        try:
            self.connection = pyodbc.connect(conn_str, timeout=10)
            self.is_connected = True
            return self.connection
        except pyodbc.Error as e:
            logger.error(f"Error connecting to Microsoft SQL Server {self.database} via ODBC, {e}!")
            self.is_connected = False

            # Check if it's a driver not found error; surface a clearer,
            # actionable message in that case.
            error_msg = str(e)
            if "Driver" in error_msg and ("not found" in error_msg or "specified" in error_msg):
                raise ConnectionError(
                    f"ODBC Driver not found: {driver}. "
                    f"Please install the Microsoft ODBC Driver for SQL Server. "
                    f"Error: {e}"
                ) from e
            raise
        except Exception as e:
            logger.error(f"Error connecting to Microsoft SQL Server {self.database} via ODBC, {e}!")
            self.is_connected = False
            raise

295 

296 def disconnect(self): 

297 """ 

298 Closes the connection to the Microsoft SQL Server database if it's currently open. 

299 """ 

300 

301 if not self.is_connected: 

302 return 

303 if self.connection is not None: 

304 try: 

305 self.connection.close() 

306 except Exception: 

307 logger.exception("Failed to close connection:") 

308 pass 

309 self.connection = None 

310 self.is_connected = False 

311 

312 def check_connection(self) -> StatusResponse: 

313 """ 

314 Checks the status of the connection to the Microsoft SQL Server database. 

315 

316 Returns: 

317 StatusResponse: An object containing the success status and an error message if an error occurs. 

318 """ 

319 

320 response = StatusResponse(False) 

321 need_to_close = self.is_connected is False 

322 

323 try: 

324 connection = self.connect() 

325 with connection.cursor() as cur: 

326 # Execute a simple query to test the connection 

327 cur.execute("select 1;") 

328 response.success = True 

329 except OperationalError as e: 

330 logger.error(f"Error connecting to Microsoft SQL Server {self.database}, {e}!") 

331 response.error_message = str(e) 

332 

333 if response.success and need_to_close: 

334 self.disconnect() 

335 elif not response.success and self.is_connected: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true

336 self.is_connected = False 

337 

338 return response 

339 

340 def native_query(self, query: str) -> Response: 

341 """ 

342 Executes a SQL query on the Microsoft SQL Server database and returns the result. 

343 

344 Args: 

345 query (str): The SQL query to be executed. 

346 

347 Returns: 

348 Response: A response object containing the result of the query or an error message. 

349 """ 

350 

351 need_to_close = self.is_connected is False 

352 

353 connection = self.connect() 

354 

355 if self.use_odbc: 

356 with connection.cursor() as cur: 

357 try: 

358 cur.execute(query) 

359 if cur.description: 359 ↛ 363line 359 didn't jump to line 363 because the condition on line 359 was always true

360 result = cur.fetchall() 

361 response = _make_table_response(result, cur, use_odbc=True) 

362 else: 

363 response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount) 

364 connection.commit() 

365 except Exception as e: 

366 logger.exception(f"Error running query: {query} on {self.database}, {e}!") 

367 response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e)) 

368 connection.rollback() 

369 else: 

370 with connection.cursor(as_dict=True) as cur: 

371 try: 

372 cur.execute(query) 

373 if cur.description: 

374 result = cur.fetchall() 

375 response = _make_table_response(result, cur, use_odbc=False) 

376 else: 

377 response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount) 

378 connection.commit() 

379 except Exception as e: 

380 logger.exception(f"Error running query: {query} on {self.database}, {e}!") 

381 response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e)) 

382 connection.rollback() 

383 

384 if need_to_close is True: 

385 self.disconnect() 

386 

387 return response 

388 

389 def _add_schema_to_tables(self, node, is_table=False, **kwargs): 

390 """ 

391 Callback for query_traversal that adds schema prefix to table identifiers. 

392 

393 Args: 

394 node: The AST node being visited 

395 is_table: True if this node represents a table reference 

396 **kwargs: Other arguments from query_traversal (parent_query, callstack, etc.) 

397 

398 Returns: 

399 None to keep traversing, or a replacement node 

400 Note: This is mostly a workaround for Minds but it should still work for FQE 

401 """ 

402 if is_table and isinstance(node, Identifier): 

403 # Only add schema if the identifier doesn't already have one (single part) 

404 if len(node.parts) == 1: 

405 node.parts.insert(0, self.schema) 

406 node.is_quoted.insert(0, False) 

407 return None 

408 

409 def query(self, query: ASTNode) -> Response: 

410 """ 

411 Executes a SQL query represented by an ASTNode and retrieves the data. 

412 

413 Args: 

414 query (ASTNode): An ASTNode representing the SQL query to be executed. 

415 

416 Returns: 

417 Response: The response from the `native_query` method, containing the result of the SQL query execution. 

418 """ 

419 # Add schema prefix to table identifiers if schema is configured 

420 if self.schema: 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true

421 query_traversal(query, self._add_schema_to_tables) 

422 

423 query_str = self.renderer.get_string(query, with_failback=True) 

424 logger.debug(f"Executing SQL query: {query_str}") 

425 return self.native_query(query_str) 

426 

    def get_tables(self) -> Response:
        """
        Retrieves a list of all non-system tables and views in the current schema of the Microsoft SQL Server database.

        Returns:
            Response: A response object containing the list of tables and views, formatted as per the `Response` class.
        """

        query = f"""
            SELECT
                table_schema,
                table_name,
                table_type
            FROM {self.database}.INFORMATION_SCHEMA.TABLES
            WHERE TABLE_TYPE in ('BASE TABLE', 'VIEW')
        """
        # Restrict to the configured schema when one was provided.
        if self.schema:
            query += f" AND table_schema = '{self.schema}'"

        return self.native_query(query)

447 

448 def get_columns(self, table_name) -> Response: 

449 """ 

450 Retrieves column details for a specified table in the Microsoft SQL Server database. 

451 

452 Args: 

453 table_name (str): The name of the table for which to retrieve column information. 

454 

455 Returns: 

456 Response: A response object containing the column details, formatted as per the `Response` class. 

457 Raises: 

458 ValueError: If the 'table_name' is not a valid string. 

459 """ 

460 

461 query = f""" 

462 SELECT 

463 COLUMN_NAME, 

464 DATA_TYPE, 

465 ORDINAL_POSITION, 

466 COLUMN_DEFAULT, 

467 IS_NULLABLE, 

468 CHARACTER_MAXIMUM_LENGTH, 

469 CHARACTER_OCTET_LENGTH, 

470 NUMERIC_PRECISION, 

471 NUMERIC_SCALE, 

472 DATETIME_PRECISION, 

473 CHARACTER_SET_NAME, 

474 COLLATION_NAME 

475 FROM 

476 information_schema.columns 

477 WHERE 

478 table_name = '{table_name}' 

479 """ 

480 

481 if self.schema: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 query += f" AND table_schema = '{self.schema}'" 

483 

484 result = self.native_query(query) 

485 result.to_columns_table_response(map_type_fn=_map_type) 

486 return result 

487 

    def meta_get_tables(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves metadata information about the tables in the Microsoft SQL Server database
        to be stored in the data catalog.

        Args:
            table_names (list): A list of table names for which to retrieve metadata information.

        Returns:
            Response: A response object containing the metadata information, formatted as per the `Response` class.
        """
        # Joins INFORMATION_SCHEMA.TABLES with sys catalog views to pick up the
        # 'MS_Description' extended property (table description) and partition
        # row counts (index_id 0 = heap, 1 = clustered index).
        query = f"""
            SELECT
                t.TABLE_NAME as table_name,
                t.TABLE_SCHEMA as table_schema,
                t.TABLE_TYPE as table_type,
                CAST(ep.value AS NVARCHAR(MAX)) as table_description,
                SUM(p.rows) as row_count
            FROM {self.database}.INFORMATION_SCHEMA.TABLES t
            LEFT JOIN {self.database}.sys.tables st
                ON t.TABLE_NAME = st.name
            LEFT JOIN {self.database}.sys.schemas s
                ON st.schema_id = s.schema_id AND t.TABLE_SCHEMA = s.name
            LEFT JOIN {self.database}.sys.extended_properties ep
                ON st.object_id = ep.major_id
                AND ep.minor_id = 0
                AND ep.class = 1
                AND ep.name = 'MS_Description'
            LEFT JOIN {self.database}.sys.partitions p
                ON st.object_id = p.object_id
                AND p.index_id IN (0, 1)
            WHERE t.TABLE_TYPE IN ('BASE TABLE', 'VIEW')
            AND t.TABLE_SCHEMA NOT IN ('sys', 'INFORMATION_SCHEMA')
        """

        if self.schema:
            query += f" AND t.TABLE_SCHEMA = '{self.schema}'"

        query += " GROUP BY t.TABLE_NAME, t.TABLE_SCHEMA, t.TABLE_TYPE, ep.value"

        # HAVING (not WHERE) because the name filter is appended after GROUP BY.
        if table_names is not None and len(table_names) > 0:
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" HAVING t.TABLE_NAME IN ({','.join(quoted_names)})"

        result = self.native_query(query)
        return result

534 

    def meta_get_columns(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves column metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve column metadata.

        Returns:
            Response: A response object containing the column metadata.
        """
        # Joins INFORMATION_SCHEMA.COLUMNS with sys catalog views to attach the
        # per-column 'MS_Description' extended property (matched via minor_id =
        # column_id).
        query = f"""
            SELECT
                c.TABLE_NAME as table_name,
                c.COLUMN_NAME as column_name,
                c.DATA_TYPE as data_type,
                CAST(ep.value AS NVARCHAR(MAX)) as column_description,
                c.COLUMN_DEFAULT as column_default,
                CASE WHEN c.IS_NULLABLE = 'YES' THEN 1 ELSE 0 END as is_nullable
            FROM {self.database}.INFORMATION_SCHEMA.COLUMNS c
            LEFT JOIN {self.database}.sys.tables st
                ON c.TABLE_NAME = st.name
            LEFT JOIN {self.database}.sys.schemas s
                ON st.schema_id = s.schema_id AND c.TABLE_SCHEMA = s.name
            LEFT JOIN {self.database}.sys.columns sc
                ON st.object_id = sc.object_id AND c.COLUMN_NAME = sc.name
            LEFT JOIN {self.database}.sys.extended_properties ep
                ON st.object_id = ep.major_id
                AND sc.column_id = ep.minor_id
                AND ep.name = 'MS_Description'
            WHERE c.TABLE_SCHEMA NOT IN ('sys', 'INFORMATION_SCHEMA')
        """

        if self.schema:
            query += f" AND c.TABLE_SCHEMA = '{self.schema}'"

        if table_names is not None and len(table_names) > 0:
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" AND c.TABLE_NAME IN ({','.join(quoted_names)})"

        result = self.native_query(query)
        return result

576 

    def meta_get_column_statistics(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves column statistics (e.g., null percentage, distinct value count, min/max values)
        for the specified tables or all tables if no list is provided.

        Note: Uses SQL Server's sys.dm_db_stats_properties and sys.dm_db_stats_histogram
        (similar to PostgreSQL's pg_stats). Statistics are only available for columns that
        have statistics objects created by SQL Server (typically indexed columns or columns
        used in queries after AUTO_CREATE_STATISTICS).

        Args:
            table_names (list): A list of table names for which to retrieve column statistics.

        Returns:
            Response: A response object containing the column statistics.
        """
        # Build the optional filters first; both are interpolated into the
        # single query below.
        table_filter = ""
        if table_names is not None and len(table_names) > 0:
            quoted_names = [f"'{t}'" for t in table_names]
            table_filter = f" AND t.name IN ({','.join(quoted_names)})"

        schema_filter = ""
        if self.schema:
            schema_filter = f" AND s.name = '{self.schema}'"

        # Using OUTER APPLY to handle table-valued functions properly
        # This is equivalent to PostgreSQL's pg_stats view approach
        # Includes all statistics: auto-created, user-created, and index-based
        # dm_db_stats_histogram columns: range_high_key, range_rows, equal_rows,
        # distinct_range_rows, average_range_rows
        query = f"""
            SELECT DISTINCT
                t.name AS TABLE_NAME,
                c.name AS COLUMN_NAME,
                CAST(NULL AS DECIMAL(10,2)) AS NULL_PERCENTAGE,
                CAST(h.distinct_count AS BIGINT) AS DISTINCT_VALUES_COUNT,
                NULL AS MOST_COMMON_VALUES,
                NULL AS MOST_COMMON_FREQUENCIES,
                CAST(h.min_value AS NVARCHAR(MAX)) AS MINIMUM_VALUE,
                CAST(h.max_value AS NVARCHAR(MAX)) AS MAXIMUM_VALUE
            FROM {self.database}.sys.tables t
            INNER JOIN {self.database}.sys.schemas s
                ON t.schema_id = s.schema_id
            INNER JOIN {self.database}.sys.columns c
                ON t.object_id = c.object_id
            LEFT JOIN {self.database}.sys.stats st
                ON st.object_id = t.object_id
            LEFT JOIN {self.database}.sys.stats_columns sc
                ON sc.object_id = st.object_id
                AND sc.stats_id = st.stats_id
                AND sc.column_id = c.column_id
                AND sc.stats_column_id = 1 -- Only leading column in multi-column stats
            OUTER APPLY (
                SELECT
                    MIN(CAST(range_high_key AS NVARCHAR(MAX))) AS min_value,
                    MAX(CAST(range_high_key AS NVARCHAR(MAX))) AS max_value,
                    SUM(CAST(distinct_range_rows AS BIGINT)) + COUNT(*) AS distinct_count
                FROM {self.database}.sys.dm_db_stats_histogram(st.object_id, st.stats_id)
                WHERE st.object_id IS NOT NULL
            ) h
            WHERE s.name NOT IN ('sys', 'INFORMATION_SCHEMA')
            {schema_filter}
            {table_filter}
            ORDER BY t.name, c.name
        """

        result = self.native_query(query)
        return result

645 

    def meta_get_primary_keys(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves primary key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve primary key information.

        Returns:
            Response: A response object containing the primary key information.
        """
        # TABLE_CONSTRAINTS identifies the PK constraints; KEY_COLUMN_USAGE
        # supplies the member columns and their order within the key.
        query = f"""
            SELECT
                tc.TABLE_NAME as table_name,
                kcu.COLUMN_NAME as column_name,
                kcu.ORDINAL_POSITION as ordinal_position,
                tc.CONSTRAINT_NAME as constraint_name
            FROM {self.database}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
            INNER JOIN {self.database}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
                AND tc.TABLE_NAME = kcu.TABLE_NAME
            WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
        """

        if self.schema:
            query += f" AND tc.TABLE_SCHEMA = '{self.schema}'"

        if table_names is not None and len(table_names) > 0:
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" AND tc.TABLE_NAME IN ({','.join(quoted_names)})"

        query += " ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION"

        result = self.native_query(query)
        return result

681 

    def meta_get_foreign_keys(self, table_names: list[str] | None = None) -> Response:
        """
        Retrieves foreign key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve foreign key information.

        Returns:
            Response: A response object containing the foreign key information.
        """
        # sys.foreign_keys / sys.foreign_key_columns give both sides of each FK;
        # OBJECT_NAME/COL_NAME resolve object and column ids back to names.
        # The table_names filter applies to the child (referencing) table.
        query = f"""
            SELECT
                OBJECT_NAME(fk.referenced_object_id) as parent_table_name,
                COL_NAME(fkc.referenced_object_id, fkc.referenced_column_id) as parent_column_name,
                OBJECT_NAME(fk.parent_object_id) as child_table_name,
                COL_NAME(fkc.parent_object_id, fkc.parent_column_id) as child_column_name,
                fk.name as constraint_name
            FROM {self.database}.sys.foreign_keys fk
            INNER JOIN {self.database}.sys.foreign_key_columns fkc
                ON fk.object_id = fkc.constraint_object_id
            INNER JOIN {self.database}.sys.tables t
                ON fk.parent_object_id = t.object_id
            INNER JOIN {self.database}.sys.schemas s
                ON t.schema_id = s.schema_id
            WHERE s.name NOT IN ('sys', 'INFORMATION_SCHEMA')
        """

        if self.schema:
            query += f" AND s.name = '{self.schema}'"

        if table_names is not None and len(table_names) > 0:
            quoted_names = [f"'{t}'" for t in table_names]
            query += f" AND OBJECT_NAME(fk.parent_object_id) IN ({','.join(quoted_names)})"

        query += " ORDER BY child_table_name, constraint_name"

        result = self.native_query(query)
        return result