Coverage for mindsdb / integrations / handlers / postgres_handler / postgres_handler.py: 90%

326 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import json 

3import logging 

4from typing import Optional, Any 

5 

6import pandas as pd 

7from pandas import DataFrame 

8import psycopg 

9from psycopg import Column as PGColumn, Cursor 

10from psycopg.postgres import TypeInfo, types as pg_types 

11from psycopg.pq import ExecStatus 

12 

13from mindsdb_sql_parser import parse_sql 

14from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

15from mindsdb_sql_parser.ast.base import ASTNode 

16 

17from mindsdb.integrations.libs.base import MetaDatabaseHandler 

18from mindsdb.utilities import log 

19from mindsdb.integrations.libs.response import ( 

20 HandlerStatusResponse as StatusResponse, 

21 HandlerResponse as Response, 

22 RESPONSE_TYPE, 

23) 

24import mindsdb.utilities.profiler as profiler 

25from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE 

26 

logger = log.getLogger(__name__)

# Seconds to sleep between poll queries in `subscribe`; each poll triggers
# delivery of pending LISTEN/NOTIFY events to the notify handler.
SUBSCRIBE_SLEEP_INTERVAL = 1

30 

31 

def _map_type(internal_type_name: str | None) -> MYSQL_DATA_TYPE:
    """Map Postgres types to MySQL types.

    Args:
        internal_type_name (str | None): The name of the Postgres type to map
            (``regtype`` spelling, case-insensitive).

    Returns:
        MYSQL_DATA_TYPE: The MySQL type that corresponds to the Postgres type;
        VARCHAR is used as a fallback for ``None`` or unknown types.
    """
    fallback_type = MYSQL_DATA_TYPE.VARCHAR

    if internal_type_name is None:
        return fallback_type

    internal_type_name = internal_type_name.lower()
    # Flat name -> type map (O(1) lookup). The previous tuple-keyed map was
    # scanned linearly and contained a duplicated 'bpchar' entry.
    types_map = {
        "smallint": MYSQL_DATA_TYPE.SMALLINT,
        "smallserial": MYSQL_DATA_TYPE.SMALLINT,
        "integer": MYSQL_DATA_TYPE.INT,
        "int": MYSQL_DATA_TYPE.INT,
        "serial": MYSQL_DATA_TYPE.INT,
        "bigint": MYSQL_DATA_TYPE.BIGINT,
        "bigserial": MYSQL_DATA_TYPE.BIGINT,
        "real": MYSQL_DATA_TYPE.FLOAT,
        "float": MYSQL_DATA_TYPE.FLOAT,
        "numeric": MYSQL_DATA_TYPE.DECIMAL,
        "decimal": MYSQL_DATA_TYPE.DECIMAL,
        "double precision": MYSQL_DATA_TYPE.DOUBLE,
        "character varying": MYSQL_DATA_TYPE.VARCHAR,
        "varchar": MYSQL_DATA_TYPE.VARCHAR,
        # NOTE: if return chars-types as mysql's CHAR, then response will be padded with spaces, so return as TEXT
        "money": MYSQL_DATA_TYPE.TEXT,
        "character": MYSQL_DATA_TYPE.TEXT,
        "char": MYSQL_DATA_TYPE.TEXT,
        "bpchar": MYSQL_DATA_TYPE.TEXT,
        "text": MYSQL_DATA_TYPE.TEXT,
        "timestamp": MYSQL_DATA_TYPE.DATETIME,
        "timestamp without time zone": MYSQL_DATA_TYPE.DATETIME,
        "timestamp with time zone": MYSQL_DATA_TYPE.DATETIME,
        "date": MYSQL_DATA_TYPE.DATE,
        "time": MYSQL_DATA_TYPE.TIME,
        "time without time zone": MYSQL_DATA_TYPE.TIME,
        "time with time zone": MYSQL_DATA_TYPE.TIME,
        "boolean": MYSQL_DATA_TYPE.BOOL,
        "bytea": MYSQL_DATA_TYPE.BINARY,
        "json": MYSQL_DATA_TYPE.JSON,
        "jsonb": MYSQL_DATA_TYPE.JSON,
    }

    mysql_data_type = types_map.get(internal_type_name)
    if mysql_data_type is not None:
        return mysql_data_type

    logger.debug(f"Postgres handler type mapping: unknown type: {internal_type_name}, use VARCHAR as fallback.")
    return fallback_type

