Coverage for mindsdb / integrations / handlers / postgres_handler / postgres_handler.py: 90%
326 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import json
3import logging
4from typing import Optional, Any
6import pandas as pd
7from pandas import DataFrame
8import psycopg
9from psycopg import Column as PGColumn, Cursor
10from psycopg.postgres import TypeInfo, types as pg_types
11from psycopg.pq import ExecStatus
13from mindsdb_sql_parser import parse_sql
14from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
15from mindsdb_sql_parser.ast.base import ASTNode
17from mindsdb.integrations.libs.base import MetaDatabaseHandler
18from mindsdb.utilities import log
19from mindsdb.integrations.libs.response import (
20 HandlerStatusResponse as StatusResponse,
21 HandlerResponse as Response,
22 RESPONSE_TYPE,
23)
24import mindsdb.utilities.profiler as profiler
25from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE
27logger = log.getLogger(__name__)
29SUBSCRIBE_SLEEP_INTERVAL = 1
def _map_type(internal_type_name: str | None) -> MYSQL_DATA_TYPE:
    """Map a Postgres type name to the corresponding MySQL type.

    Args:
        internal_type_name (str): The name of the Postgres type to map.

    Returns:
        MYSQL_DATA_TYPE: The MySQL type that corresponds to the Postgres type.
    """
    fallback_type = MYSQL_DATA_TYPE.VARCHAR

    if internal_type_name is None:
        return fallback_type

    internal_type_name = internal_type_name.lower()
    # Flat name -> type lookup; one entry per Postgres type name.
    types_map = {
        "smallint": MYSQL_DATA_TYPE.SMALLINT,
        "smallserial": MYSQL_DATA_TYPE.SMALLINT,
        "integer": MYSQL_DATA_TYPE.INT,
        "int": MYSQL_DATA_TYPE.INT,
        "serial": MYSQL_DATA_TYPE.INT,
        "bigint": MYSQL_DATA_TYPE.BIGINT,
        "bigserial": MYSQL_DATA_TYPE.BIGINT,
        "real": MYSQL_DATA_TYPE.FLOAT,
        "float": MYSQL_DATA_TYPE.FLOAT,
        "numeric": MYSQL_DATA_TYPE.DECIMAL,
        "decimal": MYSQL_DATA_TYPE.DECIMAL,
        "double precision": MYSQL_DATA_TYPE.DOUBLE,
        "character varying": MYSQL_DATA_TYPE.VARCHAR,
        "varchar": MYSQL_DATA_TYPE.VARCHAR,
        # NOTE: if return chars-types as mysql's CHAR, then response will be padded with spaces, so return as TEXT
        "money": MYSQL_DATA_TYPE.TEXT,
        "character": MYSQL_DATA_TYPE.TEXT,
        "char": MYSQL_DATA_TYPE.TEXT,
        "bpchar": MYSQL_DATA_TYPE.TEXT,
        "text": MYSQL_DATA_TYPE.TEXT,
        "timestamp": MYSQL_DATA_TYPE.DATETIME,
        "timestamp without time zone": MYSQL_DATA_TYPE.DATETIME,
        "timestamp with time zone": MYSQL_DATA_TYPE.DATETIME,
        "date": MYSQL_DATA_TYPE.DATE,
        "time": MYSQL_DATA_TYPE.TIME,
        "time without time zone": MYSQL_DATA_TYPE.TIME,
        "time with time zone": MYSQL_DATA_TYPE.TIME,
        "boolean": MYSQL_DATA_TYPE.BOOL,
        "bytea": MYSQL_DATA_TYPE.BINARY,
        "json": MYSQL_DATA_TYPE.JSON,
        "jsonb": MYSQL_DATA_TYPE.JSON,
    }

    mapped_type = types_map.get(internal_type_name)
    if mapped_type is not None:
        return mapped_type

    logger.debug(f"Postgres handler type mapping: unknown type: {internal_type_name}, use VARCHAR as fallback.")
    return fallback_type
