Coverage for mindsdb / integrations / handlers / mysql_handler / mysql_handler.py: 92%

202 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional, List, Dict, Any 

2 

3import pandas as pd 

4import mysql.connector 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import MetaDatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE, 

16) 

17from mindsdb.integrations.handlers.mysql_handler.settings import ConnectionConfig 

18from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE 

19from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import C_TYPES, DATA_C_TYPE_MAP 

20 

21logger = log.getLogger(__name__) 

22 

23 

def _map_type(mysql_type_text: str) -> MYSQL_DATA_TYPE:
    """Translate a MySQL type name into the matching ``MYSQL_DATA_TYPE`` member.

    The name is upper-cased before the enum lookup. Any failure during the
    lookup (including a non-string argument) is logged as a warning and
    answered with ``MYSQL_DATA_TYPE.TEXT``.

    Args:
        mysql_type_text (str): MySQL type name, in any letter case.

    Returns:
        MYSQL_DATA_TYPE: The matching enum member, or ``TEXT`` as the fallback.
    """
    try:
        mapped_type = MYSQL_DATA_TYPE(mysql_type_text.upper())
    except Exception:
        # Deliberately broad: an unrecognised type must never break metadata retrieval.
        logger.warning(f"MySQL handler: unknown type: {mysql_type_text}, use TEXT as fallback.")
        mapped_type = MYSQL_DATA_TYPE.TEXT
    return mapped_type

38 

39 

def _make_table_response(result: List[Dict[str, Any]], cursor: mysql.connector.cursor.MySQLCursor) -> Response:
    """Turn fetched rows plus cursor metadata into a TABLE response.

    The MySQL type of every column is derived from ``cursor.description``;
    integer columns are then materialised with pandas' nullable ``Int64``
    dtype (and boolean ones with ``boolean``).

    Args:
        result (list[dict]): Fetched rows, each a mapping of column name to value.
        cursor (mysql.connector.cursor.MySQLCursor): Cursor that produced ``result``;
            its ``description`` and ``rowcount`` are read.

    Returns:
        Response: TABLE response carrying the DataFrame, the affected row count
        and the list of resolved MySQL column types.
    """
    description = cursor.description

    # Numeric type code -> MYSQL_DATA_TYPE. BLOB is left out on purpose because the
    # same code is shared by the text and blob families and is resolved by flags below.
    reverse_c_type_map = {v.code: k for k, v in DATA_C_TYPE_MAP.items() if v.code != C_TYPES.MYSQL_TYPE_BLOB}

    def resolve_type(col) -> MYSQL_DATA_TYPE:
        """Pick the MySQL type for one entry of ``cursor.description``."""
        type_int = col[1]
        if not isinstance(type_int, int):
            return MYSQL_DATA_TYPE.TEXT

        if type_int == C_TYPES.MYSQL_TYPE_TINY:
            # TINYINT, BOOL and BOOLEAN are all reported with the TINY code.
            return MYSQL_DATA_TYPE.TINYINT

        if type_int in reverse_c_type_map:
            return reverse_c_type_map[type_int]

        if type_int == C_TYPES.MYSQL_TYPE_BLOB:
            # The particular text/blob subtype cannot be recovered: the 8th element of the
            # description tuple is supposed to carry it, but mysql.connector reports the same
            # value for every text type (TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT) and for every
            # blob type (TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB). Only the family is told apart.
            flags = col[7]
            if flags == 16:  # and col[8] == 45
                return MYSQL_DATA_TYPE.TEXT
            if flags == 144:  # and col[8] == 63
                return MYSQL_DATA_TYPE.BLOB
            logger.debug(f"MySQL handler: unknown type code {col[7]}, use TEXT as fallback.")
            return MYSQL_DATA_TYPE.TEXT

        logger.warning(f"MySQL handler: unknown type id={type_int} in column {col[0]}, use TEXT as fallback.")
        return MYSQL_DATA_TYPE.TEXT

    mysql_types: list[MYSQL_DATA_TYPE] = [resolve_type(col) for col in description]

    # Cast int and bool columns to nullable dtypes so NULLs do not force a float/object column.
    integer_types = (
        MYSQL_DATA_TYPE.SMALLINT,
        MYSQL_DATA_TYPE.INT,
        MYSQL_DATA_TYPE.MEDIUMINT,
        MYSQL_DATA_TYPE.BIGINT,
        MYSQL_DATA_TYPE.TINYINT,
    )
    boolean_types = (MYSQL_DATA_TYPE.BOOL, MYSQL_DATA_TYPE.BOOLEAN)

    columns = []
    for col, mysql_type in zip(description, mysql_types):
        column_name = col[0]
        if mysql_type in integer_types:
            expected_dtype = "Int64"
        elif mysql_type in boolean_types:
            expected_dtype = "boolean"
        else:
            expected_dtype = None
        values = [row[column_name] for row in result]
        columns.append(pd.Series(values, dtype=expected_dtype, name=column_name))
    df = pd.concat(columns, axis=1, copy=False)

    return Response(RESPONSE_TYPE.TABLE, df, affected_rows=cursor.rowcount, mysql_types=mysql_types)