71 

72 

def _make_table_response(result: list[tuple[Any]], cursor: Cursor) -> Response:
    """Build response from result and cursor.

    Maps each result column's Postgres type (from the cursor description) to a
    MySQL type and assembles a DataFrame using nullable dtypes for int/bool
    columns so NULLs survive the conversion.

    Args:
        result (list[tuple[Any]]): result of the query.
        cursor (psycopg.Cursor): cursor object.

    Returns:
        Response: response object.
    """
    description: list[PGColumn] = cursor.description
    mysql_types: list[MYSQL_DATA_TYPE] = []
    for column in description:
        if column.type_display == "vector":
            # 'vector' is type of pgvector extension, added here as text to not import pgvector
            # NOTE: data returned as numpy array
            mysql_types.append(MYSQL_DATA_TYPE.VECTOR)
            continue
        pg_type_info: TypeInfo = pg_types.get(column.type_code)
        if pg_type_info is None:
            # postgres may return 'polymorphic type', which are not present in the pg_types
            # list of 'polymorphic type' can be obtained:
            # SELECT oid, typname, typcategory FROM pg_type WHERE typcategory = 'P' ORDER BY oid;
            if column.type_code in (2277, 5078):
                # anyarray, anycompatiblearray
                regtype = "json"
            else:
                logger.warning(f"Postgres handler: unknown type: {column.type_code}")
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
                continue
        elif pg_type_info.array_oid == column.type_code:
            # it is any array, handle is as json
            regtype: str = "json"
        else:
            regtype: str = pg_type_info.regtype if pg_type_info is not None else None
        mysql_type = _map_type(regtype)
        mysql_types.append(mysql_type)

    # region cast int and bool to nullable types
    serieses = []
    for i, mysql_type in enumerate(mysql_types):
        # None lets pandas infer the dtype for non-int/non-bool columns.
        expected_dtype = None
        if mysql_type in (
            MYSQL_DATA_TYPE.SMALLINT,
            MYSQL_DATA_TYPE.INT,
            MYSQL_DATA_TYPE.MEDIUMINT,
            MYSQL_DATA_TYPE.BIGINT,
            MYSQL_DATA_TYPE.TINYINT,
        ):
            # pandas nullable integer dtype: keeps NULLs without float upcast.
            expected_dtype = "Int64"
        elif mysql_type in (MYSQL_DATA_TYPE.BOOL, MYSQL_DATA_TYPE.BOOLEAN):
            # pandas nullable boolean dtype.
            expected_dtype = "boolean"
        serieses.append(pd.Series([row[i] for row in result], dtype=expected_dtype, name=description[i].name))
    df = pd.concat(serieses, axis=1, copy=False)
    # endregion

    return Response(RESPONSE_TYPE.TABLE, data_frame=df, affected_rows=cursor.rowcount, mysql_types=mysql_types)

130 

131 

class PostgresHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the PostgreSQL statements.
    """

    # Integration name under which this handler is registered.
    name = "postgres"

138 

139 @profiler.profile("init_pg_handler") 

140 def __init__(self, name=None, **kwargs): 

141 super().__init__(name) 

142 self.parser = parse_sql 

143 self.connection_args = kwargs.get("connection_data") 

144 self.dialect = "postgresql" 

145 self.database = self.connection_args.get("database") 

146 self.renderer = SqlalchemyRender("postgres") 

147 

148 self.connection = None 

149 self.is_connected = False 

150 self.cache_thread_safe = True 

151 

152 def __del__(self): 

153 if self.is_connected: 

154 self.disconnect() 

155 

156 def _make_connection_args(self): 

157 config = { 

158 "host": self.connection_args.get("host"), 

159 "port": self.connection_args.get("port"), 

160 "user": self.connection_args.get("user"), 

161 "password": self.connection_args.get("password"), 

162 "dbname": self.connection_args.get("database"), 

163 } 

164 

165 # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS 

166 connection_parameters = self.connection_args.get("connection_parameters") 

167 if isinstance(connection_parameters, dict) is False: 

168 connection_parameters = {} 

169 if "connect_timeout" not in connection_parameters: 169 ↛ 171line 169 didn't jump to line 171 because the condition on line 169 was always true

170 connection_parameters["connect_timeout"] = 10 

171 config.update(connection_parameters) 

172 

173 if self.connection_args.get("sslmode"): 

174 config["sslmode"] = self.connection_args.get("sslmode") 

175 

176 if self.connection_args.get("autocommit"): 

177 config["autocommit"] = self.connection_args.get("autocommit") 

178 

179 # If schema is not provided set public as default one 

180 if self.connection_args.get("schema"): 180 ↛ 182line 180 didn't jump to line 182 because the condition on line 180 was always true

181 config["options"] = f"-c search_path={self.connection_args.get('schema')},public" 

182 return config 

183 

184 @profiler.profile() 

185 def connect(self): 

186 """ 

187 Establishes a connection to a PostgreSQL database. 

188 

189 Raises: 

190 psycopg.Error: If an error occurs while connecting to the PostgreSQL database. 

191 

192 Returns: 

193 psycopg.Connection: A connection object to the PostgreSQL database. 

194 """ 

195 if self.is_connected: 

196 return self.connection 

197 

198 config = self._make_connection_args() 

199 try: 

200 self.connection = psycopg.connect(**config) 

201 self.is_connected = True 

202 return self.connection 

203 except psycopg.Error as e: 

204 logger.error(f"Error connecting to PostgreSQL {self.database}, {e}!") 

205 self.is_connected = False 

206 raise 

207 

208 def disconnect(self): 

209 """ 

210 Closes the connection to the PostgreSQL database if it's currently open. 

211 """ 

212 if not self.is_connected: 

213 return 

214 self.connection.close() 

215 self.is_connected = False 

216 

217 def check_connection(self) -> StatusResponse: 

218 """ 

219 Checks the status of the connection to the PostgreSQL database. 

220 

221 Returns: 

222 StatusResponse: An object containing the success status and an error message if an error occurs. 