def _make_table_response(result: list[tuple[Any]], cursor: Cursor) -> Response:
    """Build a TABLE response from a raw fetched result and its cursor.

    Resolves each result column's Postgres type to a MYSQL_DATA_TYPE and builds
    a DataFrame column-by-column, using pandas nullable dtypes for integer and
    boolean columns so NULL values survive the conversion.

    Args:
        result (list[tuple[Any]]): result of the query.
        cursor (psycopg.Cursor): cursor object.

    Returns:
        Response: response object.
    """
    description: list[PGColumn] = cursor.description
    mysql_types: list[MYSQL_DATA_TYPE] = []
    for column in description:
        if column.type_display == "vector":
            # 'vector' is type of pgvector extension, added here as text to not import pgvector
            # NOTE: data returned as numpy array
            mysql_types.append(MYSQL_DATA_TYPE.VECTOR)
            continue
        pg_type_info: TypeInfo = pg_types.get(column.type_code)
        if pg_type_info is None:
            # postgres may return 'polymorphic type', which are not present in the pg_types
            # list of 'polymorphic type' can be obtained:
            # SELECT oid, typname, typcategory FROM pg_type WHERE typcategory = 'P' ORDER BY oid;
            if column.type_code in (2277, 5078):
                # anyarray, anycompatiblearray: fall through below and map as json
                regtype = "json"
            else:
                logger.warning(f"Postgres handler: unknown type: {column.type_code}")
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
                continue
        elif pg_type_info.array_oid == column.type_code:
            # it is an array type, handle it as json
            regtype: str = "json"
        else:
            regtype: str = pg_type_info.regtype if pg_type_info is not None else None
        mysql_type = _map_type(regtype)
        mysql_types.append(mysql_type)

    # region cast int and bool to nullable types
    serieses = []
    for i, mysql_type in enumerate(mysql_types):
        expected_dtype = None
        if mysql_type in (
            MYSQL_DATA_TYPE.SMALLINT,
            MYSQL_DATA_TYPE.INT,
            MYSQL_DATA_TYPE.MEDIUMINT,
            MYSQL_DATA_TYPE.BIGINT,
            MYSQL_DATA_TYPE.TINYINT,
        ):
            # pandas nullable integer dtype: keeps NULLs as <NA> instead of forcing float
            expected_dtype = "Int64"
        elif mysql_type in (MYSQL_DATA_TYPE.BOOL, MYSQL_DATA_TYPE.BOOLEAN):
            expected_dtype = "boolean"
        serieses.append(pd.Series([row[i] for row in result], dtype=expected_dtype, name=description[i].name))
    df = pd.concat(serieses, axis=1, copy=False)
    # endregion

    return Response(RESPONSE_TYPE.TABLE, data_frame=df, affected_rows=cursor.rowcount, mysql_types=mysql_types)
