Coverage for mindsdb / api / executor / datahub / datanodes / system_tables.py: 26%
354 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional, Literal
2from dataclasses import dataclass, fields
4import pandas as pd
5from mindsdb_sql_parser.ast.base import ASTNode
7from mindsdb.utilities import log
8from mindsdb.utilities.config import config
9from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
10from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES
11from mindsdb.interfaces.data_catalog.data_catalog_retriever import DataCatalogRetriever
12from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE, MYSQL_DATA_TYPE_COLUMNS_DEFAULT
13from mindsdb.api.executor.datahub.classes.tables_row import TABLES_ROW_TYPE, TablesRow
16logger = log.getLogger(__name__)
def _get_scope(query):
    """Extract database/table filters from a query's WHERE clause.

    Scans comparison conditions for `table_schema` and `table_name`
    restrictions expressed with "=" or "in".

    Args:
        query: parsed query whose `.where` clause is inspected.

    Returns:
        tuple: (databases, tables) — each a list of requested names, or
        None when the query does not constrain that dimension.
    """
    databases = None
    tables = None
    try:
        conditions = extract_comparison_conditions(query.where, ignore_functions=True)
    except NotImplementedError:
        # WHERE clause is too complex to analyze — apply no filtering.
        return databases, tables

    for op, column, value in conditions:
        if op == "=":
            values = [value]
        elif op == "in":
            values = value if isinstance(value, list) else [value]
        else:
            # Other operators (>, <, LIKE, ...) cannot narrow the scope.
            continue

        column_lower = column.lower()
        if column_lower == "table_schema":
            databases = values
        elif column_lower == "table_name":
            tables = values

    return databases, tables
class Table:
    """Base descriptor for a table exposed through MindsDB's INFORMATION_SCHEMA."""

    # Whether rows of this table may be deleted via SQL.
    deletable: bool = False
    # Whether the table is listed to clients by default.
    visible: bool = False
    # Object kind reported for this entity.
    kind: str = "table"
class SchemataTable(Table):
    """INFORMATION_SCHEMA.SCHEMATA — one row per database known to the session."""

    name = "SCHEMATA"
    columns = [
        "CATALOG_NAME",
        "SCHEMA_NAME",
        "DEFAULT_CHARACTER_SET_NAME",
        "DEFAULT_COLLATION_NAME",
        "SQL_PATH",
    ]

    @classmethod
    def get_data(cls, inf_schema=None, **kwargs):
        """Build the SCHEMATA dataframe from the session's database list."""
        rows = []
        for db in inf_schema.session.database_controller.get_list():
            # 'def' is MySQL's single catalog; charset/collation are reported as fixed values.
            rows.append(["def", db["name"], "utf8mb4", "utf8mb4_0900_ai_ci", None])
        return pd.DataFrame(rows, columns=cls.columns)