223 """ 

224 response = StatusResponse(False) 

225 need_to_close = not self.is_connected 

226 

227 try: 

228 connection = self.connect() 

229 with connection.cursor() as cur: 

230 # Execute a simple query to test the connection 

231 cur.execute("select 1;") 

232 response.success = True 

233 except psycopg.Error as e: 

234 logger.error(f"Error connecting to PostgreSQL {self.database}, {e}!") 

235 response.error_message = str(e) 

236 

237 if response.success and need_to_close: 

238 self.disconnect() 

239 elif not response.success and self.is_connected: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 self.is_connected = False 

241 

242 return response 

243 

    def _cast_dtypes(self, df: DataFrame, description: list) -> DataFrame:
        """
        Cast df dtypes basing on postgres types
        Note:
            Date types casting is not provided because of there is no issues (so far).
            By default pandas will cast postgres date types to:
            - date -> object
            - time -> object
            - timetz -> object
            - timestamp -> datetime64[ns]
            - timestamptz -> datetime64[ns, {tz}]

        Args:
            df (DataFrame)
            description (list): psycopg cursor description
        """
        # Postgres internal type name -> numpy dtype for numeric columns.
        types_map = {
            "int2": "int16",
            "int4": "int32",
            "int8": "int64",
            "numeric": "float64",
            "float4": "float32",
            "float8": "float64",
        }
        # Temporarily replace column labels with positional indexes so that
        # duplicated column names can be addressed unambiguously.
        columns = df.columns
        df.columns = list(range(len(columns)))
        for column_index, column_name in enumerate(df.columns):
            col = df[column_name]
            if str(col.dtype) == "object":
                pg_type_info: TypeInfo = pg_types.get(description[column_index].type_code)  # type_code is int!?
                if pg_type_info is not None and pg_type_info.name in types_map:
                    # numpy integer dtypes cannot represent NaN, so fill first.
                    col = col.fillna(0)  # TODO rework
                    try:
                        df[column_name] = col.astype(types_map[pg_type_info.name])
                    except ValueError as e:
                        logger.error(f"Error casting column {col.name} to {types_map[pg_type_info.name]}: {e}")
        # Restore the original column labels.
        df.columns = columns

281 

    @profiler.profile()
    def native_query(self, query: str, params=None, **kwargs) -> Response:
        """
        Executes a SQL query on the PostgreSQL database and returns the result.

        Args:
            query (str): The SQL query to be executed.
            params (optional): sequence of parameter tuples; when given, the
                query is executed once per tuple via ``executemany``.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        need_to_close = not self.is_connected

        connection = self.connect()
        with connection.cursor() as cur:
            try:
                if params is not None:
                    cur.executemany(query, params)
                else:
                    cur.execute(query)
                # COMMAND_OK (or no result object at all) means the statement
                # produced no row set — report only the affected-row count.
                if cur.pgresult is None or ExecStatus(cur.pgresult.status) == ExecStatus.COMMAND_OK:
                    response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount)
                else:
                    result = cur.fetchall()
                    response = _make_table_response(result, cur)
                connection.commit()
            except (psycopg.ProgrammingError, psycopg.DataError) as e:
                # These is 'expected' exceptions, they should not be treated as mindsdb's errors
                # ProgrammingError: table not found or already exists, syntax error, etc
                # DataError: division by zero, numeric value out of range, etc.
                # https://www.psycopg.org/psycopg3/docs/api/errors.html
                log_message = "Database query failed with error, likely due to invalid SQL query"
                if logger.isEnabledFor(logging.DEBUG):
                    # Include the full query text only at DEBUG verbosity.
                    log_message += f". Executed query:\n{query}"
                logger.info(log_message)
                response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e), is_expected_error=True)
                connection.rollback()
            except Exception as e:
                logger.error(f"Error running query:\n{query}\non {self.database}, {e}")
                response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e))
                connection.rollback()

        if need_to_close:
            self.disconnect()

        return response

328 

329 def query_stream(self, query: ASTNode, fetch_size: int = 1000): 

330 """ 

331 Executes a SQL query and stream results outside by batches 

332 

333 :param query: An ASTNode representing the SQL query to be executed. 

334 :param fetch_size: size of the batch 

335 :return: generator with query results 

336 """ 

337 query_str, params = self.renderer.get_exec_params(query, with_failback=True) 

338 

339 need_to_close = not self.is_connected 

340 

341 connection = self.connect() 

342 with connection.cursor() as cur: 

343 try: 

344 if params is not None: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true

345 cur.executemany(query_str, params) 

346 else: 

347 cur.execute(query_str) 

348 

349 if cur.pgresult is not None and ExecStatus(cur.pgresult.status) != ExecStatus.COMMAND_OK: 349 ↛ 357line 349 didn't jump to line 357 because the condition on line 349 was always true

350 while True: 

351 result = cur.fetchmany(fetch_size) 

352 if not result: 

353 break 

354 df = DataFrame(result, columns=[x.name for x in cur.description]) 

355 self._cast_dtypes(df, cur.description) 

356 yield df 

357 connection.commit() 

358 finally: 

359 connection.rollback() 

360 

361 if need_to_close: 361 ↛ exitline 361 didn't return from function 'query_stream' because the condition on line 361 was always true

362 self.disconnect() 

363 

364 def insert(self, table_name: str, df: pd.DataFrame) -> Response: 

365 need_to_close = not self.is_connected 

366 

367 connection = self.connect() 

368 

369 columns = df.columns 

370 

371 resp = self.get_columns(table_name) 

372 

373 # copy requires precise cases of names: get current column names from table and adapt input dataframe columns 

374 if resp.data_frame is not None and not resp.data_frame.empty: 374 ↛ 380line 374 didn't jump to line 380 because the condition on line 374 was always true