132class PostgresHandler(MetaDatabaseHandler):
133 """
134 This handler handles connection and execution of the PostgreSQL statements.
135 """
137 name = "postgres"
139 @profiler.profile("init_pg_handler")
140 def __init__(self, name=None, **kwargs):
141 super().__init__(name)
142 self.parser = parse_sql
143 self.connection_args = kwargs.get("connection_data")
144 self.dialect = "postgresql"
145 self.database = self.connection_args.get("database")
146 self.renderer = SqlalchemyRender("postgres")
148 self.connection = None
149 self.is_connected = False
150 self.cache_thread_safe = True
152 def __del__(self):
153 if self.is_connected:
154 self.disconnect()
156 def _make_connection_args(self):
157 config = {
158 "host": self.connection_args.get("host"),
159 "port": self.connection_args.get("port"),
160 "user": self.connection_args.get("user"),
161 "password": self.connection_args.get("password"),
162 "dbname": self.connection_args.get("database"),
163 }
165 # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
166 connection_parameters = self.connection_args.get("connection_parameters")
167 if isinstance(connection_parameters, dict) is False:
168 connection_parameters = {}
169 if "connect_timeout" not in connection_parameters: 169 ↛ 171line 169 didn't jump to line 171 because the condition on line 169 was always true
170 connection_parameters["connect_timeout"] = 10
171 config.update(connection_parameters)
173 if self.connection_args.get("sslmode"):
174 config["sslmode"] = self.connection_args.get("sslmode")
176 if self.connection_args.get("autocommit"):
177 config["autocommit"] = self.connection_args.get("autocommit")
179 # If schema is not provided set public as default one
180 if self.connection_args.get("schema"): 180 ↛ 182line 180 didn't jump to line 182 because the condition on line 180 was always true
181 config["options"] = f"-c search_path={self.connection_args.get('schema')},public"
182 return config
184 @profiler.profile()
185 def connect(self):
186 """
187 Establishes a connection to a PostgreSQL database.
189 Raises:
190 psycopg.Error: If an error occurs while connecting to the PostgreSQL database.
192 Returns:
193 psycopg.Connection: A connection object to the PostgreSQL database.
194 """
195 if self.is_connected:
196 return self.connection
198 config = self._make_connection_args()
199 try:
200 self.connection = psycopg.connect(**config)
201 self.is_connected = True
202 return self.connection
203 except psycopg.Error as e:
204 logger.error(f"Error connecting to PostgreSQL {self.database}, {e}!")
205 self.is_connected = False
206 raise
208 def disconnect(self):
209 """
210 Closes the connection to the PostgreSQL database if it's currently open.
211 """
212 if not self.is_connected:
213 return
214 self.connection.close()
215 self.is_connected = False
217 def check_connection(self) -> StatusResponse:
218 """
219 Checks the status of the connection to the PostgreSQL database.
221 Returns:
222 StatusResponse: An object containing the success status and an error message if an error occurs.
223 """
224 response = StatusResponse(False)
225 need_to_close = not self.is_connected
227 try:
228 connection = self.connect()
229 with connection.cursor() as cur:
230 # Execute a simple query to test the connection
231 cur.execute("select 1;")
232 response.success = True
233 except psycopg.Error as e:
234 logger.error(f"Error connecting to PostgreSQL {self.database}, {e}!")
235 response.error_message = str(e)
237 if response.success and need_to_close:
238 self.disconnect()
239 elif not response.success and self.is_connected: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 self.is_connected = False
242 return response
    def _cast_dtypes(self, df: DataFrame, description: list) -> DataFrame:
        """
        Cast df dtypes (in place) basing on postgres types.

        Note:
            Date types casting is not provided because of there is no issues (so far).
            By default pandas will cast postgres date types to:
            - date -> object
            - time -> object
            - timetz -> object
            - timestamp -> datetime64[ns]
            - timestamptz -> datetime64[ns, {tz}]

        Args:
            df (DataFrame): frame to mutate in place.
            description (list): psycopg cursor description, one entry per df column.
        """
        # Postgres internal type name -> target pandas/numpy dtype
        types_map = {
            "int2": "int16",
            "int4": "int32",
            "int8": "int64",
            "numeric": "float64",
            "float4": "float32",
            "float8": "float64",
        }
        columns = df.columns
        # Temporarily rename columns to positional indices — presumably so that
        # df[column_name] selects a single Series even with duplicate names; TODO confirm.
        df.columns = list(range(len(columns)))
        for column_index, column_name in enumerate(df.columns):
            col = df[column_name]
            # Only 'object' columns need casting; numeric ones were already typed by pandas.
            if str(col.dtype) == "object":
                pg_type_info: TypeInfo = pg_types.get(description[column_index].type_code)  # type_code is int!?
                if pg_type_info is not None and pg_type_info.name in types_map:
                    col = col.fillna(0)  # TODO rework: replaces NULLs with 0, losing nullability
                    try:
                        df[column_name] = col.astype(types_map[pg_type_info.name])
                    except ValueError as e:
                        # Leave the column as 'object' if the cast fails.
                        logger.error(f"Error casting column {col.name} to {types_map[pg_type_info.name]}: {e}")
        # Restore the original column names.
        df.columns = columns
    @profiler.profile()
    def native_query(self, query: str, params=None, **kwargs) -> Response:
        """
        Executes a SQL query on the PostgreSQL database and returns the result.

        Args:
            query (str): The SQL query to be executed.
            params: optional sequence of parameter sets; when given the query is
                executed with `executemany`.
            kwargs: extra flags from callers (e.g. `no_restrict`) — ignored here,
                presumably consumed by subclass overrides; TODO confirm.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        need_to_close = not self.is_connected

        connection = self.connect()
        with connection.cursor() as cur:
            try:
                if params is not None:
                    cur.executemany(query, params)
                else:
                    cur.execute(query)
                # COMMAND_OK (or no result object at all) means the statement produced no result set.
                if cur.pgresult is None or ExecStatus(cur.pgresult.status) == ExecStatus.COMMAND_OK:
                    response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount)
                else:
                    result = cur.fetchall()
                    response = _make_table_response(result, cur)
                connection.commit()
            except (psycopg.ProgrammingError, psycopg.DataError) as e:
                # These are 'expected' exceptions, they should not be treated as mindsdb's errors
                # ProgrammingError: table not found or already exists, syntax error, etc
                # DataError: division by zero, numeric value out of range, etc.
                # https://www.psycopg.org/psycopg3/docs/api/errors.html
                log_message = "Database query failed with error, likely due to invalid SQL query"
                if logger.isEnabledFor(logging.DEBUG):
                    # Include the query text only at DEBUG level to avoid leaking it in normal logs.
                    log_message += f". Executed query:\n{query}"
                logger.info(log_message)
                response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e), is_expected_error=True)
                connection.rollback()
            except Exception as e:
                logger.error(f"Error running query:\n{query}\non {self.database}, {e}")
                response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e))
                connection.rollback()

        if need_to_close:
            self.disconnect()

        return response
    def query_stream(self, query: ASTNode, fetch_size: int = 1000):
        """
        Executes a SQL query and stream results outside by batches.

        :param query: An ASTNode representing the SQL query to be executed.
        :param fetch_size: size of the batch
        :return: generator yielding a DataFrame per batch of fetched rows
        """
        query_str, params = self.renderer.get_exec_params(query, with_failback=True)

        need_to_close = not self.is_connected

        connection = self.connect()
        with connection.cursor() as cur:
            try:
                if params is not None:
                    cur.executemany(query_str, params)
                else:
                    cur.execute(query_str)

                # Only statements that produced a result set are fetched.
                if cur.pgresult is not None and ExecStatus(cur.pgresult.status) != ExecStatus.COMMAND_OK:
                    while True:
                        result = cur.fetchmany(fetch_size)
                        if not result:
                            break
                        df = DataFrame(result, columns=[x.name for x in cur.description])
                        self._cast_dtypes(df, cur.description)
                        yield df
                connection.commit()
            finally:
                # No effect after a successful commit; releases the transaction
                # if the generator is abandoned mid-stream or an error occurred.
                connection.rollback()

        if need_to_close:
            self.disconnect()
    def insert(self, table_name: str, df: pd.DataFrame) -> Response:
        """Insert a DataFrame into a table using the Postgres COPY protocol.

        Args:
            table_name (str): target table.
            df (pd.DataFrame): rows to insert.

        Returns:
            Response: OK response with the number of affected rows.

        Raises:
            Exception: re-raises any COPY error after rolling back.
        """
        need_to_close = not self.is_connected

        connection = self.connect()

        columns = df.columns

        resp = self.get_columns(table_name)

        # copy requires precise cases of names: get current column names from table and adapt input dataframe columns
        if resp.data_frame is not None and not resp.data_frame.empty:
            db_columns = {c.lower(): c for c in resp.data_frame["COLUMN_NAME"]}

            # try to get case of existing column
            columns = [db_columns.get(c.lower(), c) for c in columns]

        # Quote every column name so mixed-case names survive.
        columns = [f'"{c}"' for c in columns]
        rowcount = None

        with connection.cursor() as cur:
            try:
                # Stream the dataframe as CSV straight into the COPY channel.
                with cur.copy(f'copy "{table_name}" ({",".join(columns)}) from STDIN WITH CSV') as copy:
                    df.to_csv(copy, index=False, header=False)

                connection.commit()
            except Exception as e:
                logger.error(f"Error running insert to {table_name} on {self.database}, {e}!")
                connection.rollback()
                raise e
            rowcount = cur.rowcount

        if need_to_close:
            self.disconnect()

        return Response(RESPONSE_TYPE.OK, affected_rows=rowcount)
400 @profiler.profile()
401 def query(self, query: ASTNode) -> Response:
402 """
403 Executes a SQL query represented by an ASTNode and retrieves the data.
405 Args:
406 query (ASTNode): An ASTNode representing the SQL query to be executed.
408 Returns:
409 Response: The response from the `native_query` method, containing the result of the SQL query execution.
410 """
411 query_str, params = self.renderer.get_exec_params(query, with_failback=True)
412 logger.debug(f"Executing SQL query: {query_str}")
413 return self.native_query(query_str, params)
415 def get_tables(self, all: bool = False) -> Response:
416 """
417 Retrieves a list of all non-system tables and views in the current schema of the PostgreSQL database.
419 Returns:
420 Response: A response object containing the list of tables and views, formatted as per the `Response` class.
421 """
422 all_filter = "and table_schema = current_schema()"
423 if all is True:
424 all_filter = ""
425 query = f"""
426 SELECT
427 table_schema,
428 table_name,
429 table_type
430 FROM
431 information_schema.tables
432 WHERE
433 table_schema NOT IN ('information_schema', 'pg_catalog')
434 and table_type in ('BASE TABLE', 'VIEW')
435 {all_filter}
436 """
437 return self.native_query(query)
    def get_columns(self, table_name: str, schema_name: Optional[str] = None) -> Response:
        """
        Retrieves column details for a specified table in the PostgreSQL database.

        Args:
            table_name (str): The name of the table for which to retrieve column information.
            schema_name (str): The name of the schema in which the table is located.
                Defaults to the connection's current schema when not provided.

        Returns:
            Response: A response object containing the column details, formatted as per the `Response` class.

        Raises:
            ValueError: If the 'table_name' is not a valid string.
        """

        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")
        # NOTE(review): table_name/schema_name are interpolated into the SQL string below;
        # assumed to come from trusted internal callers — confirm before exposing to user input.
        if isinstance(schema_name, str):
            schema_name = f"'{schema_name}'"
        else:
            schema_name = "current_schema()"
        query = f"""
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                ORDINAL_POSITION,
                COLUMN_DEFAULT,
                IS_NULLABLE,
                CHARACTER_MAXIMUM_LENGTH,
                CHARACTER_OCTET_LENGTH,
                NUMERIC_PRECISION,
                NUMERIC_SCALE,
                DATETIME_PRECISION,
                CHARACTER_SET_NAME,
                COLLATION_NAME
            FROM
                information_schema.columns
            WHERE
                table_name = '{table_name}'
            AND
                table_schema = {schema_name}
        """
        # If it is used by pgvector handler - `native_query` method of pgvector handler will be used
        # in that case if shared pgvector db is used - `native_query` will be skipped (return empty result)
        # `no_restrict` flag allows to execute native query, and it will call `native_query` of postgres handler
        result = self.native_query(query, no_restrict=True)
        result.to_columns_table_response(map_type_fn=_map_type)
        return result
488 def subscribe(self, stop_event, callback, table_name, columns=None, **kwargs):
489 config = self._make_connection_args()
490 config["autocommit"] = True
492 conn = psycopg.connect(**config)
494 # create db trigger
495 trigger_name = f"mdb_notify_{table_name}"
497 before, after = "", ""
499 if columns: 499 ↛ 514line 499 didn't jump to line 514 because the condition on line 499 was always true
500 # check column exist
501 conn.execute(f"select {','.join(columns)} from {table_name} limit 0")
503 columns = set(columns)
504 trigger_name += "_" + "_".join(columns)
506 news, olds = [], []
507 for column in columns:
508 news.append(f"NEW.{column}")
509 olds.append(f"OLD.{column}")
511 before = f"IF ({', '.join(news)}) IS DISTINCT FROM ({', '.join(olds)}) then\n"
512 after = "\nEND IF;"
513 else:
514 columns = set()
516 func_code = f"""
517 CREATE OR REPLACE FUNCTION {trigger_name}()
518 RETURNS trigger AS $$
519 DECLARE
520 BEGIN
521 {before}
522 PERFORM pg_notify( '{trigger_name}', row_to_json(NEW)::text);
523 {after}
524 RETURN NEW;
525 END;
526 $$ LANGUAGE plpgsql;
527 """
528 conn.execute(func_code)
530 # for after update - new and old have the same values
531 conn.execute(f"""
532 CREATE OR REPLACE TRIGGER {trigger_name}
533 BEFORE INSERT OR UPDATE ON {table_name}
534 FOR EACH ROW
535 EXECUTE PROCEDURE {trigger_name}();
536 """)
537 conn.commit()
539 # start listen
540 conn.execute(f"LISTEN {trigger_name};")
542 def process_event(event):
543 try:
544 row = json.loads(event.payload)
545 except json.JSONDecoder:
546 return
548 # check column in input data
549 if not columns or columns.intersection(row.keys()): 549 ↛ exitline 549 didn't return from function 'process_event' because the condition on line 549 was always true
550 callback(row)
552 try:
553 conn.add_notify_handler(process_event)
555 while True:
556 if stop_event.is_set():
557 # exit trigger
558 return
560 # trigger getting updates
561 # https://www.psycopg.org/psycopg3/docs/advanced/async.html#asynchronous-notifications
562 conn.execute("SELECT 1").fetchone()
564 time.sleep(SUBSCRIBE_SLEEP_INTERVAL)
566 finally:
567 conn.execute(f"drop TRIGGER {trigger_name} on {table_name}")
568 conn.execute(f"drop FUNCTION {trigger_name}")
569 conn.commit()
571 conn.close()
    def meta_get_tables(self, table_names: Optional[list] = None) -> Response:
        """
        Retrieves metadata information about the tables in the PostgreSQL database to be stored in the data catalog.

        Args:
            table_names (list): A list of table names for which to retrieve metadata information.

        Returns:
            Response: A response object containing the metadata information, formatted as per the `Response` class.
        """
        # NOTE: pgc.reltuples is the planner's row-count estimate, not an exact count.
        query = """
            SELECT
                t.table_name,
                t.table_schema,
                t.table_type,
                obj_description(pgc.oid, 'pg_class') AS table_description,
                pgc.reltuples AS row_count
            FROM information_schema.tables t
            JOIN pg_catalog.pg_class pgc ON pgc.relname = t.table_name
            JOIN pg_catalog.pg_namespace pgn ON pgn.oid = pgc.relnamespace
            WHERE t.table_schema = current_schema()
                AND t.table_type in ('BASE TABLE', 'VIEW')
                AND t.table_name NOT LIKE 'pg_%'
                AND t.table_name NOT LIKE 'sql_%'
        """

        if table_names is not None and len(table_names) > 0:
            # Restrict to the requested tables.
            table_names = [f"'{t}'" for t in table_names]
            query += f" AND t.table_name IN ({','.join(table_names)})"

        result = self.native_query(query)
        return result
    def meta_get_columns(self, table_names: Optional[list] = None) -> Response:
        """
        Retrieves column metadata for the specified tables (or all tables if no list is provided).

        Args:
            table_names (list): A list of table names for which to retrieve column metadata.

        Returns:
            Response: A response object containing the column metadata.
        """
        query = """
            SELECT
                c.table_name,
                c.column_name,
                c.data_type,
                col_description(pgc.oid, c.ordinal_position) AS column_description,
                c.column_default,
                (c.is_nullable = 'YES') AS is_nullable
            FROM information_schema.columns c
            JOIN pg_catalog.pg_class pgc ON pgc.relname = c.table_name
            JOIN pg_catalog.pg_namespace pgn ON pgn.oid = pgc.relnamespace
            WHERE c.table_schema = current_schema()
                AND pgc.relkind = 'r' -- Only consider regular tables (avoids indexes, sequences, etc.)
                AND c.table_name NOT LIKE 'pg_%'
                AND c.table_name NOT LIKE 'sql_%'
                AND pgn.nspname = c.table_schema
        """

        if table_names is not None and len(table_names) > 0:
            # Restrict to the requested tables.
            table_names = [f"'{t}'" for t in table_names]
            query += f" AND c.table_name IN ({','.join(table_names)})"

        result = self.native_query(query)
        return result
641 def meta_get_column_statistics(self, table_names: Optional[list] = None) -> Response:
642 """
643 Retrieves column statistics (e.g., most common values, frequencies, null percentage, and distinct value count)
644 for the specified tables or all tables if no list is provided.
646 Args:
647 table_names (list): A list of table names for which to retrieve column statistics.
649 Returns:
650 Response: A response object containing the column statistics.
651 """
652 table_filter = ""
653 if table_names is not None and len(table_names) > 0: 653 ↛ 657line 653 didn't jump to line 657 because the condition on line 653 was always true
654 quoted_names = [f"'{t}'" for t in table_names]
655 table_filter = f" AND ps.tablename IN ({','.join(quoted_names)})"
657 query = (
658 """
659 SELECT
660 ps.tablename AS TABLE_NAME,
661 ps.attname AS COLUMN_NAME,
662 ROUND(ps.null_frac::numeric * 100, 2) AS NULL_PERCENTAGE,
663 CASE
664 WHEN ps.n_distinct < 0 THEN NULL
665 ELSE ps.n_distinct::bigint
666 END AS DISTINCT_VALUES_COUNT,
667 ps.most_common_vals AS MOST_COMMON_VALUES,
668 ps.most_common_freqs AS MOST_COMMON_FREQUENCIES,
669 ps.histogram_bounds
670 FROM pg_stats ps
671 WHERE ps.schemaname = current_schema()
672 AND ps.tablename NOT LIKE 'pg_%'
673 AND ps.tablename NOT LIKE 'sql_%'
674 """
675 + table_filter
676 + """
677 ORDER BY ps.tablename, ps.attname
678 """
679 )
681 result = self.native_query(query)
683 if result.type == RESPONSE_TYPE.TABLE and result.data_frame is not None: 683 ↛ 709line 683 didn't jump to line 709 because the condition on line 683 was always true
684 df = result.data_frame
686 # Extract min/max from histogram bounds
687 def extract_min_max(histogram_str):
688 if histogram_str and str(histogram_str) != "nan": 688 ↛ 695line 688 didn't jump to line 695 because the condition on line 688 was always true
689 clean = str(histogram_str).strip("{}")
690 if clean: 690 ↛ 695line 690 didn't jump to line 695 because the condition on line 690 was always true
691 values = clean.split(",")
692 min_val = values[0].strip(" \"'") if values else None
693 max_val = values[-1].strip(" \"'") if values else None
694 return min_val, max_val
695 return None, None
697 min_max_values = df["histogram_bounds"].apply(extract_min_max)
698 df["MINIMUM_VALUE"] = min_max_values.apply(lambda x: x[0])
699 df["MAXIMUM_VALUE"] = min_max_values.apply(lambda x: x[1])
701 # Convert most_common_values and most_common_freqs to arrays.
702 df["MOST_COMMON_VALUES"] = df["most_common_values"].apply(
703 lambda x: x.strip("{}").split(",") if isinstance(x, str) else []
704 )
705 df["MOST_COMMON_FREQUENCIES"] = df["most_common_frequencies"].apply(
706 lambda x: x.strip("{}").split(",") if isinstance(x, str) else []
707 )
709 result.data_frame = df.drop(columns=["histogram_bounds", "most_common_values", "most_common_frequencies"])
711 return result
713 def meta_get_primary_keys(self, table_names: Optional[list] = None) -> Response:
714 """
715 Retrieves primary key information for the specified tables (or all tables if no list is provided).
717 Args:
718 table_names (list): A list of table names for which to retrieve primary key information.
720 Returns:
721 Response: A response object containing the primary key information.
722 """
723 query = """
724 SELECT
725 tc.table_name,
726 kcu.column_name,
727 kcu.ordinal_position,
728 tc.constraint_name
729 FROM
730 information_schema.table_constraints AS tc
731 JOIN
732 information_schema.key_column_usage AS kcu
733 ON
734 tc.constraint_name = kcu.constraint_name
735 WHERE
736 tc.constraint_type = 'PRIMARY KEY'
737 AND tc.table_schema = current_schema()
738 """
740 if table_names is not None and len(table_names) > 0: 740 ↛ 744line 740 didn't jump to line 744 because the condition on line 740 was always true
741 table_names = [f"'{t}'" for t in table_names]
742 query += f" AND tc.table_name IN ({','.join(table_names)})"
744 result = self.native_query(query)
745 return result
    def meta_get_foreign_keys(self, table_names: Optional[list] = None) -> Response:
        """
        Retrieves foreign key information for the specified tables (or all tables if no list is provided).

        The filter applies to the child (referencing) table; parent table/column
        come from the referenced side of the constraint.

        Args:
            table_names (list): A list of table names for which to retrieve foreign key information.

        Returns:
            Response: A response object containing the foreign key information.
        """
        query = """
            SELECT
                ccu.table_name AS parent_table_name,
                ccu.column_name AS parent_column_name,
                tc.table_name AS child_table_name,
                kcu.column_name AS child_column_name,
                tc.constraint_name
            FROM
                information_schema.table_constraints AS tc
            JOIN
                information_schema.key_column_usage AS kcu
            ON
                tc.constraint_name = kcu.constraint_name
            JOIN
                information_schema.constraint_column_usage AS ccu
            ON
                ccu.constraint_name = tc.constraint_name
            WHERE
                tc.constraint_type = 'FOREIGN KEY'
                AND tc.table_schema = current_schema()
        """

        if table_names is not None and len(table_names) > 0:
            # Restrict to the requested (child) tables.
            table_names = [f"'{t}'" for t in table_names]
            query += f" AND tc.table_name IN ({','.join(table_names)})"

        result = self.native_query(query)
        return result