Coverage for mindsdb / interfaces / data_catalog / data_catalog_retriever.py: 8%
213 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import ast
2import pandas as pd
3from typing import List, Optional, Union
5from mindsdb.integrations.libs.api_handler import MetaAPIHandler
6from mindsdb.integrations.libs.base import MetaDatabaseHandler
7from mindsdb.integrations.libs.response import RESPONSE_TYPE
8from mindsdb.utilities import log
11logger = log.getLogger("mindsdb")
class DataCatalogRetriever:
    """
    This class is responsible for retrieving (data catalog) metadata directly from the data source via the handler.
    """

    def __init__(self, database_name: str, table_names: Optional[List[str]] = None) -> None:
        """
        Initialize the DataCatalogRetriever.

        Args:
            database_name (str): The data source to retrieve metadata from.
            table_names (Optional[List[str]]): The list of table names to retrieve metadata for. If None, all tables will be read.
        """
        # Imported at call time rather than module top.
        # NOTE(review): presumably this avoids a circular import with the session
        # controller — confirm before moving it to the top of the file.
        from mindsdb.api.executor.controllers.session_controller import (
            SessionController,
        )

        session = SessionController()

        self.database_name = database_name
        # Live handler used for every meta_get_* call in the retrieve_* methods below.
        self.data_handler: Union[MetaDatabaseHandler, MetaAPIHandler] = session.integration_controller.get_data_handler(
            database_name
        )
        integration = session.integration_controller.get(database_name)
        self.integration_id = integration["id"]
        self.integration_engine = integration["engine"]
        # TODO: Handle situations where a schema is provided along with the database name, e.g., 'schema.table'.
        # TODO: Handle situations where a file path is provided with integrations like S3, e.g., 'dir/file.csv'.
        self.table_names = table_names

        self.logger = logger