375 db_columns = {c.lower(): c for c in resp.data_frame["COLUMN_NAME"]} 

376 

377 # try to get case of existing column 

378 columns = [db_columns.get(c.lower(), c) for c in columns] 

379 

380 columns = [f'"{c}"' for c in columns] 

381 rowcount = None 

382 

383 with connection.cursor() as cur: 

384 try: 

385 with cur.copy(f'copy "{table_name}" ({",".join(columns)}) from STDIN WITH CSV') as copy: 

386 df.to_csv(copy, index=False, header=False) 

387 

388 connection.commit() 

389 except Exception as e: 

390 logger.error(f"Error running insert to {table_name} on {self.database}, {e}!") 

391 connection.rollback() 

392 raise e 

393 rowcount = cur.rowcount 

394 

395 if need_to_close: 395 ↛ 398line 395 didn't jump to line 398 because the condition on line 395 was always true

396 self.disconnect() 

397 

398 return Response(RESPONSE_TYPE.OK, affected_rows=rowcount) 

399 

400 @profiler.profile() 

401 def query(self, query: ASTNode) -> Response: 

402 """ 

403 Executes a SQL query represented by an ASTNode and retrieves the data. 

404 

405 Args: 

406 query (ASTNode): An ASTNode representing the SQL query to be executed. 

407 

408 Returns: 

409 Response: The response from the `native_query` method, containing the result of the SQL query execution. 

410 """ 

411 query_str, params = self.renderer.get_exec_params(query, with_failback=True) 

412 logger.debug(f"Executing SQL query: {query_str}") 

413 return self.native_query(query_str, params) 

414 

415 def get_tables(self, all: bool = False) -> Response: 

416 """ 

417 Retrieves a list of all non-system tables and views in the current schema of the PostgreSQL database. 

418 

419 Returns: 

420 Response: A response object containing the list of tables and views, formatted as per the `Response` class. 

421 """ 

422 all_filter = "and table_schema = current_schema()" 

423 if all is True: 

424 all_filter = "" 

425 query = f""" 

426 SELECT 

427 table_schema, 

428 table_name, 

429 table_type 

430 FROM 

431 information_schema.tables 

432 WHERE 

433 table_schema NOT IN ('information_schema', 'pg_catalog') 

434 and table_type in ('BASE TABLE', 'VIEW') 

435 {all_filter} 

436 """ 

437 return self.native_query(query) 

438 

439 def get_columns(self, table_name: str, schema_name: Optional[str] = None) -> Response: 

440 """ 

441 Retrieves column details for a specified table in the PostgreSQL database. 

442 

443 Args: 

444 table_name (str): The name of the table for which to retrieve column information. 

445 schema_name (str): The name of the schema in which the table is located. 

446 

447 Returns: 

448 Response: A response object containing the column details, formatted as per the `Response` class. 

449 

450 Raises: 

451 ValueError: If the 'table_name' is not a valid string. 

452 """ 

453 

454 if not table_name or not isinstance(table_name, str): 454 ↛ 455line 454 didn't jump to line 455 because the condition on line 454 was never true

455 raise ValueError("Invalid table name provided.") 

456 if isinstance(schema_name, str): 

457 schema_name = f"'{schema_name}'" 

458 else: 

459 schema_name = "current_schema()" 

460 query = f""" 

461 SELECT 

462 COLUMN_NAME, 

463 DATA_TYPE, 

464 ORDINAL_POSITION, 

465 COLUMN_DEFAULT, 

466 IS_NULLABLE, 

467 CHARACTER_MAXIMUM_LENGTH, 

468 CHARACTER_OCTET_LENGTH, 

469 NUMERIC_PRECISION, 

470 NUMERIC_SCALE, 

471 DATETIME_PRECISION, 

472 CHARACTER_SET_NAME, 

473 COLLATION_NAME 

474 FROM 

475 information_schema.columns 

476 WHERE 

477 table_name = '{table_name}' 

478 AND 

479 table_schema = {schema_name} 

480 """ 

481 # If it is used by pgvector handler - `native_query` method of pgvector handler will be used 

