Coverage for mindsdb / integrations / handlers / empress_handler / empress_handler.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pyodbc 

2 

3import pandas as pd 

4from mindsdb_sql_parser import parse_sql 

5 

6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

7from mindsdb_sql_parser.ast.base import ASTNode 

8from mindsdb.integrations.libs.base import DatabaseHandler 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE 

15) 

16 

17logger = log.getLogger(__name__) 

18 

19 

20class EmpressHandler(DatabaseHandler): 

21 """ 

22 This handler handles connection and execution of the Empress Embedded statements. 

23 """ 

24 

25 name = 'empress' 

26 

27 def __init__(self, name: str, **kwargs): 

28 """ 

29 Initializes a new instance of the Empress Embedded handler. 

30 

31 Args: 

32 name (str): The name of the database. 

33 connection_data (dict): parameters for connecting to the database 

34 **kwargs: Arbitrary keyword arguments. 

35 """ 

36 super().__init__(name) 

37 self.parser = parse_sql 

38 self.dialect = 'empress' 

39 self.connection_args = kwargs.get('connection_data') 

40 self.database = self.connection_args.get('database') 

41 self.server = self.connection_args.get('server') 

42 self.user = self.connection_args.get('user') 

43 self.password = self.connection_args.get('password') 

44 self.host = self.connection_args.get('host') 

45 self.port = self.connection_args.get('port', 6322) 

46 self.connection = None 

47 self.is_connected = False 

48 

49 def __del__(self): 

50 """ 

51 Destructor for the Empress Embedded class. 

52 """ 

53 if self.is_connected is True: 

54 self.disconnect() 

55 

56 def connect(self) -> StatusResponse: 

57 """ 

58 Establishes a connection to the Empress Embedded server. 

59 Returns: 

60 HandlerStatusResponse 

61 """ 

62 if self.is_connected: 

63 return self.connection 

64 

65 conn_str = f"DRIVER={{Empress ODBC Interface [Default]}};Server={self.server};Port={self.port};UID={self.user};PWD={self.password};Database={self.database};" 

66 self.connection = pyodbc.connect(conn_str) 

67 self.is_connected = True 

68 return self.connection 

69 

70 def check_connection(self) -> StatusResponse: 

71 """ 

72 Check connection to the handler. 

73 Returns: 

74 HandlerStatusResponse 

75 """ 

76 

77 response = StatusResponse(False) 

78 need_to_close = self.is_connected is False 

79 

80 try: 

81 self.connect() 

82 response.success = True 

83 except Exception as e: 

84 logger.error(f'Error connecting to Empress Embedded, {e}!') 

85 response.error_message = str(e) 

86 finally: 

87 if response.success is True and need_to_close: 

88 self.disconnect() 

89 if response.success is False and self.is_connected is True: 

90 self.is_connected = False 

91 

92 return response 

93 

94 def disconnect(self): 

95 """ 

96 Closes the connection to the Empress Embedded server. 

97 """ 

98 

99 if self.is_connected is False: 

100 return 

101 

102 self.connection.close() 

103 self.is_connected = False 

104 return self.is_connected 

105 

106 def native_query(self, query: str) -> Response: 

107 """ 

108 Receive raw query and act upon it somehow. 

109 Args: 

110 query (str): SQL query to execute. 

111 Returns: 

112 HandlerResponse 

113 """ 

114 need_to_close = self.is_connected is False 

115 

116 connection = self.connect() 

117 with connection.cursor() as cursor: 

118 try: 

119 cursor.execute(query) 

120 result = cursor.fetchall() 

121 if result: 

122 response = Response( 

123 RESPONSE_TYPE.TABLE, 

124 data_frame=pd.DataFrame.from_records( 

125 result, 

126 columns=[x[0] for x in cursor.description] 

127 ) 

128 ) 

129 else: 

130 response = Response(RESPONSE_TYPE.OK) 

131 connection.commit() 

132 except Exception as e: 

133 logger.error(f'Error running query: {query} on {self.connection_args["database"]}!') 

134 response = Response( 

135 RESPONSE_TYPE.ERROR, 

136 error_message=str(e) 

137 ) 

138 

139 if need_to_close is True: 

140 self.disconnect() 

141 

142 return response 

143 

144 def query(self, query: ASTNode) -> Response: 

145 """ 

146 Receive query as AST (abstract syntax tree) and act upon it somehow. 

147 Args: 

148 query (ASTNode): sql query represented as AST. May be any kind 

149 of query: SELECT, INSERT, DELETE, etc 

150 Returns: 

151 HandlerResponse 

152 """ 

153 

154 renderer = SqlalchemyRender('sqlite') 

155 

156 query_str = renderer.get_string(query, with_failback=True) 

157 return self.native_query(query_str) 

158 

159 def get_tables(self) -> Response: 

160 """ 

161 Gets a list of table names in the database. 

162 

163 Returns: 

164 list: A list of table names in the database. 

165 """ 

166 connection = self.connect() 

167 cursor = connection.cursor() 

168 # Execute query to get all table names 

169 cursor.execute( 

170 "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'") 

171 

172 table_names = [x[0] for x in cursor.fetchall()] 

173 

174 # Create dataframe with table names 

175 df = pd.DataFrame(table_names, columns=['table_name', 'data_type']) 

176 

177 # Create response object 

178 response = Response( 

179 RESPONSE_TYPE.TABLE, 

180 df 

181 ) 

182 

183 return response 

184 

185 def get_columns(self, table_name: str) -> Response: 

186 """ 

187 Gets a list of column names in the specified table. 

188 

189 Args: 

190 table_name (str): The name of the table to get column names from. 

191 

192 Returns: 

193 list: A list of column names in the specified table. 

194 """ 

195 conn = self.connect() 

196 cursor = conn.cursor() 

197 cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name='{}'".format(table_name)) 

198 results = cursor.fetchall() 

199 

200 # construct a pandas dataframe from the query results 

201 df = pd.DataFrame( 

202 results, 

203 columns=['column_name', 'data_type'] 

204 ) 

205 

206 response = Response( 

207 RESPONSE_TYPE.TABLE, 

208 df 

209 ) 

210 

211 return response