Coverage for mindsdb / integrations / handlers / maxdb_handler / maxdb_handler.py: 0%
93 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3from mindsdb_sql_parser import parse_sql
4from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
5from mindsdb_sql_parser.ast.base import ASTNode
6from mindsdb.integrations.libs.base import DatabaseHandler
8from mindsdb.utilities import log
9from mindsdb.integrations.libs.response import (
10 HandlerStatusResponse as StatusResponse,
11 HandlerResponse as Response,
12 RESPONSE_TYPE,
13 HandlerResponse,
14)
15import pandas as pd
16import jaydebeapi as jd
18logger = log.getLogger(__name__)
class MaxDBHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the SAP MaxDB statements.

    Connections are opened over JDBC through ``jaydebeapi`` using the
    ``com.sap.dbtech.jdbc.DriverSapDB`` driver class.
    """

    name = "maxdb"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                The keys "database", "host", "port", "user", "password" and
                "jdbc_location" are all read here and must be present.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        # Set the connection state before reading connection_data so that
        # __del__ stays safe even when a required key is missing below.
        self.connection = None
        self.is_connected = False

        self.kwargs = kwargs
        self.parser = parse_sql
        self.connection_config = connection_data
        self.database = connection_data["database"]
        self.host = connection_data["host"]
        self.port = connection_data["port"]
        self.user = connection_data["user"]
        self.password = connection_data["password"]
        self.jdbc_location = connection_data["jdbc_location"]

    def __del__(self):
        """
        Destructor for the SAP MaxDB class.

        Closes the connection if one is still open.
        """
        # getattr: the attribute may be absent if __init__ raised early.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    @staticmethod
    def _escape_literal(value) -> str:
        """Return value as text that is safe inside a single-quoted SQL literal."""
        return str(value).replace("'", "''")

    def connect(self) -> "jd.Connection":
        """
        Establishes a connection to the SAP MaxDB server.

        An already open connection is reused instead of opening a new one.

        Returns:
            The jaydebeapi connection object.
        """
        if self.is_connected:
            return self.connection

        jdbc_url = f"jdbc:sapdb://{self.host}:{self.port}/{self.database}"
        jdbc_class = "com.sap.dbtech.jdbc.DriverSapDB"

        self.connection = jd.connect(jdbc_class, jdbc_url, [self.user, self.password], self.jdbc_location)
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """Close any existing connections
        Should switch self.is_connected.

        A failure while closing is logged, not raised; in that case
        self.is_connected is left unchanged.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
            self.is_connected = False
        except Exception as e:
            logger.error(f"Error while disconnecting to {self.database}, {e}")

        return

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler

        A connection opened only for this check is closed again afterwards.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to database {self.database}, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> HandlerResponse:
        """Receive raw query and act upon it somehow.

        The statement is committed on success and rolled back on failure.
        A connection opened only for this call is closed before returning.

        Args:
            query (Any): query in native format (str for sql databases,
                etc)
        Returns:
            HandlerResponse
        """
        need_to_close = self.is_connected is False
        conn = self.connect()
        try:
            with conn.cursor() as cur:
                try:
                    cur.execute(query)
                    if cur.description:
                        result = cur.fetchall()
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]),
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    conn.commit()
                except Exception as e:
                    logger.error(f"Error running query: {query} on {self.database}, {e}!")
                    response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
                    # A failing rollback must not hide the original query error.
                    try:
                        conn.rollback()
                    except Exception as rollback_error:
                        logger.error(f"Error rolling back on {self.database}, {rollback_error}!")
        finally:
            # Runs even when an exception escapes, so a temporary connection is not leaked.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.

        The AST is rendered to SQL text with the "postgres" dialect and then
        passed to native_query.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender("postgres")
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Gets a list of table names in the database.

        Only tables whose schema name equals the configured user are listed.

        Returns:
            HandlerResponse: on success its data frame has a single
            "table_name" column; an error response is returned unchanged.
        """
        schema = self._escape_literal(self.user)
        query = f"SELECT TABLENAME FROM DOMAIN.TABLES WHERE TYPE = 'TABLE' AND SCHEMANAME = '{schema}'"
        result = self.native_query(query)
        df = result.data_frame
        # There is no frame to rename when the query failed.
        if df is not None:
            result.data_frame = df.rename(columns={df.columns[0]: "table_name"})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Gets a list of column names in the specified table.

        Args:
            table_name (str): The name of the table to get column names from.

        Returns:
            HandlerResponse: on success its data frame has the columns
            "column_name" and "data_type"; an error response is returned
            unchanged.
        """
        table = self._escape_literal(table_name)
        query = f"SELECT COLUMNNAME,DATATYPE FROM DOMAIN.COLUMNS WHERE TABLENAME ='{table}'"
        result = self.native_query(query)
        df = result.data_frame
        if df is not None:
            # Rename by position: the labels reported by the driver are not
            # guaranteed to match any particular spelling.
            result.data_frame = df.rename(columns=dict(zip(df.columns, ("column_name", "data_type"))))
        return result