482 # in that case if shared pgvector db is used - `native_query` will be skipped (return empty result) 

483 # `no_restrict` flag allows to execute native query, and it will call `native_query` of postgres handler 

484 result = self.native_query(query, no_restrict=True) 

485 result.to_columns_table_response(map_type_fn=_map_type) 

486 return result 

487 

488 def subscribe(self, stop_event, callback, table_name, columns=None, **kwargs): 

489 config = self._make_connection_args() 

490 config["autocommit"] = True 

491 

492 conn = psycopg.connect(**config) 

493 

494 # create db trigger 

495 trigger_name = f"mdb_notify_{table_name}" 

496 

497 before, after = "", "" 

498 

499 if columns: 499 ↛ 514line 499 didn't jump to line 514 because the condition on line 499 was always true

500 # check column exist 

501 conn.execute(f"select {','.join(columns)} from {table_name} limit 0") 

502 

503 columns = set(columns) 

504 trigger_name += "_" + "_".join(columns) 

505 

506 news, olds = [], [] 

507 for column in columns: 

508 news.append(f"NEW.{column}") 

509 olds.append(f"OLD.{column}") 

510 

511 before = f"IF ({', '.join(news)}) IS DISTINCT FROM ({', '.join(olds)}) then\n" 

512 after = "\nEND IF;" 

513 else: 

514 columns = set() 

515 

516 func_code = f""" 

517 CREATE OR REPLACE FUNCTION {trigger_name}() 

518 RETURNS trigger AS $$ 

519 DECLARE 

520 BEGIN 

521 {before} 

522 PERFORM pg_notify( '{trigger_name}', row_to_json(NEW)::text); 

523 {after} 

524 RETURN NEW; 

525 END; 

526 $$ LANGUAGE plpgsql; 

527 """ 

528 conn.execute(func_code) 

529 

530 # for after update - new and old have the same values 

531 conn.execute(f""" 

532 CREATE OR REPLACE TRIGGER {trigger_name} 

533 BEFORE INSERT OR UPDATE ON {table_name} 

534 FOR EACH ROW 

535 EXECUTE PROCEDURE {trigger_name}(); 

536 """) 

537 conn.commit() 

538 

539 # start listen 

540 conn.execute(f"LISTEN {trigger_name};") 

541 

542 def process_event(event): 

543 try: 

544 row = json.loads(event.payload) 

545 except json.JSONDecoder: 

546 return 

547 

548 # check column in input data 

549 if not columns or columns.intersection(row.keys()): 549 ↛ exitline 549 didn't return from function 'process_event' because the condition on line 549 was always true

550 callback(row) 

551 

552 try: 

553 conn.add_notify_handler(process_event) 

554 

555 while True: 

556 if stop_event.is_set(): 

557 # exit trigger 

558 return 

559 

560 # trigger getting updates 

561 # https://www.psycopg.org/psycopg3/docs/advanced/async.html#asynchronous-notifications 

562 conn.execute("SELECT 1").fetchone() 

563 

564 time.sleep(SUBSCRIBE_SLEEP_INTERVAL) 

565 

566 finally: 

567 conn.execute(f"drop TRIGGER {trigger_name} on {table_name}") 

568 conn.execute(f"drop FUNCTION {trigger_name}") 

569 conn.commit() 

570 

571 conn.close() 

572 

573 def meta_get_tables(self, table_names: Optional[list] = None) -> Response: 

574 """ 

575 Retrieves metadata information about the tables in the PostgreSQL database to be stored in the data catalog. 

576 

577 Args: 

578 table_names (list): A list of table names for which to retrieve metadata information. 

579 

580 Returns: 

581 Response: A response object containing the metadata information, formatted as per the `Response` class. 

582 """ 

