Coverage for mindsdb / integrations / handlers / maxdb_handler / maxdb_handler.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3from mindsdb_sql_parser import parse_sql 

4from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

5from mindsdb_sql_parser.ast.base import ASTNode 

6from mindsdb.integrations.libs.base import DatabaseHandler 

7 

8from mindsdb.utilities import log 

9from mindsdb.integrations.libs.response import ( 

10 HandlerStatusResponse as StatusResponse, 

11 HandlerResponse as Response, 

12 RESPONSE_TYPE, 

13 HandlerResponse, 

14) 

15import pandas as pd 

16import jaydebeapi as jd 

17 

18logger = log.getLogger(__name__) 

19 

20 

class MaxDBHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the SAP MaxDB statements.

    Connections are opened with ``jaydebeapi`` using the SAP MaxDB JDBC driver
    class ``com.sap.dbtech.jdbc.DriverSapDB``.
    """

    name = "maxdb"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                The keys "database", "host", "port", "user", "password" and
                "jdbc_location" are read here and must all be present.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.kwargs = kwargs
        self.parser = parse_sql
        self.connection_config = connection_data
        self.database = connection_data["database"]
        self.host = connection_data["host"]
        self.port = connection_data["port"]
        self.user = connection_data["user"]
        self.password = connection_data["password"]
        self.jdbc_location = connection_data["jdbc_location"]
        self.connection = None
        self.is_connected = False

    def __del__(self):
        """
        Destructor for the SAP MaxDB class.
        """
        # getattr guard: __del__ also runs for instances whose __init__ raised
        # before `is_connected` was assigned.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    @staticmethod
    def _escape_literal(value) -> str:
        """Return *value* as text safe to embed between single quotes in SQL.

        Single quotes are doubled, the standard SQL way to escape them inside
        a string literal.
        """
        return str(value).replace("'", "''")

    def connect(self) -> "jd.Connection":
        """
        Establishes a connection to the SAP MaxDB server.

        An already-open connection is reused instead of opening a new one.

        Returns:
            The jaydebeapi connection object (also stored on ``self.connection``).
        """
        if self.is_connected:
            return self.connection

        jdbc_url = f"jdbc:sapdb://{self.host}:{self.port}/{self.database}"
        jdbc_class = "com.sap.dbtech.jdbc.DriverSapDB"

        self.connection = jd.connect(jdbc_class, jdbc_url, [self.user, self.password], self.jdbc_location)
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """Close any existing connections
        Should switch self.is_connected.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
            self.is_connected = False
        except Exception as e:
            # Best effort: a failed close is logged, not raised.
            logger.error(f"Error while disconnecting to {self.database}, {e}")

        return

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler

        If the handler was not connected before the check, the connection
        opened for the check is closed again afterwards.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to database {self.database}, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> HandlerResponse:
        """Receive raw query and act upon it somehow.
        Args:
            query (Any): query in native format (str for sql databases,
                etc)
        Returns:
            HandlerResponse: a TABLE response holding the fetched rows when the
            cursor describes a result set, an OK response otherwise, or an
            ERROR response carrying the exception text when execution fails.
        """
        need_to_close = self.is_connected is False
        conn = self.connect()
        with conn.cursor() as cur:
            try:
                cur.execute(query)
                if cur.description:
                    result = cur.fetchall()
                    response = Response(
                        RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description])
                    )
                else:
                    response = Response(RESPONSE_TYPE.OK)
                self.connection.commit()
            except Exception as e:
                logger.error(f"Error running query: {query} on {self.database}!")
                response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
                try:
                    self.connection.rollback()
                except Exception as rollback_error:
                    # Keep the original query error as the reported failure.
                    logger.error(f"Error rolling back on {self.database}, {rollback_error}")

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender("postgres")
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Gets a list of table names in the database.

        Only tables whose schema name equals the connecting user are listed.

        Returns:
            HandlerResponse: on success its data frame has the first column
            renamed to "table_name"; a response without a data frame (e.g. an
            error) is returned unchanged.
        """
        schema = self._escape_literal(self.user)
        query = f"SELECT TABLENAME FROM DOMAIN.TABLES WHERE TYPE = 'TABLE' AND SCHEMANAME = '{schema}'"
        result = self.native_query(query)
        df = result.data_frame
        # Non-table responses carry no data frame to rename.
        if df is not None and len(df.columns) > 0:
            result.data_frame = df.rename(columns={df.columns[0]: "table_name"})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Gets a list of column names in the specified table.

        Args:
            table_name (str): The name of the table to get column names from.

        Returns:
            HandlerResponse: on success its data frame has two columns,
            "column_name" and "data_type"; a response without a data frame
            (e.g. an error) is returned unchanged.
        """
        table = self._escape_literal(table_name)
        query = f"SELECT COLUMNNAME,DATATYPE FROM DOMAIN.COLUMNS WHERE TABLENAME ='{table}'"
        result = self.native_query(query)
        df = result.data_frame
        # Rename by position: the driver decides the reported column labels.
        if df is not None and len(df.columns) >= 2:
            result.data_frame = df.rename(
                columns={df.columns[0]: "column_name", df.columns[1]: "data_type"}
            )
        return result