Coverage for mindsdb / interfaces / data_catalog / data_catalog_retriever.py: 8%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import ast 

2import pandas as pd 

3from typing import List, Optional, Union 

4 

5from mindsdb.integrations.libs.api_handler import MetaAPIHandler 

6from mindsdb.integrations.libs.base import MetaDatabaseHandler 

7from mindsdb.integrations.libs.response import RESPONSE_TYPE 

8from mindsdb.utilities import log 

9 

10 

11logger = log.getLogger("mindsdb") 

12 

13 

class DataCatalogRetriever:
    """
    This class is responsible for retrieving (data catalog) metadata directly from the data source via the handler.

    It queries the handler's ``meta_get_*`` endpoints for tables, columns, column statistics,
    primary keys and foreign keys, and renders them into a single human-readable string
    (used as LLM/agent context).
    """

    def __init__(self, database_name: str, table_names: Optional[List[str]] = None) -> None:
        """
        Initialize the DataCatalogRetriever.

        Args:
            database_name (str): The data source to retrieve metadata from.
            table_names (Optional[List[str]]): The list of table names to retrieve metadata for. If None, all tables will be read.
        """
        # Imported lazily to avoid a circular import at module load time.
        from mindsdb.api.executor.controllers.session_controller import (
            SessionController,
        )

        session = SessionController()

        self.database_name = database_name
        # Handler instance used for all meta_get_* calls below.
        self.data_handler: Union[MetaDatabaseHandler, MetaAPIHandler] = session.integration_controller.get_data_handler(
            database_name
        )
        integration = session.integration_controller.get(database_name)
        self.integration_id = integration["id"]
        self.integration_engine = integration["engine"]
        # TODO: Handle situations where a schema is provided along with the database name, e.g., 'schema.table'.
        # TODO: Handle situations where a file path is provided with integrations like S3, e.g., 'dir/file.csv'.
        self.table_names = table_names

        self.logger = logger

    def retrieve_metadata_as_string(self) -> str:
        """
        Retrieve the metadata as a formatted string.

        Returns:
            str: A formatted data-catalog description, or a "no metadata" message
            when the data source reports no tables.
        """
        tables_df = self.retrieve_tables()
        if tables_df.empty:
            return f"No metadata found for database '{self.database_name}'"

        metadata_str = "Data Catalog: \n"
        handler_info = self.retrieve_handler_info()
        if handler_info:
            metadata_str += handler_info + "\n\n"

        columns_df = self.retrieve_columns()
        column_stats_df = self.retrieve_column_statistics()
        primary_keys_df = self.retrieve_primary_keys()
        foreign_keys_df = self.retrieve_foreign_keys()

        metadata_str += self._construct_metadata_string_for_tables(
            tables_df,
            columns_df,
            column_stats_df,
            primary_keys_df,
            foreign_keys_df,
        )
        return metadata_str

    def _construct_metadata_string_for_tables(
        self,
        tables_df: pd.DataFrame,
        columns_df: pd.DataFrame,
        column_stats_df: pd.DataFrame,
        primary_keys_df: pd.DataFrame,
        foreign_keys_df: pd.DataFrame,
    ) -> str:
        """
        Construct a formatted string representation of the metadata for the given tables.
        """
        tables_metadata_str = ""

        # Convert all DataFrame column names to uppercase for consistency,
        # since handlers may return metadata columns in varying cases.
        tables_df.columns = tables_df.columns.astype(str).str.upper()
        columns_df.columns = columns_df.columns.astype(str).str.upper()
        column_stats_df.columns = column_stats_df.columns.astype(str).str.upper()
        primary_keys_df.columns = primary_keys_df.columns.astype(str).str.upper()
        foreign_keys_df.columns = foreign_keys_df.columns.astype(str).str.upper()

        for _, table_row in tables_df.iterrows():
            table_columns_df = columns_df[columns_df["TABLE_NAME"] == table_row["TABLE_NAME"]]

            table_column_stats_df = pd.DataFrame()
            table_primary_keys_df = pd.DataFrame()
            table_foreign_keys_df = pd.DataFrame()
            # If no columns are found for the table,
            # looking for column stats, primary keys, and foreign keys is redundant.
            if not table_columns_df.empty:
                if not column_stats_df.empty:
                    table_column_stats_df = column_stats_df[column_stats_df["TABLE_NAME"] == table_row["TABLE_NAME"]]
                if not primary_keys_df.empty:
                    table_primary_keys_df = primary_keys_df[primary_keys_df["TABLE_NAME"] == table_row["TABLE_NAME"]]
                if not foreign_keys_df.empty:
                    table_foreign_keys_df = foreign_keys_df[foreign_keys_df["TABLE_NAME"] == table_row["TABLE_NAME"]]

            tables_metadata_str += self._construct_metadata_string_for_table(
                table_row,
                table_columns_df,
                table_column_stats_df,
                table_primary_keys_df,
                table_foreign_keys_df,
            )
        return tables_metadata_str

    def _construct_metadata_string_for_table(
        self,
        table_row: pd.Series,
        columns_df: pd.DataFrame,
        column_stats_df: pd.DataFrame,
        primary_keys_df: pd.DataFrame,
        foreign_keys_df: pd.DataFrame,
    ) -> str:
        """
        Construct a formatted string representation of the metadata for a single table.
        """
        table_metadata_str = f"`{self.database_name}`.`{table_row['TABLE_NAME']}`"

        # Each attribute is optional: render it only when the handler supplied a non-null value.
        if "TABLE_TYPE" in table_row and pd.notna(table_row["TABLE_TYPE"]):
            table_metadata_str += f" ({table_row['TABLE_TYPE']})"
        if "TABLE_DESCRIPTION" in table_row and pd.notna(table_row["TABLE_DESCRIPTION"]):
            table_metadata_str += f": {table_row['TABLE_DESCRIPTION']}"
        if "TABLE_SCHEMA" in table_row and pd.notna(table_row["TABLE_SCHEMA"]):
            table_metadata_str += f"\nSchema: {table_row['TABLE_SCHEMA']}"
        if "ROW_COUNT" in table_row and pd.notna(table_row["ROW_COUNT"]) and table_row["ROW_COUNT"] > 0:
            table_metadata_str += f"\nEstimated Row Count: {int(table_row['ROW_COUNT'])}"

        if not primary_keys_df.empty:
            table_metadata_str += self._construct_metadata_string_for_primary_keys(primary_keys_df)

        if not columns_df.empty:
            table_metadata_str += self._construct_metadata_string_for_columns(columns_df, column_stats_df)

        if not foreign_keys_df.empty:
            table_metadata_str += self._construct_metadata_string_for_foreign_keys(
                foreign_keys_df,
                table_row["TABLE_NAME"],
            )

        return table_metadata_str

    def _construct_metadata_string_for_primary_keys(
        self,
        primary_keys_df: pd.DataFrame,
    ) -> str:
        """
        Construct a formatted string representation of the primary keys for a single table.
        """
        primary_keys_str = "\nPrimary Keys (in defined order): "
        if "ORDINAL_POSITION" in primary_keys_df.columns:
            # Sort without inplace=True: primary_keys_df is typically a slice of the
            # caller's full DataFrame, and mutating it in place would silently reorder
            # the caller's data (and trigger SettingWithCopyWarning).
            primary_keys_df = primary_keys_df.sort_values(by="ORDINAL_POSITION")
        primary_keys = primary_keys_df["COLUMN_NAME"].tolist()
        primary_keys_str += ", ".join([f"`{pk}`" for pk in primary_keys])
        return primary_keys_str

    def _construct_metadata_string_for_columns(
        self,
        columns_df: pd.DataFrame,
        column_stats_df: pd.DataFrame,
    ) -> str:
        """
        Construct a formatted string representation of the columns for a single table.
        """
        columns_str = "\n\nColumns:\n"
        for _, column_row in columns_df.iterrows():
            # Ideally, there should be only one stats row per column.
            stats_row = column_stats_df[column_stats_df["COLUMN_NAME"] == column_row["COLUMN_NAME"]]
            if len(stats_row) == 0:
                # Explicit dtype avoids pandas' deprecation warning for empty Series;
                # .empty is still True so downstream checks behave the same.
                stats_row = pd.Series(dtype=object)
            else:
                stats_row = stats_row.iloc[0]
            columns_str += self._construct_metadata_string_for_column(
                column_row,
                stats_row,
            )
        return columns_str

    def _construct_metadata_string_for_column(
        self,
        column_row: pd.Series,
        column_stats_row: pd.Series,
    ) -> str:
        """
        Construct a formatted string representation of a single column.
        """
        pad = " " * 4
        column_str = f"{column_row['COLUMN_NAME']} ({column_row['DATA_TYPE']}):"

        if "COLUMN_DESCRIPTION" in column_row and pd.notna(column_row["COLUMN_DESCRIPTION"]):
            # The base string already ends with ':'; append only the description text
            # (previously this appended another ': ', producing 'name (type):: desc').
            column_str += f" {column_row['COLUMN_DESCRIPTION']}"
        if "IS_NULLABLE" in column_row and pd.notna(column_row["IS_NULLABLE"]):
            column_str += f"\n{pad}- Nullable: {column_row['IS_NULLABLE']}"
        if "COLUMN_DEFAULT" in column_row and pd.notna(column_row["COLUMN_DEFAULT"]):
            column_str += f"\n{pad}- Default Value: {column_row['COLUMN_DEFAULT']}"

        if not column_stats_row.empty:
            column_str += self._construct_metadata_string_for_column_statistics(column_stats_row, pad)

        column_str += "\n\n"

        return column_str

    def _construct_metadata_string_for_column_statistics(
        self,
        stats_row: pd.Series,
        pad: str,
    ) -> str:
        """
        Construct a formatted string representation of the column statistics for a single column.
        """
        inner_pad = pad + " " * 4
        inner_inner_pad = inner_pad + " " * 4
        stats_str = f"\n{pad}- Column Statistics:"

        # Most common values is expected to be a list, hence the check for non-null.
        # NOTE(review): a numpy array here would make this truthiness check ambiguous — confirm handlers return list/str.
        if "MOST_COMMON_VALUES" in stats_row and stats_row["MOST_COMMON_VALUES"]:
            most_common_values = stats_row["MOST_COMMON_VALUES"]
            # Handle case where most_common_values is a string representation of a list or other formats.
            if isinstance(most_common_values, str):
                most_common_values = self._parse_list_from_string(most_common_values)
            elif isinstance(most_common_values, (list, tuple, pd.Series)):
                most_common_values = list(most_common_values)
            else:
                most_common_values = [most_common_values]

            # Only render the section if at least one value is non-null.
            if most_common_values and pd.notna(most_common_values).any():
                stats_str += f"\n{inner_pad}- Top 10 Most Common Values and Frequencies:"

                # Most common frequencies is also expected to be a list.
                most_common_frequencies = []
                if "MOST_COMMON_FREQUENCIES" in stats_row and stats_row["MOST_COMMON_FREQUENCIES"]:
                    most_common_frequencies = stats_row["MOST_COMMON_FREQUENCIES"]
                    if isinstance(most_common_frequencies, str):
                        most_common_frequencies = self._parse_list_from_string(most_common_frequencies)
                    elif isinstance(most_common_frequencies, (list, tuple, pd.Series)):
                        most_common_frequencies = list(most_common_frequencies)
                    else:
                        most_common_frequencies = [most_common_frequencies]

                for i in range(min(10, len(most_common_values))):
                    if (
                        most_common_frequencies
                        and pd.notna(most_common_frequencies).any()
                        and i < len(most_common_frequencies)
                    ):
                        freq = most_common_frequencies[i]
                        try:
                            # Frequencies are assumed to be fractions; render as percentages.
                            percent = float(freq) * 100
                            freq_str = f"{percent:.2f}%"
                        except (ValueError, TypeError):
                            # Non-numeric frequency: show it verbatim rather than failing.
                            freq_str = str(freq)
                    else:
                        freq_str = ""

                    stats_str += f"\n{inner_inner_pad}- {most_common_values[i]}" + (f": {freq_str}" if freq_str else "")
                stats_str += "\n"

        if "NULL_PERCENTAGE" in stats_row and pd.notna(stats_row["NULL_PERCENTAGE"]):
            stats_str += f"\n{inner_pad}- Null Percentage: {stats_row['NULL_PERCENTAGE']}"
        if "DISTINCT_VALUES_COUNT" in stats_row and pd.notna(stats_row["DISTINCT_VALUES_COUNT"]):
            stats_str += f"\n{inner_pad}- No. of Distinct Values: {stats_row['DISTINCT_VALUES_COUNT']}"
        if "MINIMUM_VALUE" in stats_row and pd.notna(stats_row["MINIMUM_VALUE"]):
            stats_str += f"\n{inner_pad}- Minimum Value: {stats_row['MINIMUM_VALUE']}"
        if "MAXIMUM_VALUE" in stats_row and pd.notna(stats_row["MAXIMUM_VALUE"]):
            stats_str += f"\n{inner_pad}- Maximum Value: {stats_row['MAXIMUM_VALUE']}"

        return stats_str

    def _parse_list_from_string(self, list_str: str) -> List[str]:
        """
        Safely parse a string representation of a list into an actual list.
        This is used to handle most common values and frequencies stored as strings.

        Returns:
            List: The parsed, NA-filtered values; an empty list if the string
            parses to a single NA scalar (e.g. "None").
        """
        # Initialize up front so every code path below leaves `lst` bound
        # (previously a scalar NA result left it unassigned -> UnboundLocalError).
        lst: List = []
        # Try to safely parse python-like list strings: "['a','b']" or '["a","b"]'.
        try:
            parsed = ast.literal_eval(list_str)
            if isinstance(parsed, (list, tuple)):
                lst = [x for x in parsed if not pd.isna(x)]
            else:
                # fallback: treat parsed as scalar
                if not pd.isna(parsed):
                    lst = [parsed]
        except (ValueError, SyntaxError):
            # fallback to splitting on comma for simpler string formats
            s = list_str.strip("[]")
            lst = [v.strip() for v in s.split(",") if v.strip()]
        return lst

    def _construct_metadata_string_for_foreign_keys(
        self,
        foreign_keys_df: pd.DataFrame,
        table_name: str,
    ) -> str:
        """
        Construct a formatted string representation of the foreign keys for a single table.
        """
        pad = " " * 4
        foreign_keys_str = "\n\nKey Relationships:"
        for _, fk_row in foreign_keys_df.iterrows():
            # Avoid relationships where the current table is the child table to prevent redundancy.
            if fk_row["CHILD_TABLE_NAME"] == table_name:
                continue
            foreign_keys_str += f"{pad}-{fk_row['CHILD_COLUMN_NAME']} in `{fk_row['CHILD_TABLE_NAME']}` references {fk_row['PARENT_COLUMN_NAME']} in `{fk_row['PARENT_TABLE_NAME']}`\n"
        return foreign_keys_str

    def retrieve_tables(self) -> pd.DataFrame:
        """
        Retrieve the table metadata from the handler.

        Returns:
            pd.DataFrame: Table metadata; empty on error or when no tables are found.
        """
        self.logger.info(
            f"Retrieving {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
        )
        response = self.data_handler.meta_get_tables(self.table_names)
        if response.resp_type == RESPONSE_TYPE.ERROR:
            self.logger.error(f"Failed to retrieve tables for {self.database_name}: {response.error_message}")
            return pd.DataFrame()
        elif response.resp_type == RESPONSE_TYPE.OK:
            # OK (no data frame) means the handler returned nothing for this request.
            self.logger.error(f"No tables found for {self.database_name} in the data source.")
            return pd.DataFrame()

        return response.data_frame

    def retrieve_columns(self) -> pd.DataFrame:
        """
        Retrieve the column metadata from the handler.

        Returns:
            pd.DataFrame: Column metadata; empty on error or when no columns are found.
        """
        self.logger.info(
            f"Retrieving columns for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
        )
        response = self.data_handler.meta_get_columns(self.table_names)
        if response.resp_type == RESPONSE_TYPE.ERROR:
            self.logger.error(f"Failed to retrieve columns for {self.database_name}: {response.error_message}")
            return pd.DataFrame()
        elif response.resp_type == RESPONSE_TYPE.OK:
            self.logger.error(f"No columns found for {self.database_name} in the data source.")
            return pd.DataFrame()

        return response.data_frame

    def retrieve_column_statistics(self) -> pd.DataFrame:
        """
        Retrieve the column statistics from the handler.

        Returns:
            pd.DataFrame: Column statistics; empty on error or when none are found.
        """
        self.logger.info(
            f"Retrieving column statistics for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
        )
        response = self.data_handler.meta_get_column_statistics(self.table_names)
        if response.resp_type == RESPONSE_TYPE.ERROR:
            self.logger.error(
                f"Failed to retrieve column statistics for {self.database_name}: {response.error_message}"
            )
            return pd.DataFrame()
        elif response.resp_type == RESPONSE_TYPE.OK:
            self.logger.error(f"No column statistics found for {self.database_name} in the data source.")
            return pd.DataFrame()

        return response.data_frame

    def retrieve_primary_keys(self) -> pd.DataFrame:
        """
        Retrieve the primary keys from the handler.

        Returns:
            pd.DataFrame: Primary-key metadata; empty on error or when none are found.
        """
        self.logger.info(
            f"Retrieving primary keys for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
        )
        response = self.data_handler.meta_get_primary_keys(self.table_names)
        if response.resp_type == RESPONSE_TYPE.ERROR:
            self.logger.error(f"Failed to retrieve primary keys for {self.database_name}: {response.error_message}")
            return pd.DataFrame()
        elif response.resp_type == RESPONSE_TYPE.OK:
            self.logger.error(f"No primary keys found for {self.database_name} in the data source.")
            return pd.DataFrame()

        return response.data_frame

    def retrieve_foreign_keys(self) -> pd.DataFrame:
        """
        Retrieve the foreign keys from the handler.

        Returns:
            pd.DataFrame: Foreign-key metadata; empty on error or when none are found.
        """
        self.logger.info(
            f"Retrieving foreign keys for {', '.join(self.table_names) if self.table_names else 'all'} tables for {self.database_name}"
        )
        response = self.data_handler.meta_get_foreign_keys(self.table_names)
        if response.resp_type == RESPONSE_TYPE.ERROR:
            self.logger.error(f"Failed to retrieve foreign keys for {self.database_name}: {response.error_message}")
            return pd.DataFrame()
        elif response.resp_type == RESPONSE_TYPE.OK:
            self.logger.error(f"No foreign keys found for {self.database_name} in the data source.")
            return pd.DataFrame()

        return response.data_frame

    def retrieve_handler_info(self) -> str:
        """
        Retrieve the handler info from the handler.
        """
        self.logger.info(f"Retrieving handler info for {self.database_name}")
        return self.data_handler.meta_get_handler_info()