583 query = """ 

584 SELECT 

585 t.table_name, 

586 t.table_schema, 

587 t.table_type, 

588 obj_description(pgc.oid, 'pg_class') AS table_description, 

589 pgc.reltuples AS row_count 

590 FROM information_schema.tables t 

591 JOIN pg_catalog.pg_class pgc ON pgc.relname = t.table_name 

592 JOIN pg_catalog.pg_namespace pgn ON pgn.oid = pgc.relnamespace 

593 WHERE t.table_schema = current_schema() 

594 AND t.table_type in ('BASE TABLE', 'VIEW') 

595 AND t.table_name NOT LIKE 'pg_%' 

596 AND t.table_name NOT LIKE 'sql_%' 

597 """ 

598 

599 if table_names is not None and len(table_names) > 0: 599 ↛ 603line 599 didn't jump to line 603 because the condition on line 599 was always true

600 table_names = [f"'{t}'" for t in table_names] 

601 query += f" AND t.table_name IN ({','.join(table_names)})" 

602 

603 result = self.native_query(query) 

604 return result 

605 

606 def meta_get_columns(self, table_names: Optional[list] = None) -> Response: 

607 """ 

608 Retrieves column metadata for the specified tables (or all tables if no list is provided). 

609 

610 Args: 

611 table_names (list): A list of table names for which to retrieve column metadata. 

612 

613 Returns: 

614 Response: A response object containing the column metadata. 

615 """ 

616 query = """ 

617 SELECT 

618 c.table_name, 

619 c.column_name, 

620 c.data_type, 

621 col_description(pgc.oid, c.ordinal_position) AS column_description, 

622 c.column_default, 

623 (c.is_nullable = 'YES') AS is_nullable 

624 FROM information_schema.columns c 

625 JOIN pg_catalog.pg_class pgc ON pgc.relname = c.table_name 

626 JOIN pg_catalog.pg_namespace pgn ON pgn.oid = pgc.relnamespace 

627 WHERE c.table_schema = current_schema() 

628 AND pgc.relkind = 'r' -- Only consider regular tables (avoids indexes, sequences, etc.) 

629 AND c.table_name NOT LIKE 'pg_%' 

630 AND c.table_name NOT LIKE 'sql_%' 

631 AND pgn.nspname = c.table_schema 

632 """ 

633 

634 if table_names is not None and len(table_names) > 0: 634 ↛ 638line 634 didn't jump to line 638 because the condition on line 634 was always true

635 table_names = [f"'{t}'" for t in table_names] 

636 query += f" AND c.table_name IN ({','.join(table_names)})" 

637 

638 result = self.native_query(query) 

639 return result 

640 

    def meta_get_column_statistics(self, table_names: Optional[list] = None) -> Response:
        """
        Retrieves column statistics (e.g., most common values, frequencies, null percentage, and distinct value count)
        for the specified tables or all tables if no list is provided.

        Statistics come from the ``pg_stats`` view; min/max are derived from
        the histogram bounds string, and the common-values/frequencies array
        strings are split into Python lists.

        Args:
            table_names (list): A list of table names for which to retrieve column statistics.

        Returns:
            Response: A response object containing the column statistics.
        """
        table_filter = ""
        if table_names is not None and len(table_names) > 0:
            quoted_names = [f"'{t}'" for t in table_names]
            table_filter = f" AND ps.tablename IN ({','.join(quoted_names)})"

        query = (
            """
            SELECT
                ps.tablename AS TABLE_NAME,
                ps.attname AS COLUMN_NAME,
                ROUND(ps.null_frac::numeric * 100, 2) AS NULL_PERCENTAGE,
                CASE
                    WHEN ps.n_distinct < 0 THEN NULL
                    ELSE ps.n_distinct::bigint
                END AS DISTINCT_VALUES_COUNT,
                ps.most_common_vals AS MOST_COMMON_VALUES,
                ps.most_common_freqs AS MOST_COMMON_FREQUENCIES,
                ps.histogram_bounds
            FROM pg_stats ps
            WHERE ps.schemaname = current_schema()
            AND ps.tablename NOT LIKE 'pg_%'
            AND ps.tablename NOT LIKE 'sql_%'
            """
            + table_filter
            + """
            ORDER BY ps.tablename, ps.attname
            """
        )

        result = self.native_query(query)

        if result.type == RESPONSE_TYPE.TABLE and result.data_frame is not None:
            df = result.data_frame

            # Extract min/max from histogram bounds
            def extract_min_max(histogram_str):
                # histogram_bounds arrives as a '{a,b,c,...}' array string;
                # its first/last entries approximate the column min/max.
                if histogram_str and str(histogram_str) != "nan":
                    clean = str(histogram_str).strip("{}")
                    if clean:
                        values = clean.split(",")
                        min_val = values[0].strip(" \"'") if values else None
                        max_val = values[-1].strip(" \"'") if values else None
                        return min_val, max_val
                return None, None

            min_max_values = df["histogram_bounds"].apply(extract_min_max)
            df["MINIMUM_VALUE"] = min_max_values.apply(lambda x: x[0])
            df["MAXIMUM_VALUE"] = min_max_values.apply(lambda x: x[1])

            # Convert most_common_values and most_common_freqs to arrays.
            df["MOST_COMMON_VALUES"] = df["most_common_values"].apply(
                lambda x: x.strip("{}").split(",") if isinstance(x, str) else []
            )
            df["MOST_COMMON_FREQUENCIES"] = df["most_common_frequencies"].apply(
                lambda x: x.strip("{}").split(",") if isinstance(x, str) else []
            )

            # Drop the raw pg_stats columns that were replaced above.
            result.data_frame = df.drop(columns=["histogram_bounds", "most_common_values", "most_common_frequencies"])

        return result