107 

108 

class MySQLHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the MySQL statements.

    Besides plain query execution it exposes ``meta_get_*`` methods that read
    ``information_schema`` for the currently selected database.
    """

    name = "mysql"

    def __init__(self, name: str, **kwargs: Any) -> None:
        """
        Store the connection settings; no connection is opened here.

        Args:
            name (str): Handler instance name, forwarded to the base class.
            **kwargs: ``connection_data`` (dict) holds the MySQL connection parameters.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "mysql"
        connection_data = kwargs.get("connection_data")
        # An explicit ``connection_data=None`` is treated like a missing one.
        self.connection_data = {} if connection_data is None else connection_data
        self.database = self.connection_data.get("database")

        self.connection: Optional[mysql.connector.MySQLConnection] = None

    def __del__(self) -> None:
        # ``connection`` is missing if __init__ raised before assigning it.
        if getattr(self, "connection", None) is not None and self.is_connected:
            self.disconnect()

    @staticmethod
    def _quote_literal(value: Any) -> str:
        """
        Render ``value`` as a single-quoted MySQL string literal.

        Backslashes and single quotes are doubled so the value cannot terminate
        the literal early.
        NOTE(review): with the NO_BACKSLASH_ESCAPES SQL mode a doubled backslash
        is read as two characters - confirm no deployment relies on that mode.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    @classmethod
    def _quoted_list(cls, values: List[Any]) -> str:
        """Return ``values`` as comma-separated, escaped string literals for an ``IN (...)`` list."""
        return ",".join(cls._quote_literal(value) for value in values)

    def _unpack_config(self) -> Dict[str, Any]:
        """
        Unpacks the config from the connection_data by validating all parameters.

        Returns:
            dict: A dictionary containing the validated connection parameters
            (only the ones that were explicitly set).

        Raises:
            ValueError: If ``ConnectionConfig`` rejects the connection data.
        """
        try:
            config = ConnectionConfig(**self.connection_data)
            return config.model_dump(exclude_unset=True)
        except ValueError as e:
            # Re-raised as a plain ValueError with the same message; the cause is kept for debugging.
            raise ValueError(str(e)) from e

    @property
    def is_connected(self) -> bool:
        """
        Checks if the handler is connected to the MySQL database.

        Returns:
            bool: True if the handler is connected, False otherwise.
        """
        return self.connection is not None and self.connection.is_connected()

    @is_connected.setter
    def is_connected(self, value: bool) -> None:
        # Assignments are accepted and ignored: the state is always derived from the live connection.
        pass

    def connect(self) -> mysql.connector.MySQLConnection:
        """
        Establishes a connection to a MySQL database.

        An already open connection is reused. Defaults applied when not configured:
        ``connection_timeout=10``, ``collation="utf8mb4_general_ci"``, ``use_pure=True``.

        Returns:
            MySQLConnection: An active connection to the database.

        Raises:
            ValueError: If the connection data does not pass validation.
            mysql.connector.Error: If the connection attempt fails.
        """
        # The property already asks the connection whether it is alive; no need to ask twice.
        if self.is_connected:
            return self.connection

        config = self._unpack_config()
        if "conn_attrs" in self.connection_data:
            config["conn_attrs"] = self.connection_data["conn_attrs"]

        if "connection_timeout" not in config:
            config["connection_timeout"] = 10

        ssl = self.connection_data.get("ssl")
        if ssl is True:
            config["client_flags"] = [mysql.connector.constants.ClientFlag.SSL]
            # Copy over only the SSL options that were actually provided.
            for ssl_option in ("ssl_ca", "ssl_cert", "ssl_key"):
                ssl_value = self.connection_data.get(ssl_option)
                if ssl_value is not None:
                    config[ssl_option] = ssl_value
        elif ssl is False:
            config["ssl_disabled"] = True

        if "collation" not in config:
            config["collation"] = "utf8mb4_general_ci"
        if "use_pure" not in config:
            config["use_pure"] = True

        try:
            connection = mysql.connector.connect(**config)
            connection.autocommit = True
            self.connection = connection
            return self.connection
        except mysql.connector.Error as e:
            logger.error(f"Error connecting to MySQL {self.database}, {e}!")
            raise

    def disconnect(self) -> None:
        """
        Closes the connection to the MySQL database if it's currently open.
        """
        if self.is_connected:
            self.connection.close()

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the MySQL database.

        A connection opened only for this check is closed again afterwards.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        result = StatusResponse(False)
        need_to_close = not self.is_connected

        try:
            connection = self.connect()
            result.success = connection.is_connected()
        except mysql.connector.Error as e:
            logger.error(f"Error connecting to MySQL {self.connection_data.get('database', 'unknown')}! Error: {e}")
            result.error_message = str(e)

        if result.success and need_to_close:
            self.disconnect()

        return result

    def native_query(self, query: str) -> Response:
        """
        Executes a SQL query on the MySQL database and returns the result.

        MySQL errors are converted into an ERROR response instead of being raised.
        A connection opened only for this call is always closed again, even when
        an unexpected exception propagates.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        need_to_close = not self.is_connected
        connection = None
        try:
            connection = self.connect()
            with connection.cursor(dictionary=True, buffered=True) as cur:
                cur.execute(query)
                if cur.with_rows:
                    result = cur.fetchall()
                    response = _make_table_response(result, cur)
                else:
                    response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount)
        except mysql.connector.Error as e:
            logger.error(
                f"Error running query: {query} on {self.connection_data.get('database', 'unknown')}! Error: {e}"
            )
            # errno may be missing on some errors; fall back to 1.
            response = Response(RESPONSE_TYPE.ERROR, error_code=e.errno or 1, error_message=str(e))
            if connection is not None and connection.is_connected():
                connection.rollback()
        finally:
            # Runs on every exit path so a temporary connection is never leaked.
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        Args:
            query (ASTNode): Parsed statement; it is rendered to MySQL-dialect SQL before execution.

        Returns:
            Response: The result of executing the rendered statement.
        """
        renderer = SqlalchemyRender("mysql")
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables and views in the selected MySQL database.

        Returns:
            Response: Rows with ``table_schema``, ``table_name`` and ``table_type``.
        """
        sql = """
            SELECT
                TABLE_SCHEMA AS table_schema,
                TABLE_NAME AS table_name,
                TABLE_TYPE AS table_type
            FROM
                information_schema.TABLES
            WHERE
                TABLE_TYPE IN ('BASE TABLE', 'VIEW')
                AND TABLE_SCHEMA = DATABASE()
            ORDER BY 2
            ;
        """
        return self.native_query(sql)

    def get_columns(self, table_name: str) -> Response:
        """
        Show details about the table.

        Args:
            table_name (str): Name of the table whose columns are listed.

        Returns:
            Response: Column details converted with ``to_columns_table_response``.
        """
        # NOTE(review): unlike get_tables, this lookup is not restricted to DATABASE(), so a
        # table with the same name in another schema also matches - confirm whether intended.
        q = f"""
            select
                COLUMN_NAME,
                DATA_TYPE,
                ORDINAL_POSITION,
                COLUMN_DEFAULT,
                IS_NULLABLE,
                CHARACTER_MAXIMUM_LENGTH,
                CHARACTER_OCTET_LENGTH,
                NUMERIC_PRECISION,
                NUMERIC_SCALE,
                DATETIME_PRECISION,
                CHARACTER_SET_NAME,
                COLLATION_NAME
            from
                information_schema.columns
            where
                table_name = {self._quote_literal(table_name)};
        """
        result = self.native_query(q)
        result.to_columns_table_response(map_type_fn=_map_type)
        return result

    def meta_get_tables(self, table_names: Optional[List[str]] = None, include_row_count: bool = False) -> Response:
        """
        Retrieves metadata information about the tables in the MySQL database
        to be stored in the data catalog.

        Args:
            table_names (list): A list of table names for which to retrieve metadata information.
            include_row_count (bool): Include TABLE_ROWS statistics (can be expensive on large schemas).

        Returns:
            Response: A response object containing the metadata information.
        """
        row_count_select = """,\n                t.TABLE_ROWS as row_count""" if include_row_count else ""

        query = f"""
            SELECT
                t.TABLE_NAME as table_name,
                t.TABLE_SCHEMA as table_schema,
                t.TABLE_TYPE as table_type,
                t.TABLE_COMMENT as table_description
                {row_count_select}
            FROM information_schema.TABLES t
            WHERE t.TABLE_SCHEMA = DATABASE()
                AND t.TABLE_TYPE IN ('BASE TABLE', 'VIEW')
        """

        if table_names:
            query += f" AND t.TABLE_NAME IN ({self._quoted_list(table_names)})"

        query += " ORDER BY t.TABLE_NAME"

        return self.native_query(query)

    def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves column metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve column metadata.

        Returns:
            Response: A response object containing the column metadata.
        """
        query = """
            SELECT
                c.TABLE_NAME as table_name,
                c.COLUMN_NAME as column_name,
                c.DATA_TYPE as data_type,
                c.COLUMN_COMMENT as column_description,
                c.COLUMN_DEFAULT as column_default,
                CASE WHEN c.IS_NULLABLE = 'YES' THEN 1 ELSE 0 END as is_nullable
            FROM information_schema.COLUMNS c
            WHERE c.TABLE_SCHEMA = DATABASE()
        """

        if table_names:
            query += f" AND c.TABLE_NAME IN ({self._quoted_list(table_names)})"

        query += " ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION"

        return self.native_query(query)

    def meta_get_column_statistics(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves column statistics for the specified tables (or all tables if no list is provided).
        Uses MySQL 8.0+ metadata sources (INFORMATION_SCHEMA.COLUMN_STATISTICS and INFORMATION_SCHEMA.STATISTICS) not requiring table scans.

        Args:
            table_names (list): A list of table names for which to retrieve column statistics.

        Returns:
            Response: A response object containing the column statistics.
        """
        table_filter = ""
        if table_names:
            table_filter = f" AND c.TABLE_NAME IN ({self._quoted_list(table_names)})"

        query = f"""
            WITH cols AS (
                SELECT c.TABLE_SCHEMA, c.TABLE_NAME, c.COLUMN_NAME, c.ORDINAL_POSITION
                FROM information_schema.COLUMNS c
                WHERE c.TABLE_SCHEMA = DATABASE()
                {table_filter}
            ),
            hist AS (
                SELECT
                    cs.SCHEMA_NAME AS TABLE_SCHEMA,
                    cs.TABLE_NAME,
                    cs.COLUMN_NAME,
                    cs.HISTOGRAM,
                    JSON_LENGTH(cs.HISTOGRAM, '$.buckets') AS buckets_len
                FROM information_schema.COLUMN_STATISTICS cs
                WHERE cs.SCHEMA_NAME = DATABASE()
            ),
            ndv AS (
                SELECT
                    s.TABLE_SCHEMA,
                    s.TABLE_NAME,
                    s.COLUMN_NAME,
                    MAX(s.CARDINALITY) AS DISTINCT_VALUES_COUNT
                FROM information_schema.STATISTICS s
                WHERE s.TABLE_SCHEMA = DATABASE()
                GROUP BY s.TABLE_SCHEMA, s.TABLE_NAME, s.COLUMN_NAME
            )
            SELECT
                c.TABLE_NAME AS TABLE_NAME,
                c.COLUMN_NAME AS COLUMN_NAME,

                /* optional fields kept NULL for simplicity */
                CAST(NULL AS JSON) AS MOST_COMMON_VALUES,
                CAST(NULL AS JSON) AS MOST_COMMON_FREQUENCIES,

                /* histogram "null-values" fraction -> percent */
                CASE
                    WHEN h.HISTOGRAM IS NULL THEN NULL
                    ELSE ROUND(
                        CAST(JSON_UNQUOTE(JSON_EXTRACT(h.HISTOGRAM, '$."null-values"')) AS DECIMAL(10,6)) * 100,
                        2
                    )
                END AS NULL_PERCENTAGE,
                /* MIN: first bucket's point (singleton) or lower endpoint (equi-height) */
                CASE
                    WHEN h.HISTOGRAM IS NULL THEN NULL
                    ELSE COALESCE(
                        JSON_UNQUOTE(JSON_EXTRACT(h.HISTOGRAM, '$.buckets[0].value')),
                        JSON_UNQUOTE(JSON_EXTRACT(h.HISTOGRAM, '$.buckets[0].endpoint[0]'))
                    )
                END AS MINIMUM_VALUE,

                /* MAX: last bucket's point (singleton) or upper endpoint (equi-height) */
                CASE
                    WHEN h.HISTOGRAM IS NULL THEN NULL
                    ELSE COALESCE(
                        JSON_UNQUOTE(
                            JSON_EXTRACT(h.HISTOGRAM,
                                CONCAT('$.buckets[', GREATEST(h.buckets_len - 1, 0), '].value')
                            )
                        ),
                        JSON_UNQUOTE(
                            JSON_EXTRACT(h.HISTOGRAM,
                                CONCAT('$.buckets[', GREATEST(h.buckets_len - 1, 0), '].endpoint[1]')
                            )
                        ),
                        JSON_UNQUOTE(
                            JSON_EXTRACT(h.HISTOGRAM,
                                CONCAT('$.buckets[', GREATEST(h.buckets_len - 1, 0), '].endpoint[0]')
                            )
                        )
                    )
                END AS MAXIMUM_VALUE,
                n.DISTINCT_VALUES_COUNT
            FROM cols c
            LEFT JOIN hist h
                ON h.TABLE_SCHEMA = c.TABLE_SCHEMA
                AND h.TABLE_NAME = c.TABLE_NAME
                AND h.COLUMN_NAME = c.COLUMN_NAME
            LEFT JOIN ndv n
                ON n.TABLE_SCHEMA = c.TABLE_SCHEMA
                AND n.TABLE_NAME = c.TABLE_NAME
                AND n.COLUMN_NAME = c.COLUMN_NAME
            ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION;
        """
        return self.native_query(query)

    def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves primary key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve primary key information.

        Returns:
            Response: A response object containing the primary key information.
        """
        query = """
            SELECT
                tc.TABLE_NAME as table_name,
                kcu.COLUMN_NAME as column_name,
                kcu.ORDINAL_POSITION as ordinal_position,
                tc.CONSTRAINT_NAME as constraint_name
            FROM information_schema.TABLE_CONSTRAINTS tc
            INNER JOIN information_schema.KEY_COLUMN_USAGE kcu
                ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
                AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
                AND tc.TABLE_NAME = kcu.TABLE_NAME
            WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
                AND tc.TABLE_SCHEMA = DATABASE()
        """

        if table_names:
            query += f" AND tc.TABLE_NAME IN ({self._quoted_list(table_names)})"

        query += " ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION"

        return self.native_query(query)

    def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None) -> Response:
        """
        Retrieves foreign key information for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve foreign key information.

        Returns:
            Response: A response object containing the foreign key information.
        """
        query = """
            SELECT
                kcu.REFERENCED_TABLE_NAME as parent_table_name,
                kcu.REFERENCED_COLUMN_NAME as parent_column_name,
                kcu.TABLE_NAME as child_table_name,
                kcu.COLUMN_NAME as child_column_name,
                kcu.CONSTRAINT_NAME as constraint_name
            FROM information_schema.KEY_COLUMN_USAGE kcu
            WHERE kcu.TABLE_SCHEMA = DATABASE()
                AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
        """

        if table_names:
            query += f" AND kcu.TABLE_NAME IN ({self._quoted_list(table_names)})"

        query += " ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME"

        return self.native_query(query)