Coverage for mindsdb / integrations / handlers / mysql_handler / mysql_handler.py: 92%
202 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional, List, Dict, Any
3import pandas as pd
4import mysql.connector
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import MetaDatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE,
16)
17from mindsdb.integrations.handlers.mysql_handler.settings import ConnectionConfig
18from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE
19from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import C_TYPES, DATA_C_TYPE_MAP
# Module-level logger shared by the type-mapping helpers and the handler class below.
logger = log.getLogger(__name__)
def _map_type(mysql_type_text: str) -> MYSQL_DATA_TYPE:
    """Translate a MySQL type name given as text into a ``MYSQL_DATA_TYPE`` member.

    The lookup is case-insensitive: the name is upper-cased before being passed
    to the enum constructor.

    Args:
        mysql_type_text (str): Type name as reported by MySQL, e.g. ``"varchar"``.

    Returns:
        MYSQL_DATA_TYPE: The matching enum member. If no member matches (or the
        name cannot be upper-cased), a warning is logged and
        ``MYSQL_DATA_TYPE.TEXT`` is returned instead.
    """
    try:
        member = MYSQL_DATA_TYPE(mysql_type_text.upper())
    except Exception:
        # Any failure (unknown name, non-string input) degrades to TEXT.
        logger.warning(f"MySQL handler: unknown type: {mysql_type_text}, use TEXT as fallback.")
        member = MYSQL_DATA_TYPE.TEXT
    return member
def _make_table_response(result: List[Dict[str, Any]], cursor: mysql.connector.cursor.MySQLCursor) -> Response:
    """Build a TABLE response from fetched rows and the cursor that produced them.

    Per-column MySQL types are derived from ``cursor.description``. Integer-like
    columns are built with pandas' nullable ``Int64`` dtype and BOOL/BOOLEAN
    columns with the nullable ``boolean`` dtype. Other columns use pandas' default
    inference.

    Args:
        result (list[dict]): Rows of the query, one dict per row keyed by column
            name (the shape produced by a ``dictionary=True`` cursor).
        cursor (mysql.connector.cursor.MySQLCursor): The cursor that executed the
            query; its ``description`` and ``rowcount`` are read.

    Returns:
        Response: A ``RESPONSE_TYPE.TABLE`` response holding the DataFrame, the
        cursor's ``rowcount`` as ``affected_rows``, and the per-column MySQL types.
    """
    description = cursor.description
    # Reverse map from C type code to MySQL data type. BLOB is excluded because that
    # code is shared by every TEXT/BLOB variant and is resolved from flags below.
    reverse_c_type_map = {v.code: k for k, v in DATA_C_TYPE_MAP.items() if v.code != C_TYPES.MYSQL_TYPE_BLOB}
    binary_flag = mysql.connector.constants.FieldFlag.BINARY

    mysql_types: list[MYSQL_DATA_TYPE] = []
    for col in description:
        type_int = col[1]
        if not isinstance(type_int, int):
            mysql_types.append(MYSQL_DATA_TYPE.TEXT)
            continue

        if type_int == C_TYPES.MYSQL_TYPE_TINY:
            # There are 3 types that are returned as TINYINT: TINYINT, BOOL, BOOLEAN.
            mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
            continue

        if type_int in reverse_c_type_map:
            mysql_types.append(reverse_c_type_map[type_int])
            continue

        if type_int == C_TYPES.MYSQL_TYPE_BLOB:
            # region determine text/blob type by flags
            # All TEXT and BLOB variants share MYSQL_TYPE_BLOB, and mysql.connector does not
            # expose which size variant (TINY/MEDIUM/LONG) a column is. Text vs. binary can
            # still be told apart: binary columns carry the BINARY field flag (1 << 7).
            # Test that single bit instead of comparing the whole flags value. Other bits
            # (NOT_NULL, key flags, ...) vary independently of the type, so exact equality
            # misclassified e.g. a NOT NULL BLOB column (flags 145) as TEXT.
            # NOTE(review): a TEXT column with a binary (_bin) collation may also report the
            # BINARY flag and then maps to BLOB, as it did before; confirm this is intended.
            flags = col[7]
            if not isinstance(flags, int):
                logger.debug(f"MySQL handler: unknown type code {flags}, use TEXT as fallback.")
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
            elif flags & binary_flag:
                mysql_types.append(MYSQL_DATA_TYPE.BLOB)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
            # endregion
        else:
            logger.warning(f"MySQL handler: unknown type id={type_int} in column {col[0]}, use TEXT as fallback.")
            mysql_types.append(MYSQL_DATA_TYPE.TEXT)

    # region cast int and bool to nullable types
    nullable_int_types = (
        MYSQL_DATA_TYPE.SMALLINT,
        MYSQL_DATA_TYPE.INT,
        MYSQL_DATA_TYPE.MEDIUMINT,
        MYSQL_DATA_TYPE.BIGINT,
        MYSQL_DATA_TYPE.TINYINT,
    )
    serieses = []
    for col, mysql_type in zip(description, mysql_types):
        column_name = col[0]
        expected_dtype = None
        if mysql_type in nullable_int_types:
            expected_dtype = "Int64"
        elif mysql_type in (MYSQL_DATA_TYPE.BOOL, MYSQL_DATA_TYPE.BOOLEAN):
            expected_dtype = "boolean"
        serieses.append(pd.Series([row[column_name] for row in result], dtype=expected_dtype, name=column_name))
    # pd.concat raises on an empty list, so guard the (defensive) no-columns case.
    df = pd.concat(serieses, axis=1, copy=False) if serieses else pd.DataFrame()
    # endregion

    return Response(RESPONSE_TYPE.TABLE, df, affected_rows=cursor.rowcount, mysql_types=mysql_types)
class MySQLHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the MySQL statements.

    Table names supplied by callers are embedded into information_schema queries
    as escaped string literals (see ``_quote_literal``).
    """

    name = "mysql"

    def __init__(self, name: str, **kwargs: Any) -> None:
        """
        Args:
            name (str): Name of this handler instance.
            **kwargs: ``connection_data`` (dict) holds the connection parameters;
                other keyword arguments are not read here.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "mysql"
        self.connection_data = kwargs.get("connection_data", {})
        self.database = self.connection_data.get("database")

        self.connection: Optional[mysql.connector.MySQLConnection] = None

    def __del__(self) -> None:
        # ``connection`` does not exist if ``__init__`` failed before assigning it;
        # guard so that collecting a half-built handler does not raise AttributeError.
        if getattr(self, "connection", None) is not None and self.is_connected:
            self.disconnect()

    @staticmethod
    def _quote_literal(value: Any) -> str:
        """Render ``value`` as a single-quoted MySQL string literal.

        Backslashes and single quotes are escaped, so the value cannot close the
        literal early whether or not the NO_BACKSLASH_ESCAPES SQL mode is active.
        (Under that mode a name containing a backslash would not match, which is
        preferable to allowing injection.)
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    @classmethod
    def _table_name_filter(cls, column: str, table_names: Optional[List[str]]) -> str:
        """Return an `` AND <column> IN (...)`` clause, or "" when no names are given."""
        if not table_names:
            return ""
        quoted = ",".join(cls._quote_literal(t) for t in table_names)
        return f" AND {column} IN ({quoted})"

    def _unpack_config(self) -> Dict[str, Any]:
        """
        Validate ``connection_data`` against ``ConnectionConfig`` and return it as a dict.

        Returns:
            dict: The validated parameters; only fields that were explicitly
            provided are included (``exclude_unset=True``).

        Raises:
            ValueError: If validation fails.
        """
        try:
            config = ConnectionConfig(**self.connection_data)
            return config.model_dump(exclude_unset=True)
        except ValueError as e:
            # Re-raise as a plain ValueError, keeping the original as the cause.
            raise ValueError(str(e)) from e

    @property
    def is_connected(self) -> bool:
        """
        Checks if the handler is connected to the MySQL database.

        Returns:
            bool: True if a connection object exists and reports itself connected.
        """
        return self.connection is not None and self.connection.is_connected()

    @is_connected.setter
    def is_connected(self, value: bool) -> None:
        # Intentionally a no-op: the state is derived from ``self.connection``.
        # Presumably this lets base-class code assign ``is_connected`` without error.
        pass

    def connect(self) -> mysql.connector.MySQLConnection:
        """
        Establish (or reuse) a connection to the MySQL database.

        When not configured, ``connection_timeout`` defaults to 10,
        ``collation`` to ``"utf8mb4_general_ci"`` and ``use_pure`` to True.
        The connection is switched to autocommit mode.

        Returns:
            MySQLConnection: An active connection to the database.

        Raises:
            ValueError: If the connection parameters fail validation.
            mysql.connector.Error: If the connection cannot be established.
        """
        if self.is_connected:
            return self.connection

        config = self._unpack_config()
        if "conn_attrs" in self.connection_data:
            config["conn_attrs"] = self.connection_data["conn_attrs"]

        config.setdefault("connection_timeout", 10)

        ssl = self.connection_data.get("ssl")
        if ssl is True:
            config["client_flags"] = [mysql.connector.constants.ClientFlag.SSL]
            for key in ("ssl_ca", "ssl_cert", "ssl_key"):
                value = self.connection_data.get(key)
                if value is not None:
                    config[key] = value
        elif ssl is False:
            config["ssl_disabled"] = True

        config.setdefault("collation", "utf8mb4_general_ci")
        config.setdefault("use_pure", True)

        try:
            connection = mysql.connector.connect(**config)
            connection.autocommit = True
            self.connection = connection
            return self.connection
        except mysql.connector.Error as e:
            logger.error(f"Error connecting to MySQL {self.database}, {e}!")
            raise

    def disconnect(self) -> None:
        """
        Closes the connection to the MySQL database if it's currently open.
        """
        if not self.is_connected:
            return
        self.connection.close()

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the MySQL database.

        A connection opened only for this check is closed again afterwards.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        result = StatusResponse(False)
        need_to_close = not self.is_connected

        try:
            connection = self.connect()
            result.success = connection.is_connected()
        except mysql.connector.Error as e:
            logger.error(f"Error connecting to MySQL {self.connection_data.get('database', 'unknown')}! Error: {e}")
            result.error_message = str(e)

        if result.success and need_to_close:
            self.disconnect()

        return result

    def native_query(self, query: str) -> Response:
        """
        Executes a SQL query on the MySQL database and returns the result.

        A connection opened only for this call is always closed again, including
        when an unexpected (non-MySQL) exception propagates.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A TABLE response for statements that return rows, an OK
            response with ``affected_rows`` otherwise, or an ERROR response when
            MySQL reports an error.
        """
        need_to_close = not self.is_connected
        connection = None
        try:
            connection = self.connect()
            with connection.cursor(dictionary=True, buffered=True) as cur:
                cur.execute(query)
                if cur.with_rows:
                    result = cur.fetchall()
                    response = _make_table_response(result, cur)
                else:
                    response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount)
        except mysql.connector.Error as e:
            logger.error(
                f"Error running query: {query} on {self.connection_data.get('database', 'unknown')}! Error: {e}"
            )
            response = Response(RESPONSE_TYPE.ERROR, error_code=e.errno or 1, error_message=str(e))
            if connection is not None and connection.is_connected():
                connection.rollback()
        finally:
            # Previously skipped when a non-MySQL exception escaped, leaking the connection.
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render the AST query to a MySQL SQL string and execute it.
        """
        renderer = SqlalchemyRender("mysql")
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list of all base tables and views in the currently selected MySQL database.
        """
        sql = """
            SELECT
                TABLE_SCHEMA AS table_schema,
                TABLE_NAME AS table_name,
                TABLE_TYPE AS table_type
            FROM
                information_schema.TABLES
            WHERE
                TABLE_TYPE IN ('BASE TABLE', 'VIEW')
                AND TABLE_SCHEMA = DATABASE()
            ORDER BY 2
            ;
        """
        result = self.native_query(sql)
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Show details about the columns of ``table_name`` in the current database.

        Columns are limited to the current schema, so same-named tables in other
        databases no longer contribute duplicate rows. They are returned in
        declaration order.
        """
        q = f"""
            select
                COLUMN_NAME,
                DATA_TYPE,
                ORDINAL_POSITION,
                COLUMN_DEFAULT,
                IS_NULLABLE,
                CHARACTER_MAXIMUM_LENGTH,
                CHARACTER_OCTET_LENGTH,
                NUMERIC_PRECISION,
                NUMERIC_SCALE,
                DATETIME_PRECISION,
                CHARACTER_SET_NAME,
                COLLATION_NAME
            from
                information_schema.columns
            where
                table_name = {self._quote_literal(table_name)}
                and table_schema = DATABASE()
            order by
                ORDINAL_POSITION;
        """
        result = self.native_query(q)
        result.to_columns_table_response(map_type_fn=_map_type)
        return result

    def meta_get_tables(self, table_names: Optional[List[str]] = None, include_row_count: bool = False) -> Response:
        """
        Retrieves metadata information about the tables in the MySQL database
        to be stored in the data catalog.

        Args:
            table_names (list): A list of table names for which to retrieve metadata information.
            include_row_count (bool): Include TABLE_ROWS statistics (can be expensive on large schemas).

        Returns:
            Response: A response object containing the metadata information.
        """
        row_count_select = ",\n                t.TABLE_ROWS as row_count" if include_row_count else ""

        query = f"""
            SELECT
                t.TABLE_NAME as table_name,
                t.TABLE_SCHEMA as table_schema,
                t.TABLE_TYPE as table_type,
                t.TABLE_COMMENT as table_description
                {row_count_select}
            FROM information_schema.TABLES t
            WHERE t.TABLE_SCHEMA = DATABASE()
                AND t.TABLE_TYPE IN ('BASE TABLE', 'VIEW')
        """
        query += self._table_name_filter("t.TABLE_NAME", table_names)
        query += " ORDER BY t.TABLE_NAME"

        return self.native_query(query)

    def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves column metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve column metadata.

        Returns:
            Response: A response object containing the column metadata.
        """
        query = """
            SELECT
                c.TABLE_NAME as table_name,
                c.COLUMN_NAME as column_name,
                c.DATA_TYPE as data_type,
                c.COLUMN_COMMENT as column_description,
                c.COLUMN_DEFAULT as column_default,
                CASE WHEN c.IS_NULLABLE = 'YES' THEN 1 ELSE 0 END as is_nullable
            FROM information_schema.COLUMNS c
            WHERE c.TABLE_SCHEMA = DATABASE()
        """
        query += self._table_name_filter("c.TABLE_NAME", table_names)
        query += " ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION"

        return self.native_query(query)

    def meta_get_column_statistics(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves column statistics for the specified tables (or all tables if no list is provided).

        Uses MySQL 8.0+ metadata sources (INFORMATION_SCHEMA.COLUMN_STATISTICS and
        INFORMATION_SCHEMA.STATISTICS) that do not require table scans.
        Histogram-derived fields (NULL_PERCENTAGE, MINIMUM_VALUE, MAXIMUM_VALUE) are
        NULL for columns without a histogram.

        Histogram buckets are JSON arrays. A singleton bucket is ``[value, cumulative_frequency]``.
        An equi-height bucket is ``[lower, upper, cumulative_frequency, distinct_count]``.
        Hence MIN is ``buckets[0][0]``, and MAX is element 1 (equi-height) or element 0
        (singleton) of the last bucket.
        NOTE(review): MySQL stores string histogram values base64-encoded; they are
        returned here as stored.

        DISTINCT_VALUES_COUNT uses index CARDINALITY only where the column is the
        leading index column. Deeper positions estimate distinct prefix combinations,
        not the column's own distinct values.

        Args:
            table_names (list): A list of table names for which to retrieve column statistics.

        Returns:
            Response: A response object containing the column statistics.
        """
        table_filter = self._table_name_filter("c.TABLE_NAME", table_names)

        query = f"""
            WITH cols AS (
                SELECT c.TABLE_SCHEMA, c.TABLE_NAME, c.COLUMN_NAME, c.ORDINAL_POSITION
                FROM information_schema.COLUMNS c
                WHERE c.TABLE_SCHEMA = DATABASE()
                {table_filter}
            ),
            hist AS (
                SELECT
                    cs.SCHEMA_NAME AS TABLE_SCHEMA,
                    cs.TABLE_NAME,
                    cs.COLUMN_NAME,
                    cs.HISTOGRAM,
                    JSON_UNQUOTE(JSON_EXTRACT(cs.HISTOGRAM, '$."histogram-type"')) AS histogram_type,
                    JSON_LENGTH(cs.HISTOGRAM, '$.buckets') AS buckets_len
                FROM information_schema.COLUMN_STATISTICS cs
                WHERE cs.SCHEMA_NAME = DATABASE()
            ),
            ndv AS (
                SELECT
                    s.TABLE_SCHEMA,
                    s.TABLE_NAME,
                    s.COLUMN_NAME,
                    MAX(s.CARDINALITY) AS DISTINCT_VALUES_COUNT
                FROM information_schema.STATISTICS s
                WHERE s.TABLE_SCHEMA = DATABASE()
                  AND s.SEQ_IN_INDEX = 1
                GROUP BY s.TABLE_SCHEMA, s.TABLE_NAME, s.COLUMN_NAME
            )
            SELECT
                c.TABLE_NAME AS TABLE_NAME,
                c.COLUMN_NAME AS COLUMN_NAME,

                /* optional fields kept NULL for simplicity */
                CAST(NULL AS JSON) AS MOST_COMMON_VALUES,
                CAST(NULL AS JSON) AS MOST_COMMON_FREQUENCIES,

                /* histogram "null-values" fraction -> percent */
                CASE
                    WHEN h.HISTOGRAM IS NULL THEN NULL
                    ELSE ROUND(
                        CAST(JSON_UNQUOTE(JSON_EXTRACT(h.HISTOGRAM, '$."null-values"')) AS DECIMAL(10,6)) * 100,
                        2
                    )
                END AS NULL_PERCENTAGE,

                /* MIN: element 0 of the first bucket (value or lower bound) */
                CASE
                    WHEN h.HISTOGRAM IS NULL THEN NULL
                    ELSE JSON_UNQUOTE(JSON_EXTRACT(h.HISTOGRAM, '$.buckets[0][0]'))
                END AS MINIMUM_VALUE,

                /* MAX: last bucket's upper bound (equi-height) or value (singleton) */
                CASE
                    WHEN h.HISTOGRAM IS NULL THEN NULL
                    ELSE JSON_UNQUOTE(
                        JSON_EXTRACT(
                            h.HISTOGRAM,
                            CONCAT(
                                '$.buckets[', GREATEST(h.buckets_len - 1, 0), '][',
                                IF(h.histogram_type = 'equi-height', 1, 0), ']'
                            )
                        )
                    )
                END AS MAXIMUM_VALUE,
                n.DISTINCT_VALUES_COUNT
            FROM cols c
            LEFT JOIN hist h
                ON h.TABLE_SCHEMA = c.TABLE_SCHEMA
                AND h.TABLE_NAME = c.TABLE_NAME
                AND h.COLUMN_NAME = c.COLUMN_NAME
            LEFT JOIN ndv n
                ON n.TABLE_SCHEMA = c.TABLE_SCHEMA
                AND n.TABLE_NAME = c.TABLE_NAME
                AND n.COLUMN_NAME = c.COLUMN_NAME
            ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION;
        """
        return self.native_query(query)

    def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves primary key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve primary key information.

        Returns:
            Response: A response object containing the primary key information.
        """
        query = """
            SELECT
                tc.TABLE_NAME as table_name,
                kcu.COLUMN_NAME as column_name,
                kcu.ORDINAL_POSITION as ordinal_position,
                tc.CONSTRAINT_NAME as constraint_name
            FROM information_schema.TABLE_CONSTRAINTS tc
            INNER JOIN information_schema.KEY_COLUMN_USAGE kcu
                ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
                AND tc.TABLE_NAME = kcu.TABLE_NAME
            WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
                AND tc.TABLE_SCHEMA = DATABASE()
        """
        query += self._table_name_filter("tc.TABLE_NAME", table_names)
        query += " ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION"

        return self.native_query(query)

    def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves foreign key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of (child) table names for which to retrieve foreign key information.

        Returns:
            Response: A response object containing the foreign key information.
        """
        query = """
            SELECT
                kcu.REFERENCED_TABLE_NAME as parent_table_name,
                kcu.REFERENCED_COLUMN_NAME as parent_column_name,
                kcu.TABLE_NAME as child_table_name,
                kcu.COLUMN_NAME as child_column_name,
                kcu.CONSTRAINT_NAME as constraint_name
            FROM information_schema.KEY_COLUMN_USAGE kcu
            WHERE kcu.TABLE_SCHEMA = DATABASE()
                AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
        """
        query += self._table_name_filter("kcu.TABLE_NAME", table_names)
        query += " ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME"

        return self.native_query(query)