Coverage for mindsdb / integrations / handlers / vertica_handler / vertica_handler.py: 0%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4import vertica_python as vp 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE 

16) 

17 

18# from sqlalchemy_vertica.dialect_pyodbc import VerticaDialect 

19from sqla_vertica_python.vertica_python import VerticaDialect 

20 

21logger = log.getLogger(__name__) 

22 

23 

24class VerticaHandler(DatabaseHandler): 

25 """ 

26 This handler handles connection and execution of the Vertica statements. 

27 """ 

28 

29 name = 'vertica' 

30 

31 def __init__(self, name, connection_data: Optional[dict], **kwargs): 

32 super().__init__(name) 

33 

34 self.parser = parse_sql 

35 self.dialect = 'vertica' 

36 self.kwargs = kwargs 

37 self.connection_data = connection_data 

38 self.schema_name = connection_data['schema_name'] if 'schema_name' in connection_data else "public" 

39 

40 self.connection = None 

41 self.is_connected = False 

42 

43 def connect(self): 

44 if self.is_connected is True: 

45 return self.connection 

46 

47 config = { 

48 'host': self.connection_data['host'], 

49 'port': self.connection_data['port'], 

50 'user': self.connection_data['user'], 

51 'password': self.connection_data['password'], 

52 'database': self.connection_data['database'] 

53 } 

54 

55 connection = vp.connect(**config) 

56 self.is_connected = True 

57 self.connection = connection 

58 return self.connection 

59 

60 def disconnect(self): 

61 if self.is_connected is False: 

62 return 

63 self.connection.close() 

64 self.is_connected = False 

65 return 

66 

67 def check_connection(self) -> StatusResponse: 

68 

69 result = StatusResponse(False) 

70 need_to_close = self.is_connected is False 

71 

72 try: 

73 connection = self.connect() 

74 result.success = connection.opened() 

75 except Exception as e: 

76 logger.error(f'Error connecting to Vertica {self.connection_data["database"]}, {e}!') 

77 result.error_message = str(e) 

78 

79 if result.success is True and need_to_close: 

80 self.disconnect() 

81 if result.success is False and self.is_connected is True: 

82 self.is_connected = False 

83 

84 return result 

85 

86 def native_query(self, query: str) -> Response: 

87 """ 

88 Receive SQL query and runs it 

89 :param query: The SQL query to run in VERTICA 

90 :return: returns the records from the current recordset 

91 """ 

92 

93 need_to_close = self.is_connected is False 

94 

95 connection = self.connect() 

96 with connection.cursor() as cur: 

97 try: 

98 e = cur.execute(query) 

99 result = e.fetchall() 

100 if e.rowcount != -1: 

101 

102 response = Response( 

103 RESPONSE_TYPE.TABLE, 

104 pd.DataFrame( 

105 result, 

106 columns=[x.name for x in cur.description] 

107 ) 

108 ) 

109 else: 

110 response = Response(RESPONSE_TYPE.OK) 

111 connection.commit() 

112 except Exception as e: 

113 logger.error(f'Error running query: {query} on {self.connection_data["database"]}!') 

114 response = Response( 

115 RESPONSE_TYPE.ERROR, 

116 error_message=str(e) 

117 ) 

118 connection.rollback() 

119 

120 if need_to_close is True: 

121 self.disconnect() 

122 

123 return response 

124 

125 def query(self, query: ASTNode) -> Response: 

126 """ 

127 Retrieve the data from the SQL statement. 

128 """ 

129 renderer = SqlalchemyRender(VerticaDialect) 

130 query_str = renderer.get_string(query, with_failback=True) 

131 return self.native_query(query_str) 

132 

133 def get_tables(self) -> Response: 

134 """ 

135 Get a list with all of the tabels in VERTICA 

136 """ 

137 q = f'''SELECT 

138 TABLE_NAME, 

139 TABLE_SCHEMA 

140 from v_catalog.tables 

141 WHERE table_schema='{self.schema_name}' 

142 order by 

143 table_name;''' 

144 

145 return self.native_query(q) 

146 

147 def get_columns(self, table_name) -> Response: 

148 """ 

149 Show details about the table 

150 """ 

151 q = f'''SELECT 

152 column_name , 

153 data_type 

154 FROM v_catalog.columns 

155 WHERE table_name='{table_name}';''' 

156 

157 return self.native_query(q)