46 def retrieve_metadata_as_string(self) -> str:
47 """
48 Retrieve the metadata as a formatted string.
49 """
50 tables_df = self.retrieve_tables()
51 if tables_df.empty:
52 return f"No metadata found for database '{self.database_name}'"
54 metadata_str = "Data Catalog: \n"
55 handler_info = self.retrieve_handler_info()
56 if handler_info:
57 metadata_str += handler_info + "\n\n"
59 columns_df = self.retrieve_columns()
60 column_stats_df = self.retrieve_column_statistics()
61 primary_keys_df = self.retrieve_primary_keys()
62 foreign_keys_df = self.retrieve_foreign_keys()
64 metadata_str += self._construct_metadata_string_for_tables(
65 tables_df,
66 columns_df,
67 column_stats_df,
68 primary_keys_df,
69 foreign_keys_df,
70 )
71 return metadata_str
73 def _construct_metadata_string_for_tables(
74 self,
75 tables_df: pd.DataFrame,
76 columns_df: pd.DataFrame,
77 column_stats_df: pd.DataFrame,
78 primary_keys_df: pd.DataFrame,
79 foreign_keys_df: pd.DataFrame,
80 ) -> str:
81 """
82 Construct a formatted string representation of the metadata for the given tables.
83 """
84 tables_metadata_str = ""
86 # Convert all DataFrame column names to uppercase for consistency.
87 tables_df.columns = tables_df.columns.astype(str).str.upper()
88 columns_df.columns = columns_df.columns.astype(str).str.upper()
89 column_stats_df.columns = column_stats_df.columns.astype(str).str.upper()
90 primary_keys_df.columns = primary_keys_df.columns.astype(str).str.upper()
91 foreign_keys_df.columns = foreign_keys_df.columns.astype(str).str.upper()
93 for _, table_row in tables_df.iterrows():
94 table_columns_df = columns_df[columns_df["TABLE_NAME"] == table_row["TABLE_NAME"]]
96 table_column_stats_df = pd.DataFrame()
97 table_primary_keys_df = pd.DataFrame()
98 table_foreign_keys_df = pd.DataFrame()
99 # If no columns are found for the table,
100 # looking for column stats, primary keys, and foreign keys is redundant.
101 if not table_columns_df.empty:
102 if not column_stats_df.empty:
103 table_column_stats_df = column_stats_df[column_stats_df["TABLE_NAME"] == table_row["TABLE_NAME"]]
104 if not primary_keys_df.empty:
105 table_primary_keys_df = primary_keys_df[primary_keys_df["TABLE_NAME"] == table_row["TABLE_NAME"]]
106 if not foreign_keys_df.empty:
107 table_foreign_keys_df = foreign_keys_df[foreign_keys_df["TABLE_NAME"] == table_row["TABLE_NAME"]]
109 tables_metadata_str += self._construct_metadata_string_for_table(
110 table_row,
111 table_columns_df,
112 table_column_stats_df,
113 table_primary_keys_df,
114 table_foreign_keys_df,
115 )
116 return tables_metadata_str
118 def _construct_metadata_string_for_table(
119 self,
120 table_row: pd.Series,
121 columns_df: pd.DataFrame,
122 column_stats_df: pd.DataFrame,
123 primary_keys_df: pd.DataFrame,
124 foreign_keys_df: pd.DataFrame,
125 ) -> str:
126 """
127 Construct a formatted string representation of the metadata for a single table.
128 """
129 table_metadata_str = f"`{self.database_name}`.`{table_row['TABLE_NAME']}`"
131 if "TABLE_TYPE" in table_row and pd.notna(table_row["TABLE_TYPE"]):
132 table_metadata_str += f" ({table_row['TABLE_TYPE']})"
133 if "TABLE_DESCRIPTION" in table_row and pd.notna(table_row["TABLE_DESCRIPTION"]):
134 table_metadata_str += f": {table_row['TABLE_DESCRIPTION']}"
135 if "TABLE_SCHEMA" in table_row and pd.notna(table_row["TABLE_SCHEMA"]):
136 table_metadata_str += f"\nSchema: {table_row['TABLE_SCHEMA']}"
137 if "ROW_COUNT" in table_row and pd.notna(table_row["ROW_COUNT"]) and table_row["ROW_COUNT"] > 0:
138 table_metadata_str += f"\nEstimated Row Count: {int(table_row['ROW_COUNT'])}"
140 if not primary_keys_df.empty:
141 table_metadata_str += self._construct_metadata_string_for_primary_keys(primary_keys_df)
143 if not columns_df.empty:
144 table_metadata_str += self._construct_metadata_string_for_columns(columns_df, column_stats_df)
146 if not foreign_keys_df.empty:
147 table_metadata_str += self._construct_metadata_string_for_foreign_keys(
148 foreign_keys_df,
149 table_row["TABLE_NAME"],
150 )
152 return table_metadata_str
154 def _construct_metadata_string_for_primary_keys(
155 self,
156 primary_keys_df: pd.DataFrame,
157 ) -> str:
158 """
159 Construct a formatted string representation of the primary keys for a single table.
160 """
161 primary_keys_str = "\nPrimary Keys (in defined order): "
162 if "ORDINAL_POSITION" in primary_keys_df.columns:
163 primary_keys_df.sort_values(by="ORDINAL_POSITION", inplace=True)
164 primary_keys = primary_keys_df["COLUMN_NAME"].tolist()
165 primary_keys_str += ", ".join([f"`{pk}`" for pk in primary_keys])
166 return primary_keys_str
168 def _construct_metadata_string_for_columns(
169 self,
170 columns_df: pd.DataFrame,
171 column_stats_df: pd.DataFrame,
172 ) -> str:
173 """
174 Construct a formatted string representation of the columns for a single table.
175 """
176 columns_str = "\n\nColumns:\n"
177 for _, column_row in columns_df.iterrows():
178 # Ideally, there should be only one stats row per column.
179 stats_row = column_stats_df[column_stats_df["COLUMN_NAME"] == column_row["COLUMN_NAME"]]
180 if len(stats_row) == 0:
181 stats_row = pd.Series()
182 else:
183 stats_row = stats_row.iloc[0]
184 columns_str += self._construct_metadata_string_for_column(
185 column_row,
186 stats_row,
187 )
188 return columns_str
190 def _construct_metadata_string_for_column(
191 self,
192 column_row: pd.Series,
193 column_stats_row: pd.Series,
194 ) -> str:
195 """
196 Construct a formatted string representation of a single column.
197 """
198 pad = " " * 4
199 column_str = f"{column_row['COLUMN_NAME']} ({column_row['DATA_TYPE']}):"
201 if "COLUMN_DESCRIPTION" in column_row and pd.notna(column_row["COLUMN_DESCRIPTION"]):
202 column_str += f": {column_row['COLUMN_DESCRIPTION']}"
203 if "IS_NULLABLE" in column_row and pd.notna(column_row["IS_NULLABLE"]):
204 column_str += f"\n{pad}- Nullable: {column_row['IS_NULLABLE']}"
205 if "COLUMN_DEFAULT" in column_row and pd.notna(column_row["COLUMN_DEFAULT"]):
206 column_str += f"\n{pad}- Default Value: {column_row['COLUMN_DEFAULT']}"
208 if not column_stats_row.empty:
209 column_str += self._construct_metadata_string_for_column_statistics(column_stats_row, pad)
211 column_str += "\n\n"
213 return column_str
    def _construct_metadata_string_for_column_statistics(
        self,
        stats_row: pd.Series,
        pad: str,
    ) -> str:
        """
        Construct a formatted string representation of the column statistics for a single column.

        Args:
            stats_row (pd.Series): Statistics row for one column. Recognised fields (all
                optional): MOST_COMMON_VALUES, MOST_COMMON_FREQUENCIES, NULL_PERCENTAGE,
                DISTINCT_VALUES_COUNT, MINIMUM_VALUE, MAXIMUM_VALUE.
            pad (str): Leading indentation inherited from the column renderer.

        Returns:
            str: An indented, multi-line "Column Statistics" section.
        """
        inner_pad = pad + " " * 4
        inner_inner_pad = inner_pad + " " * 4
        stats_str = f"\n{pad}- Column Statistics:"

        # Most common values is expected to be a list, hence the check for non-null.
        # NOTE(review): the bare truthiness test assumes a list/str/scalar here; a numpy
        # array would raise "truth value ... is ambiguous" — confirm upstream dtype.
        if "MOST_COMMON_VALUES" in stats_row and stats_row["MOST_COMMON_VALUES"]:
            most_common_values = stats_row["MOST_COMMON_VALUES"]
            # Handle case where most_common_values is a string representation of a list or other formats.
            if isinstance(most_common_values, str):
                most_common_values = self._parse_list_from_string(most_common_values)
            elif isinstance(most_common_values, (list, tuple, pd.Series)):
                most_common_values = list(most_common_values)
            else:
                # Single scalar value: wrap it so the loop below can treat it uniformly.
                most_common_values = [most_common_values]

            # Render the section only when at least one non-null value remains.
            if most_common_values and pd.notna(most_common_values).any():
                stats_str += f"\n{inner_pad}- Top 10 Most Common Values and Frequencies:"

                # Most common frequencies is also expected to be a list.
                most_common_frequencies = []
                if "MOST_COMMON_FREQUENCIES" in stats_row and stats_row["MOST_COMMON_FREQUENCIES"]:
                    most_common_frequencies = stats_row["MOST_COMMON_FREQUENCIES"]
                    if isinstance(most_common_frequencies, str):
                        most_common_frequencies = self._parse_list_from_string(most_common_frequencies)
                    elif isinstance(most_common_frequencies, (list, tuple, pd.Series)):
                        most_common_frequencies = list(most_common_frequencies)
                    else:
                        most_common_frequencies = [most_common_frequencies]

                # Show at most the first 10 values, attaching a frequency only when one
                # exists at the same index.
                for i in range(min(10, len(most_common_values))):
                    if (
                        most_common_frequencies
                        and pd.notna(most_common_frequencies).any()
                        and i < len(most_common_frequencies)
                    ):
                        freq = most_common_frequencies[i]
                        # Frequencies are presumably fractions in [0, 1] (rendered as a
                        # percentage); non-numeric values fall back to their raw form.
                        try:
                            percent = float(freq) * 100
                            freq_str = f"{percent:.2f}%"
                        except (ValueError, TypeError):
                            freq_str = str(freq)
                    else:
                        freq_str = ""

                    stats_str += f"\n{inner_inner_pad}- {most_common_values[i]}" + (f": {freq_str}" if freq_str else "")
                stats_str += "\n"

        if "NULL_PERCENTAGE" in stats_row and pd.notna(stats_row["NULL_PERCENTAGE"]):
            stats_str += f"\n{inner_pad}- Null Percentage: {stats_row['NULL_PERCENTAGE']}"
        if "DISTINCT_VALUES_COUNT" in stats_row and pd.notna(stats_row["DISTINCT_VALUES_COUNT"]):
            stats_str += f"\n{inner_pad}- No. of Distinct Values: {stats_row['DISTINCT_VALUES_COUNT']}"
        if "MINIMUM_VALUE" in stats_row and pd.notna(stats_row["MINIMUM_VALUE"]):
            stats_str += f"\n{inner_pad}- Minimum Value: {stats_row['MINIMUM_VALUE']}"
        if "MAXIMUM_VALUE" in stats_row and pd.notna(stats_row["MAXIMUM_VALUE"]):
            stats_str += f"\n{inner_pad}- Maximum Value: {stats_row['MAXIMUM_VALUE']}"

        return stats_str
