Coverage for mindsdb / integrations / handlers / nuo_jdbc_handler / nuo_jdbc_handler.py: 0%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2from mindsdb_sql_parser.ast.base import ASTNode 

3from mindsdb.integrations.libs.base import DatabaseHandler 

4from mindsdb.utilities import log 

5from mindsdb_sql_parser import parse_sql 

6from mindsdb.integrations.libs.response import ( 

7 HandlerStatusResponse as StatusResponse, 

8 HandlerResponse as Response, 

9 RESPONSE_TYPE 

10) 

11import pandas as pd 

12import jaydebeapi as jdbcconnector 

13 

14logger = log.getLogger(__name__) 

15 

16 

class NuoHandler(DatabaseHandler):
    """Database handler that runs SQL against NuoDB through its JDBC driver.

    The connection is opened with ``jaydebeapi`` using a JDBC URL that is
    assembled once, at construction time, from the connection arguments.
    """

    name = 'nuo_jdbc'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                ``database``, ``host``, ``user`` and ``password`` are read
                with ``[]`` and must be present; ``port``, ``is_direct``,
                ``schema``, ``jar_location`` and ``driver_args`` are optional.
            **kwargs: arbitrary keyword arguments.

        Raises:
            KeyError: if a mandatory connection argument is missing.
        """
        super().__init__(name)

        self.kwargs = kwargs
        self.parser = parse_sql
        self.database = connection_data['database']
        self.connection_config = connection_data
        self.host = connection_data['host']
        # `port` and `is_direct` are treated as optional by
        # construct_jdbc_url(), so a missing key must not abort construction.
        self.port = connection_data.get('port')
        self.user = connection_data['user']
        self.is_direct = connection_data.get('is_direct')
        self.password = connection_data['password']
        self.connection = None
        self.is_connected = False
        self.schema = None

        # NOTE: the URL embeds the user name and password, never log it.
        self.jdbc_url = self.construct_jdbc_url()

    @staticmethod
    def _escape_literal(value) -> str:
        """Escape a value for use inside a single-quoted SQL string literal."""
        return str(value).replace("'", "''")

    def connect(self):
        """Open the JDBC connection, or reuse the one that is already open.

        Switches ``self.is_connected`` to True on success.

        Returns:
            Connection Object

        Raises:
            Exception: whatever the JDBC layer raised while connecting. The
                error is logged and re-raised so that callers such as
                check_connection() can report the failure.
        """
        if self.is_connected is True:
            return self.connection

        connect_kwargs = {
            'jclassname': "com.nuodb.jdbc.Driver",
            'url': self.jdbc_url,
        }
        # Only pass `jars` when a driver jar location was configured.
        jar_location = self.connection_config.get('jar_location')
        if jar_location:
            connect_kwargs['jars'] = jar_location

        try:
            self.connection = jdbcconnector.connect(**connect_kwargs)
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            self.connection = None
            self.is_connected = False
            raise

        # Without this flag every call would open a new connection and
        # disconnect() would never close any of them.
        self.is_connected = True
        return self.connection

    def construct_jdbc_url(self):
        """Construct the JDBC url from the parameters given to the handler.

        Also stores the configured schema, if any, in ``self.schema``.

        Returns:
            The JDBC connection url string.
        """
        jdbc_url = "jdbc:com.nuodb://" + self.host

        # port is an optional parameter, append it only when provided
        port = self.connection_config.get('port')
        if port:
            jdbc_url = jdbc_url + ":" + str(port)

        jdbc_url = jdbc_url + "/" + self.database + "?user=" + self.user + "&password=" + self.password

        # if a schema is provided in the connection args, connect with it
        schema = self.connection_config.get('schema')
        if schema:
            self.schema = schema
            jdbc_url = jdbc_url + "&schema=" + schema

        # the direct parameter is only set when it is specified as true
        if str(self.is_direct).lower() == 'true':
            jdbc_url = jdbc_url + "&direct=true"

        # driver args arrive as a comma separated list ("one=1,two=true") and
        # are appended to the url as "&one=1&two=true". Surrounding whitespace
        # and empty entries are dropped so they cannot corrupt the url.
        driver_args = self.connection_config.get('driver_args')
        if driver_args:
            cleaned_args = [arg.strip() for arg in driver_args.split(",") if arg.strip()]
            if cleaned_args:
                jdbc_url = jdbc_url + "&" + '&'.join(cleaned_args)

        return jdbc_url

    def disconnect(self):
        """Close the existing connection, if any.

        Switches ``self.is_connected`` to False. A failure while closing is
        logged, not raised.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
        except Exception as e:
            logger.error(f"Error while disconnecting to {self.database}, {e}")
        finally:
            # A connection whose close() failed is not safe to reuse either,
            # so the state is reset in both cases.
            self.connection = None
            self.is_connected = False

        return

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        A connection opened only for this check is closed again afterwards.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to database {self.database}, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Execute a raw SQL query.

        Statements that produce a result set return it as a table; other
        statements are committed. On failure the transaction is rolled back
        and an error response is returned instead of raising.

        Args:
            query (str): query in the native (SQL) format

        Returns:
            HandlerResponse
        """
        need_to_close = self.is_connected is False

        try:
            conn = self.connect()
        except Exception as e:
            logger.error(f'Error running query: {query} on {self.database}, {e}!')
            return Response(RESPONSE_TYPE.ERROR, error_message=str(e))

        try:
            with conn.cursor() as cur:
                try:
                    cur.execute(query)
                    if cur.description:
                        rows = cur.fetchall()
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame(
                                rows,
                                columns=[col[0] for col in cur.description]
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                        conn.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query} on {self.database}, {e}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
                    # A failing rollback must not hide the original error.
                    try:
                        conn.rollback()
                    except Exception as rollback_error:
                        logger.error(f'Error rolling back on {self.database}, {rollback_error}!')
        finally:
            # Close only a connection that this call opened itself.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render and execute a SQL query.

        Args:
            query (ASTNode): The SQL query. Anything that is not an ASTNode
                is converted with ``str()``.

        Returns:
            Response: The query result.
        """
        if isinstance(query, ASTNode):
            query_str = query.to_string()
        else:
            query_str = str(query)

        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Get a list of all the tables in the database.

        Restricted to the configured schema when there is one, otherwise
        every schema except SYSTEM is listed.

        Returns:
            Response: Names of the tables in the database, in a column named
            ``table_name``. An error response is returned unchanged.
        """
        if self.schema:
            schema = self._escape_literal(self.schema)
            query = f''' SELECT TABLENAME FROM SYSTEM.TABLES WHERE SCHEMA = '{schema}' '''
        else:
            query = ''' SELECT TABLENAME FROM SYSTEM.TABLES WHERE SCHEMA != 'SYSTEM' '''

        result = self.native_query(query)
        df = result.data_frame
        # A failed query carries no data frame, so there is nothing to rename.
        if df is not None and len(df.columns) > 0:
            result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: str) -> Response:
        """Get details about a table.

        Args:
            table_name (str): Name of the table to retrieve details of.

        Returns:
            Response: The FIELD entries of SYSTEM.FIELDS for the table.
        """
        table = self._escape_literal(table_name)
        query = f''' SELECT FIELD FROM SYSTEM.FIELDS WHERE TABLENAME='{table}' '''
        return self.native_query(query)

220 return self.native_query(query)