class TablesTable(Table):
    """INFORMATION_SCHEMA.TABLES — one row per table across all data sources.

    Rows are gathered from four places: the information_schema's own system
    views, persistent datanodes, external integrations, and MindsDB projects.
    """

    name = "TABLES"

    columns = [
        "TABLE_CATALOG",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "TABLE_TYPE",
        "ENGINE",
        "VERSION",
        "ROW_FORMAT",
        "TABLE_ROWS",
        "AVG_ROW_LENGTH",
        "DATA_LENGTH",
        "MAX_DATA_LENGTH",
        "INDEX_LENGTH",
        "DATA_FREE",
        "AUTO_INCREMENT",
        "CREATE_TIME",
        "UPDATE_TIME",
        "CHECK_TIME",
        "TABLE_COLLATION",
        "CHECKSUM",
        "CREATE_OPTIONS",
        "TABLE_COMMENT",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Build the TABLES dataframe, honoring `table_schema` filters in the query.

        Args:
            query (ASTNode): original query; its WHERE clause may restrict schemas.
            inf_schema: information_schema datanode used to reach all databases.

        Returns:
            pd.DataFrame: one row per table, columns per `cls.columns`.
        """
        databases, _ = _get_scope(query)

        data = []

        # 1) The information_schema's own system views.
        for name in inf_schema.tables.keys():
            if databases is not None and name not in databases:
                continue
            row = TablesRow(TABLE_TYPE=TABLES_ROW_TYPE.SYSTEM_VIEW, TABLE_NAME=name)
            data.append(row.to_list())

        # 2) Persistent datanodes. Their listings may come back as TablesRow
        #    objects, plain dicts, or bare name strings — normalize first.
        for ds_name in inf_schema.persist_datanodes_names:
            if databases is not None and ds_name not in databases:
                continue
            ds = inf_schema.get(ds_name)

            if hasattr(ds, "get_tables_rows"):
                ds_tables = ds.get_tables_rows()
            else:
                ds_tables = ds.get_tables()
            if len(ds_tables) == 0:
                continue
            # Inspect the first element once; the emptiness/indexability checks
            # in the original branch conditions were already guaranteed here.
            first = ds_tables[0]
            if isinstance(first, dict):
                ds_tables = [TablesRow(TABLE_TYPE=TABLES_ROW_TYPE.BASE_TABLE, TABLE_NAME=x["name"]) for x in ds_tables]
            elif isinstance(first, str):
                ds_tables = [TablesRow(TABLE_TYPE=TABLES_ROW_TYPE.BASE_TABLE, TABLE_NAME=x) for x in ds_tables]
            for row in ds_tables:
                row.TABLE_SCHEMA = ds_name
                data.append(row.to_list())

        # 3) External integrations: one broken connection must not break the
        #    whole listing, so failures are logged and the source is skipped.
        for ds_name in inf_schema.get_integrations_names():
            if databases is not None and ds_name not in databases:
                continue

            try:
                ds = inf_schema.get(ds_name)
                ds_tables = ds.get_tables()
                for row in ds_tables:
                    row.TABLE_SCHEMA = ds_name
                    data.append(row.to_list())
            except Exception:
                logger.exception(f"Can't get tables from '{ds_name}'")

        # 4) MindsDB projects (models, views, ...).
        for project_name in inf_schema.get_projects_names():
            if databases is not None and project_name not in databases:
                continue

            project_dn = inf_schema.get(project_name)
            project_tables = project_dn.get_tables()
            for row in project_tables:
                row.TABLE_SCHEMA = project_name
                data.append(row.to_list())

        df = pd.DataFrame(data, columns=cls.columns)
        return df
def infer_mysql_type(original_type: str) -> MYSQL_DATA_TYPE:
    """Infer MySQL data type from original type string from a database.

    Args:
        original_type (str): The original type string from a database.

    Returns:
        MYSQL_DATA_TYPE: The inferred MySQL data type.
    """
    normalized = original_type.lower()
    if normalized in ("double precision", "real", "numeric", "float"):
        return MYSQL_DATA_TYPE.FLOAT
    if normalized in ("integer", "smallint", "int", "bigint"):
        return MYSQL_DATA_TYPE.BIGINT
    if normalized in ("timestamp without time zone", "timestamp with time zone", "date", "timestamp"):
        return MYSQL_DATA_TYPE.DATETIME
    # Anything unrecognized is exposed as text.
    return MYSQL_DATA_TYPE.VARCHAR
172@dataclass(slots=True, kw_only=True)
173class ColumnsTableRow:
174 """Represents a row in the MindsDB's internal INFORMATION_SCHEMA.COLUMNS table.
175 This class follows the MySQL-compatible COLUMNS table structure.
177 Detailed field descriptions can be found in MySQL documentation:
178 https://dev.mysql.com/doc/refman/8.4/en/information-schema-columns-table.html
180 NOTE: The order of attributes is significant and matches the MySQL column order.
181 """
183 TABLE_CATALOG: Literal["def"] = "def"
184 TABLE_SCHEMA: Optional[str] = None
185 TABLE_NAME: Optional[str] = None
186 COLUMN_NAME: Optional[str] = None
187 ORDINAL_POSITION: int = 0
188 COLUMN_DEFAULT: Optional[str] = None
189 IS_NULLABLE: Literal["YES", "NO"] = "YES"
190 DATA_TYPE: str = MYSQL_DATA_TYPE.VARCHAR.value
191 CHARACTER_MAXIMUM_LENGTH: Optional[int] = None
192 CHARACTER_OCTET_LENGTH: Optional[int] = None
193 NUMERIC_PRECISION: Optional[int] = None
194 NUMERIC_SCALE: Optional[int] = None
195 DATETIME_PRECISION: Optional[int] = None
196 CHARACTER_SET_NAME: Optional[str] = None
197 COLLATION_NAME: Optional[str] = None
198 COLUMN_TYPE: Optional[str] = None
199 COLUMN_KEY: Optional[str] = None
200 EXTRA: Optional[str] = None
201 PRIVILEGES: str = "select"
202 COLUMN_COMMENT: Optional[str] = None
203 GENERATION_EXPRESSION: Optional[str] = None
204 SRS_ID: Optional[str] = None
205 # MindsDB's specific columns:
206 ORIGINAL_TYPE: Optional[str] = None
208 @classmethod
209 def from_is_columns_row(cls, table_schema: str, table_name: str, row: pd.Series) -> "ColumnsTableRow":
210 """Transform row from response of `handler.get_columns(...)` to internal information_schema.columns row.
212 Args:
213 table_schema (str): The name of the schema of the table which columns are described.
214 table_name (str): The name of the table which columns are described.
215 row (pd.Series): A row from the response of `handler.get_columns(...)`.
217 Returns:
218 ColumnsTableRow: A row in the MindsDB's internal INFORMATION_SCHEMA.COLUMNS table.
219 """
220 original_type: str = row[INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE] or ""
221 data_type: MYSQL_DATA_TYPE | None = row[INF_SCHEMA_COLUMNS_NAMES.MYSQL_DATA_TYPE]
222 if isinstance(data_type, MYSQL_DATA_TYPE) is False:
223 data_type = infer_mysql_type(original_type)
225 # region set default values depend on type
226 defaults = MYSQL_DATA_TYPE_COLUMNS_DEFAULT.get(data_type)
227 if defaults is not None:
228 for key, value in defaults.items():
229 if key in row and row[key] is None:
230 row[key] = value
232 # region determine COLUMN_TYPE - it is text representation of DATA_TYPE with additioan attributes
233 match data_type:
234 case MYSQL_DATA_TYPE.DECIMAL:
235 column_type = f"decimal({row[INF_SCHEMA_COLUMNS_NAMES.NUMERIC_PRECISION]},{INF_SCHEMA_COLUMNS_NAMES.NUMERIC_SCALE})"
236 case MYSQL_DATA_TYPE.VARCHAR:
237 column_type = f"varchar({row[INF_SCHEMA_COLUMNS_NAMES.CHARACTER_MAXIMUM_LENGTH]})"
238 case MYSQL_DATA_TYPE.VARBINARY:
239 column_type = f"varbinary({row[INF_SCHEMA_COLUMNS_NAMES.CHARACTER_MAXIMUM_LENGTH]})"
240 case MYSQL_DATA_TYPE.BIT | MYSQL_DATA_TYPE.BINARY | MYSQL_DATA_TYPE.CHAR:
241 column_type = f"{data_type.value.lower()}(1)"
242 case MYSQL_DATA_TYPE.BOOL | MYSQL_DATA_TYPE.BOOLEAN:
243 column_type = "tinyint(1)"
244 case _:
245 column_type = data_type.value.lower()
246 # endregion
248 # BOOLean types had 'tinyint' DATA_TYPE in MySQL
249 if data_type in (MYSQL_DATA_TYPE.BOOL, MYSQL_DATA_TYPE.BOOLEAN):
250 data_type = "tinyint"
251 else:
252 data_type = data_type.value.lower()
254 return cls(
255 TABLE_SCHEMA=table_schema,
256 TABLE_NAME=table_name,
257 COLUMN_NAME=row[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME],
258 ORDINAL_POSITION=row[INF_SCHEMA_COLUMNS_NAMES.ORDINAL_POSITION],
259 COLUMN_DEFAULT=row[INF_SCHEMA_COLUMNS_NAMES.COLUMN_DEFAULT],
260 IS_NULLABLE=row[INF_SCHEMA_COLUMNS_NAMES.IS_NULLABLE],
261 DATA_TYPE=data_type,
262 CHARACTER_MAXIMUM_LENGTH=row[INF_SCHEMA_COLUMNS_NAMES.CHARACTER_MAXIMUM_LENGTH],
263 CHARACTER_OCTET_LENGTH=row[INF_SCHEMA_COLUMNS_NAMES.CHARACTER_OCTET_LENGTH],
264 NUMERIC_PRECISION=row[INF_SCHEMA_COLUMNS_NAMES.NUMERIC_PRECISION],
265 NUMERIC_SCALE=row[INF_SCHEMA_COLUMNS_NAMES.NUMERIC_SCALE],
266 DATETIME_PRECISION=row[INF_SCHEMA_COLUMNS_NAMES.DATETIME_PRECISION],
267 CHARACTER_SET_NAME=row[INF_SCHEMA_COLUMNS_NAMES.CHARACTER_SET_NAME],
268 COLLATION_NAME=row[INF_SCHEMA_COLUMNS_NAMES.COLLATION_NAME],
269 COLUMN_TYPE=column_type,
270 ORIGINAL_TYPE=original_type,
271 )
273 def __post_init__(self):
274 """Check if all mandatory fields are filled."""
275 mandatory_fields = ["TABLE_SCHEMA", "TABLE_NAME", "COLUMN_NAME"]
276 if any(getattr(self, field_name) is None for field_name in mandatory_fields):
277 raise ValueError("One of mandatory fields is missed when creating ColumnsTableRow")
class ColumnsTable(Table):
    """INFORMATION_SCHEMA.COLUMNS assembled from per-table column descriptions."""

    name = "COLUMNS"
    columns = [field.name for field in fields(ColumnsTableRow)]

    @classmethod
    def get_data(cls, inf_schema=None, query: ASTNode = None, **kwargs) -> pd.DataFrame:
        """Collect column rows for the schemas/tables addressed by the query."""
        databases, tables_names = _get_scope(query)

        # Without an explicit schema filter, describe only the built-in databases.
        if databases is None:
            databases = ["information_schema", config.get("default_project"), "files"]

        result = []
        for db_name in databases:
            dn = inf_schema.get(db_name)
            if dn is None:
                continue

            if tables_names is None:
                names_to_describe = [t.TABLE_NAME for t in dn.get_tables()]
            else:
                names_to_describe = tables_names

            # Map name -> columns dataframe; duplicate names collapse to one entry.
            tables = {name: dn.get_table_columns_df(name) for name in names_to_describe}

            for table_name, table_columns_df in tables.items():
                for _, row in table_columns_df.iterrows():
                    result.append(
                        ColumnsTableRow.from_is_columns_row(table_schema=db_name, table_name=table_name, row=row)
                    )

        return pd.DataFrame(result, columns=cls.columns)
class EventsTable(Table):
    """INFORMATION_SCHEMA.EVENTS, mirroring MySQL's column layout.

    Only the table name and column list are declared here; no get_data()
    override is defined in this class.
    NOTE: the column order matches MySQL and is part of the client-facing
    contract — do not reorder.
    """

    name = "EVENTS"

    columns = [
        "EVENT_CATALOG",
        "EVENT_SCHEMA",
        "EVENT_NAME",
        "DEFINER",
        "TIME_ZONE",
        "EVENT_BODY",
        "EVENT_DEFINITION",
        "EVENT_TYPE",
        "EXECUTE_AT",
        "INTERVAL_VALUE",
        "INTERVAL_FIELD",
        "SQL_MODE",
        "STARTS",
        "ENDS",
        "STATUS",
        "ON_COMPLETION",
        "CREATED",
        "LAST_ALTERED",
        "LAST_EXECUTED",
        "EVENT_COMMENT",
        "ORIGINATOR",
        "CHARACTER_SET_CLIENT",
        "COLLATION_CONNECTION",
        "DATABASE_COLLATION",
    ]
class RoutinesTable(Table):
    """INFORMATION_SCHEMA routines listing, mirroring MySQL's column layout.

    Only the table name and column list are declared here; no get_data()
    override is defined in this class.
    NOTE(review): MySQL names this table "ROUTINES" (plural) while `name`
    here is "ROUTINE" — confirm whether this is intentional before changing,
    since lookups elsewhere may depend on the current spelling.
    """

    name = "ROUTINE"
    columns = [
        "SPECIFIC_NAME",
        "ROUTINE_CATALOG",
        "ROUTINE_SCHEMA",
        "ROUTINE_NAME",
        "ROUTINE_TYPE",
        "DATA_TYPE",
        "CHARACTER_MAXIMUM_LENGTH",
        "CHARACTER_OCTET_LENGTH",
        "NUMERIC_PRECISION",
        "NUMERIC_SCALE",
        "DATETIME_PRECISION",
        "CHARACTER_SET_NAME",
        "COLLATION_NAME",
        "DTD_IDENTIFIER",
        "ROUTINE_BODY",
        "ROUTINE_DEFINITION",
        "EXTERNAL_NAME",
        "EXTERNAL_LANGUAGE",
        "PARAMETER_STYLE",
        "IS_DETERMINISTIC",
        "SQL_DATA_ACCESS",
        "SQL_PATH",
        "SECURITY_TYPE",
        "CREATED",
        "LAST_ALTERED",
        "SQL_MODE",
        "ROUTINE_COMMENT",
        "DEFINER",
        "CHARACTER_SET_CLIENT",
        "COLLATION_CONNECTION",
        "DATABASE_COLLATION",
    ]
class PluginsTable(Table):
    """INFORMATION_SCHEMA.PLUGINS, mirroring MySQL's column layout.

    Only the table name and column list are declared here; no get_data()
    override is defined in this class.
    """

    name = "PLUGINS"
    columns = [
        "PLUGIN_NAME",
        "PLUGIN_VERSION",
        "PLUGIN_STATUS",
        "PLUGIN_TYPE",
        "PLUGIN_TYPE_VERSION",
        "PLUGIN_LIBRARY",
        "PLUGIN_LIBRARY_VERSION",
        "PLUGIN_AUTHOR",
        "PLUGIN_DESCRIPTION",
        "PLUGIN_LICENSE",
        "LOAD_OPTION",
        "PLUGIN_MATURITY",
        "PLUGIN_AUTH_VERSION",
    ]
class EnginesTable(Table):
    """INFORMATION_SCHEMA.ENGINES — a single static InnoDB row for MySQL compatibility."""

    name = "ENGINES"
    columns = ["ENGINE", "SUPPORT", "COMMENT", "TRANSACTIONS", "XA", "SAVEPOINTS"]

    @classmethod
    def get_data(cls, **kwargs):
        """Return the static one-row engines listing."""
        innodb_row = [
            "InnoDB",
            "DEFAULT",
            "Supports transactions, row-level locking, and foreign keys",
            "YES",
            "YES",
            "YES",
        ]
        return pd.DataFrame([innodb_row], columns=cls.columns)
class KeyColumnUsageTable(Table):
    """INFORMATION_SCHEMA.KEY_COLUMN_USAGE, mirroring MySQL's column layout.

    Only the table name and column list are declared here; no get_data()
    override is defined in this class.
    """

    name = "KEY_COLUMN_USAGE"
    columns = [
        "CONSTRAINT_CATALOG",
        "CONSTRAINT_SCHEMA",
        "CONSTRAINT_NAME",
        "TABLE_CATALOG",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "COLUMN_NAME",
        "ORDINAL_POSITION",
        "POSITION_IN_UNIQUE_CONSTRAINT",
        "REFERENCED_TABLE_SCHEMA",
        "REFERENCED_TABLE_NAME",
        "REFERENCED_COLUMN_NAME",
    ]
class StatisticsTable(Table):
    """INFORMATION_SCHEMA.STATISTICS (index metadata), mirroring MySQL's column layout.

    Only the table name and column list are declared here; no get_data()
    override is defined in this class.
    """

    name = "STATISTICS"
    columns = [
        "TABLE_CATALOG",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "NON_UNIQUE",
        "INDEX_SCHEMA",
        "INDEX_NAME",
        "SEQ_IN_INDEX",
        "COLUMN_NAME",
        "COLLATION",
        "CARDINALITY",
        "SUB_PART",
        "PACKED",
        "NULLABLE",
        "INDEX_TYPE",
        "COMMENT",
        "INDEX_COMMENT",
        "IS_VISIBLE",
        "EXPRESSION",
    ]
class CharacterSetsTable(Table):
    """INFORMATION_SCHEMA.CHARACTER_SETS with a fixed set of rows.

    Column order follows MySQL: CHARACTER_SET_NAME, DEFAULT_COLLATE_NAME,
    DESCRIPTION, MAXLEN.
    """

    name = "CHARACTER_SETS"
    columns = [
        "CHARACTER_SET_NAME",
        "DEFAULT_COLLATE_NAME",
        "DESCRIPTION",
        "MAXLEN",
    ]

    @classmethod
    def get_data(cls, **kwargs):
        """Return the static character-set listing.

        BUG FIX: the collation and description values were swapped relative to
        the declared column order (DEFAULT_COLLATE_NAME held the description,
        DESCRIPTION held the collation name); values now align with
        `cls.columns`, matching MySQL's own CHARACTER_SETS contents.
        """
        data = [
            ["utf8", "utf8_general_ci", "UTF-8 Unicode", 3],
            ["latin1", "latin1_swedish_ci", "cp1252 West European", 1],
            ["utf8mb4", "utf8mb4_general_ci", "UTF-8 Unicode", 4],
        ]

        df = pd.DataFrame(data, columns=cls.columns)
        return df
class CollationsTable(Table):
    """INFORMATION_SCHEMA.COLLATIONS with a fixed, minimal set of rows."""

    name = "COLLATIONS"

    columns = [
        "COLLATION_NAME",
        "CHARACTER_SET_NAME",
        "ID",
        "IS_DEFAULT",
        "IS_COMPILED",
        "SORTLEN",
        "PAD_ATTRIBUTE",
    ]

    @classmethod
    def get_data(cls, **kwargs):
        """Return the static collations listing (one default per charset)."""
        rows = [
            ["utf8_general_ci", "utf8", 33, "Yes", "Yes", 1, "PAD SPACE"],
            ["latin1_swedish_ci", "latin1", 8, "Yes", "Yes", 1, "PAD SPACE"],
        ]
        return pd.DataFrame(rows, columns=cls.columns)
510# Data Catalog tables
511# TODO: Should these be placed in a separate schema?
514# TODO: Combine with existing 'TablesTable'?
class MetaTablesTable(Table):
    """Data-catalog view of table metadata (META_TABLES)."""

    name = "META_TABLES"

    columns = ["TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE", "TABLE_DESCRIPTION", "ROW_COUNT"]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Fetch catalog table metadata for every schema named in the query.

        Raises:
            ValueError: when the query does not constrain `table_schema`.
        """
        databases, tables = _get_scope(query)

        if not databases:
            raise ValueError("At least one database must be specified in the query.")

        df = pd.DataFrame()
        for database in databases:
            retriever = DataCatalogRetriever(database_name=database, table_names=tables)
            table_df = retriever.retrieve_tables()
            # The retriever may return column names in arbitrary case.
            table_df.columns = table_df.columns.str.upper()
            table_df["TABLE_CATALOG"] = "def"
            table_df["TABLE_SCHEMA"] = database
            df = pd.concat([df, table_df])

        # Drop extras and fill any missing expected columns with None.
        return df.reindex(columns=cls.columns, fill_value=None)
542# TODO: Combine with existing 'ColumnsTable'?
class MetaColumnsTable(Table):
    """Data-catalog view of column metadata (META_COLUMNS)."""

    name = "META_COLUMNS"

    columns = [
        "TABLE_CATALOG",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "COLUMN_NAME",
        "DATA_TYPE",
        "COLUMN_DESCRIPTION",
        "COLUMN_DEFAULT",
        "IS_NULLABLE",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Fetch catalog column metadata for every schema named in the query.

        Raises:
            ValueError: when the query does not constrain `table_schema`.
        """
        databases, tables = _get_scope(query)

        if not databases:
            raise ValueError("At least one database must be specified in the query.")

        df = pd.DataFrame()
        for database in databases:
            retriever = DataCatalogRetriever(database_name=database, table_names=tables)
            columns_df = retriever.retrieve_columns()
            columns_df["TABLE_CATALOG"] = "def"
            columns_df["TABLE_SCHEMA"] = database
            df = pd.concat([df, columns_df])

        df.columns = df.columns.str.upper()

        df = df.reindex(columns=cls.columns, fill_value=None)
        # Present nullability the MySQL way ("YES"/"NO") rather than as booleans.
        df["IS_NULLABLE"] = df["IS_NULLABLE"].map({True: "YES", False: "NO"})

        return df
class MetaColumnStatisticsTable(Table):
    """Data-catalog view of per-column statistics (META_COLUMN_STATISTICS)."""

    name = "META_COLUMN_STATISTICS"
    columns = [
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "COLUMN_NAME",
        "MOST_COMMON_VALS",
        "MOST_COMMON_FREQS",
        "NULL_FRAC",
        "N_DISTINCT",
        "MIN_VALUE",
        "MAX_VALUE",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Fetch catalog column statistics for every schema named in the query.

        Raises:
            ValueError: when the query does not constrain `table_schema`.
        """
        databases, tables = _get_scope(query)

        if not databases:
            raise ValueError("At least one database must be specified in the query.")

        df = pd.DataFrame()
        for database in databases:
            retriever = DataCatalogRetriever(database_name=database, table_names=tables)
            stats_df = retriever.retrieve_column_statistics()
            stats_df["TABLE_CATALOG"] = "def"
            stats_df["TABLE_SCHEMA"] = database
            df = pd.concat([df, stats_df])

        df.columns = df.columns.str.upper()

        # Align retriever column names with the PostgreSQL-style names exposed here.
        df = df.rename(
            columns={
                "NULL_PERCENTAGE": "NULL_FRAC",
                "MOST_COMMON_VALUES": "MOST_COMMON_VALS",
                "MOST_COMMON_FREQUENCIES": "MOST_COMMON_FREQS",
                "DISTINCT_VALUES_COUNT": "N_DISTINCT",
                "MINIMUM_VALUE": "MIN_VALUE",
                "MAXIMUM_VALUE": "MAX_VALUE",
            }
        )

        return df.reindex(columns=cls.columns, fill_value=None)
class MetaTableConstraintsTable(Table):
    """Data-catalog view of table constraints (META_TABLE_CONSTRAINTS).

    Emits one row per primary key and two rows per foreign key — one for the
    parent table and one for the child table.
    """

    name = "META_TABLE_CONSTRAINTS"
    columns = [
        "CONSTRAINT_CATALOG",
        "CONSTRAINT_SCHEMA",
        "CONSTRAINT_NAME",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "CONSTRAINT_TYPE",
        "ENFORCED",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Collect constraint rows for every schema named in the query.

        Raises:
            ValueError: when the query does not constrain `table_schema`.
        """
        databases, tables = _get_scope(query)

        if not databases:
            raise ValueError("At least one database must be specified in the query.")

        df = pd.DataFrame()
        for database in databases:
            data_catalog_retriever = DataCatalogRetriever(database_name=database, table_names=tables)

            primary_keys_df = data_catalog_retriever.retrieve_primary_keys()
            if not primary_keys_df.empty:
                primary_keys_df["CONSTRAINT_CATALOG"] = "def"
                # Both schema columns take the database name.
                primary_keys_df[["CONSTRAINT_SCHEMA", "TABLE_SCHEMA"]] = database
                primary_keys_df["CONSTRAINT_TYPE"] = "PRIMARY KEY"

                primary_keys_df.columns = primary_keys_df.columns.str.upper()

                df = pd.concat([df, primary_keys_df])

            foreign_keys_df = data_catalog_retriever.retrieve_foreign_keys()
            if not foreign_keys_df.empty:
                foreign_keys_df["CONSTRAINT_CATALOG"] = "def"
                foreign_keys_df[["CONSTRAINT_SCHEMA", "TABLE_SCHEMA"]] = database
                foreign_keys_df["CONSTRAINT_TYPE"] = "FOREIGN KEY"

                foreign_keys_df.columns = foreign_keys_df.columns.str.upper()

                # Each FK is reported once from each side of the relation.
                parent_constraints_df = foreign_keys_df.copy(deep=True)
                child_constraints_df = foreign_keys_df.copy(deep=True)

                parent_constraints_df.rename(
                    columns={
                        "PARENT_TABLE_NAME": "TABLE_NAME",
                    },
                    inplace=True,
                )
                child_constraints_df.rename(
                    columns={
                        "CHILD_TABLE_NAME": "TABLE_NAME",
                    },
                    inplace=True,
                )

                df = pd.concat([df, parent_constraints_df, child_constraints_df])

        # Leftover PARENT_/CHILD_ columns are dropped by the reindex; missing
        # expected columns (e.g. ENFORCED) are filled with None.
        df = df.reindex(columns=cls.columns, fill_value=None)

        return df
class MetaColumnUsageTable(Table):
    """Data-catalog view of key column usage (META_KEY_COLUMN_USAGE).

    Primary keys produce one row each; foreign keys produce two rows each,
    viewed from the parent side and from the child side with the referenced-
    column roles swapped accordingly.
    """

    name = "META_KEY_COLUMN_USAGE"
    columns = [
        "CONSTRAINT_CATALOG",
        "CONSTRAINT_SCHEMA",
        "CONSTRAINT_NAME",
        "TABLE_CATALOG",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "COLUMN_NAME",
        "ORDINAL_POSITION",
        "POSITION_IN_UNIQUE_CONSTRAINT",
        "REFERENCED_TABLE_SCHEMA",
        "REFERENCED_TABLE_NAME",
        "REFERENCED_COLUMN_NAME",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Collect key-column-usage rows for every schema named in the query.

        Raises:
            ValueError: when the query does not constrain `table_schema`.
        """
        databases, tables = _get_scope(query)

        if not databases:
            raise ValueError("At least one database must be specified in the query.")

        df = pd.DataFrame()
        for database in databases:
            data_catalog_retriever = DataCatalogRetriever(database_name=database, table_names=tables)

            primary_keys_df = data_catalog_retriever.retrieve_primary_keys()
            if not primary_keys_df.empty:
                primary_keys_df[["CONSTRAINT_CATALOG", "TABLE_CATALOG"]] = "def"
                primary_keys_df[["CONSTRAINT_SCHEMA", "TABLE_SCHEMA"]] = database

                primary_keys_df.columns = primary_keys_df.columns.str.upper()

                df = pd.concat([df, primary_keys_df])

            foreign_keys_df = data_catalog_retriever.retrieve_foreign_keys()
            if not foreign_keys_df.empty:
                foreign_keys_df[["CONSTRAINT_CATALOG", "TABLE_CATALOG"]] = "def"
                # NOTE(review): unlike the primary-key branch, CONSTRAINT_SCHEMA is
                # never set here and ends up None after the reindex — confirm intent.
                foreign_keys_df[["TABLE_SCHEMA", "REFERENCED_TABLE_SCHEMA"]] = database

                foreign_keys_df.columns = foreign_keys_df.columns.str.upper()

                # View the relation from both ends: parent-side rows reference the
                # child columns, child-side rows reference the parent columns.
                parent_constraints_df = foreign_keys_df.copy(deep=True)
                child_constraints_df = foreign_keys_df.copy(deep=True)

                parent_constraints_df.rename(
                    columns={
                        "PARENT_TABLE_NAME": "TABLE_NAME",
                        "PARENT_COLUMN_NAME": "COLUMN_NAME",
                        "CHILD_TABLE_NAME": "REFERENCED_TABLE_NAME",
                        "CHILD_COLUMN_NAME": "REFERENCED_COLUMN_NAME",
                    },
                    inplace=True,
                )
                child_constraints_df.rename(
                    columns={
                        "CHILD_TABLE_NAME": "TABLE_NAME",
                        "CHILD_COLUMN_NAME": "COLUMN_NAME",
                        "PARENT_TABLE_NAME": "REFERENCED_TABLE_NAME",
                        "PARENT_COLUMN_NAME": "REFERENCED_COLUMN_NAME",
                    },
                    inplace=True,
                )

                df = pd.concat([df, parent_constraints_df, child_constraints_df])

        # Drop any leftover columns and fill missing expected ones with None.
        df = df.reindex(columns=cls.columns, fill_value=None)

        return df
class MetaHandlerInfoTable(Table):
    """Data-catalog view of handler information (META_HANDLER_INFO)."""

    name = "META_HANDLER_INFO"
    columns = ["HANDLER_INFO", "TABLE_SCHEMA"]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Fetch handler info for every schema named in the query.

        Raises:
            ValueError: when the query does not constrain `table_schema`.
        """
        databases, tables = _get_scope(query)

        if not databases:
            raise ValueError("At least one database must be specified in the query.")

        rows = []
        for database in databases:
            retriever = DataCatalogRetriever(database_name=database, table_names=tables)
            handler_info = retriever.retrieve_handler_info()
            # Stringify truthy info; empty/absent info is reported as None.
            rows.append({"HANDLER_INFO": str(handler_info) if handler_info else None, "TABLE_SCHEMA": database})

        return pd.DataFrame(rows, columns=cls.columns)