281 def _parse_list_from_string(self, list_str: str) -> List[str]:
282 """
283 Safely parse a string representation of a list into an actual list.
284 This is used to handle most common values and frequencies stored as strings.
285 """
286 # Try to safely parse python-like list strings: "['a','b']" or '["a","b"]'.
287 try:
288 parsed = ast.literal_eval(list_str)
289 if isinstance(parsed, (list, tuple)):
290 lst = [x for x in parsed if not pd.isna(x)]
291 else:
292 # fallback: treat parsed as scalar
293 if not pd.isna(parsed):
294 lst = [parsed]
295 except (ValueError, SyntaxError):
296 # fallback to splitting on comma for simpler string formats
297 s = list_str.strip("[]")
298 lst = [v.strip() for v in s.split(",") if v.strip()]
299 return lst
301 def _construct_metadata_string_for_foreign_keys(
302 self,
303 foreign_keys_df: pd.DataFrame,
304 table_name: str,
305 ) -> str:
306 """
307 Construct a formatted string representation of the foreign keys for a single table.
308 """
309 pad = " " * 4
310 foreign_keys_str = "\n\nKey Relationships:"
311 for _, fk_row in foreign_keys_df.iterrows():
312 # Avoid relationships where the current table is the child table to prevent redundancy.
313 if fk_row["CHILD_TABLE_NAME"] == table_name:
314 continue
315 foreign_keys_str += f"{pad}-{fk_row['CHILD_COLUMN_NAME']} in `{fk_row['CHILD_TABLE_NAME']}` references {fk_row['PARENT_COLUMN_NAME']} in `{fk_row['PARENT_TABLE_NAME']}`\n"
316 return foreign_keys_str
318 def retrieve_tables(self) -> pd.DataFrame:
319 """
320 Retrieve the table metadata from the handler.
321 """
322 self.logger.info(
323 f"Retrieving {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
324 )
325 response = self.data_handler.meta_get_tables(self.table_names)
326 if response.resp_type == RESPONSE_TYPE.ERROR:
327 self.logger.error(f"Failed to retrieve tables for {self.database_name}: {response.error_message}")
328 return pd.DataFrame()
329 elif response.resp_type == RESPONSE_TYPE.OK:
330 self.logger.error(f"No tables found for {self.database_name} in the data source.")
331 return pd.DataFrame()
333 return response.data_frame
335 def retrieve_columns(self) -> pd.DataFrame:
336 """
337 Retrieve the column metadata from the handler.
338 """
339 self.logger.info(
340 f"Retrieving columns for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
341 )
342 response = self.data_handler.meta_get_columns(self.table_names)
343 if response.resp_type == RESPONSE_TYPE.ERROR:
344 self.logger.error(f"Failed to retrieve columns for {self.database_name}: {response.error_message}")
345 return pd.DataFrame()
346 elif response.resp_type == RESPONSE_TYPE.OK:
347 self.logger.error(f"No columns found for {self.database_name} in the data source.")
348 return pd.DataFrame()
350 return response.data_frame
352 def retrieve_column_statistics(self) -> pd.DataFrame:
353 """
354 Retrieve the column statistics from the handler.
355 """
356 self.logger.info(
357 f"Retrieving column statistics for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
358 )
359 response = self.data_handler.meta_get_column_statistics(self.table_names)
360 if response.resp_type == RESPONSE_TYPE.ERROR:
361 self.logger.error(
362 f"Failed to retrieve column statistics for {self.database_name}: {response.error_message}"
363 )
364 return pd.DataFrame()
365 elif response.resp_type == RESPONSE_TYPE.OK:
366 self.logger.error(f"No column statistics found for {self.database_name} in the data source.")
367 return pd.DataFrame()
369 return response.data_frame
371 def retrieve_primary_keys(self) -> pd.DataFrame:
372 """
373 Retrieve the primary keys from the handler.
374 """
375 self.logger.info(
376 f"Retrieving primary keys for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
377 )
378 response = self.data_handler.meta_get_primary_keys(self.table_names)
379 if response.resp_type == RESPONSE_TYPE.ERROR:
380 self.logger.error(f"Failed to retrieve primary keys for {self.database_name}: {response.error_message}")
381 return pd.DataFrame()
382 elif response.resp_type == RESPONSE_TYPE.OK:
383 self.logger.error(f"No primary keys found for {self.database_name} in the data source.")
384 return pd.DataFrame()
386 return response.data_frame
388 def retrieve_foreign_keys(self) -> pd.DataFrame:
389 """
390 Retrieve the foreign keys from the handler.
391 """
392 self.logger.info(
393 f"Retrieving foreign keys for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
394 )
395 response = self.data_handler.meta_get_foreign_keys(self.table_names)
396 if response.resp_type == RESPONSE_TYPE.ERROR:
397 self.logger.error(f"Failed to retrieve foreign keys for {self.database_name}: {response.error_message}")
398 return pd.DataFrame()
399 elif response.resp_type == RESPONSE_TYPE.OK:
400 self.logger.error(f"No foreign keys found for {self.database_name} in the data source.")
401 return pd.DataFrame()
403 return response.data_frame
405 def retrieve_handler_info(self) -> str:
406 """
407 Retrieve the handler info from the handler.
408 """
409 self.logger.info(f"Retrieving handler info for {self.database_name}")
410 return self.data_handler.meta_get_handler_info()