712 

713 def meta_get_primary_keys(self, table_names: Optional[list] = None) -> Response: 

714 """ 

715 Retrieves primary key information for the specified tables (or all tables if no list is provided). 

716 

717 Args: 

718 table_names (list): A list of table names for which to retrieve primary key information. 

719 

720 Returns: 

721 Response: A response object containing the primary key information. 

722 """ 

723 query = """ 

724 SELECT 

725 tc.table_name, 

726 kcu.column_name, 

727 kcu.ordinal_position, 

728 tc.constraint_name 

729 FROM 

730 information_schema.table_constraints AS tc 

731 JOIN 

732 information_schema.key_column_usage AS kcu 

733 ON 

734 tc.constraint_name = kcu.constraint_name 

735 WHERE 

736 tc.constraint_type = 'PRIMARY KEY' 

737 AND tc.table_schema = current_schema() 

738 """ 

739 

740 if table_names is not None and len(table_names) > 0: 740 ↛ 744line 740 didn't jump to line 744 because the condition on line 740 was always true

741 table_names = [f"'{t}'" for t in table_names] 

742 query += f" AND tc.table_name IN ({','.join(table_names)})" 

743 

744 result = self.native_query(query) 

745 return result 

746 

747 def meta_get_foreign_keys(self, table_names: Optional[list] = None) -> Response: 

748 """ 

749 Retrieves foreign key information for the specified tables (or all tables if no list is provided). 

750 

751 Args: 

752 table_names (list): A list of table names for which to retrieve foreign key information. 

753 

754 Returns: 

755 Response: A response object containing the foreign key information. 

756 """ 

757 query = """ 

758 SELECT 

759 ccu.table_name AS parent_table_name, 

760 ccu.column_name AS parent_column_name, 

761 tc.table_name AS child_table_name, 

762 kcu.column_name AS child_column_name, 

763 tc.constraint_name 

764 FROM 

765 information_schema.table_constraints AS tc 

766 JOIN 

767 information_schema.key_column_usage AS kcu 

768 ON 

769 tc.constraint_name = kcu.constraint_name 

770 JOIN 

771 information_schema.constraint_column_usage AS ccu 

772 ON 

773 ccu.constraint_name = tc.constraint_name 

774 WHERE 

775 tc.constraint_type = 'FOREIGN KEY' 

776 AND tc.table_schema = current_schema() 

777 """ 

778 

779 if table_names is not None and len(table_names) > 0: 779 ↛ 783line 779 didn't jump to line 783 because the condition on line 779 was always true

780 table_names = [f"'{t}'" for t in table_names] 

781 query += f" AND tc.table_name IN ({','.join(table_names)})" 

782 

783 result = self.native_query(query) 

784 return result