Coverage for mindsdb / integrations / handlers / vertica_handler / vertica_handler.py: 0%
78 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4import vertica_python as vp
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE
16)
18# from sqlalchemy_vertica.dialect_pyodbc import VerticaDialect
19from sqla_vertica_python.vertica_python import VerticaDialect
21logger = log.getLogger(__name__)
class VerticaHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Vertica statements.

    A connection is opened on demand by ``connect()``. Methods that had to
    open the connection themselves close it again before returning.
    """

    name = 'vertica'

    def __init__(self, name, connection_data: Optional[dict], **kwargs):
        """
        Store the connection settings; no connection is opened here.

        :param name: handler instance name, forwarded to the base class
        :param connection_data: connection settings. ``connect()`` reads the
            keys 'host', 'port', 'user', 'password' and 'database';
            'schema_name' is optional and defaults to "public"
        :param kwargs: extra arguments, kept as-is on ``self.kwargs``
        """
        super().__init__(name)

        self.parser = parse_sql
        self.dialect = 'vertica'
        self.kwargs = kwargs
        # The annotation allows None, so normalise it to an empty dict: the
        # lookups below (and in the error handlers) must not fail on None.
        self.connection_data = connection_data if connection_data is not None else {}
        self.schema_name = self.connection_data.get('schema_name', 'public')

        self.connection = None
        self.is_connected = False

    @staticmethod
    def _quote_literal(value) -> str:
        """
        Render value as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot terminate the
        literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    def connect(self):
        """
        Open a connection to Vertica, or reuse the one already open.

        :return: the connection object
        :raises KeyError: if a required key is missing from connection_data
        """
        if self.is_connected is True:
            return self.connection

        config = {
            'host': self.connection_data['host'],
            'port': self.connection_data['port'],
            'user': self.connection_data['user'],
            'password': self.connection_data['password'],
            'database': self.connection_data['database']
        }

        connection = vp.connect(**config)
        self.is_connected = True
        self.connection = connection
        return self.connection

    def disconnect(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self.is_connected is False:
            return
        self.connection.close()
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check whether a connection to Vertica can be established.

        :return: StatusResponse whose ``success`` is the result of the
            connection's ``opened()`` call, with ``error_message`` set when
            connecting raised
        """
        result = StatusResponse(False)
        # Only close afterwards if this call is the one that opened it.
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            result.success = connection.opened()
        except Exception as e:
            # .get() so that a missing 'database' key cannot raise from
            # inside the handler and mask the real error.
            logger.error(f'Error connecting to Vertica {self.connection_data.get("database")}, {e}!')
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False

        return result

    def native_query(self, query: str) -> Response:
        """
        Receive SQL query and runs it
        :param query: The SQL query to run in VERTICA
        :return: a TABLE response holding the fetched rows, an OK response
            when the cursor reports a rowcount of -1, or an ERROR response
            carrying the exception message
        """
        # Only close afterwards if this call is the one that opened it.
        need_to_close = self.is_connected is False

        connection = self.connect()
        with connection.cursor() as cur:
            try:
                executed = cur.execute(query)
                rows = executed.fetchall()
                if executed.rowcount != -1:
                    response = Response(
                        RESPONSE_TYPE.TABLE,
                        pd.DataFrame(
                            rows,
                            columns=[column.name for column in cur.description]
                        )
                    )
                else:
                    response = Response(RESPONSE_TYPE.OK)
                connection.commit()
            except Exception as e:
                # Log the exception itself, and use .get() so a missing
                # 'database' key cannot raise from inside the handler.
                logger.error(
                    f'Error running query: {query} on {self.connection_data.get("database")}, {e}!'
                )
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(e)
                )
                connection.rollback()

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        :param query: parsed statement; it is rendered to a SQL string with
            the Vertica dialect and passed to ``native_query``
        :return: the response produced by ``native_query``
        """
        renderer = SqlalchemyRender(VerticaDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in VERTICA

        Only tables whose schema equals ``self.schema_name`` are listed.
        """
        schema_literal = self._quote_literal(self.schema_name)
        q = f'''SELECT
            TABLE_NAME,
            TABLE_SCHEMA
            from v_catalog.tables
            WHERE table_schema={schema_literal}
            order by
            table_name;'''

        return self.native_query(q)

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table

        :param table_name: name of the table whose columns are listed
        :return: response with the column_name and data_type of each column
        """
        # NOTE(review): there is no table_schema filter here, so same-named
        # tables in other schemas would all match -- confirm whether intended.
        table_literal = self._quote_literal(table_name)
        q = f'''SELECT
            column_name ,
            data_type
            FROM v_catalog.columns
            WHERE table_name={table_literal};'''

        return self.native_query(q)