Coverage for mindsdb/integrations/handlers/empress_handler/empress_handler.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pyodbc
3import pandas as pd
4from mindsdb_sql_parser import parse_sql
6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
7from mindsdb_sql_parser.ast.base import ASTNode
8from mindsdb.integrations.libs.base import DatabaseHandler
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE
15)
# Module-level logger, named after this module, used by EmpressHandler for error reporting.
logger = log.getLogger(__name__)
class EmpressHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Empress Embedded statements.

    Connections are opened through pyodbc. Methods that open a connection
    themselves (because none was open when they were called) close it again
    before returning; an already-open connection is left open for the caller.
    """

    name = 'empress'

    def __init__(self, name: str, **kwargs):
        """
        Initializes a new instance of the Empress Embedded handler.

        Args:
            name (str): The name of the database.
            connection_data (dict): parameters for connecting to the database
                (keys read here: database, server, user, password, host, port).
            **kwargs: Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'empress'
        # A missing or None ``connection_data`` is treated as an empty dict so
        # construction does not fail with AttributeError on the lookups below.
        self.connection_args = kwargs.get('connection_data') or {}
        self.database = self.connection_args.get('database')
        self.server = self.connection_args.get('server')
        self.user = self.connection_args.get('user')
        self.password = self.connection_args.get('password')
        # NOTE(review): ``host`` is stored but the connection string in
        # connect() only uses ``server`` -- confirm which one the driver expects.
        self.host = self.connection_args.get('host')
        self.port = self.connection_args.get('port', 6322)
        self.connection = None
        self.is_connected = False

    def __del__(self):
        """
        Destructor for the Empress Embedded class.

        Closes the connection if one is still open.
        """
        # getattr guards against __init__ having failed before the attribute
        # was assigned; __del__ still runs on such half-built instances.
        if getattr(self, 'is_connected', False):
            self.disconnect()

    def connect(self) -> pyodbc.Connection:
        """
        Establishes a connection to the Empress Embedded server.

        An already-open connection is reused instead of opening a new one.

        Returns:
            pyodbc.Connection: the open connection.

        Raises:
            Whatever ``pyodbc.connect`` raises when the connection fails.
        """
        if self.is_connected:
            return self.connection

        conn_str = (
            f"DRIVER={{Empress ODBC Interface [Default]}};"
            f"Server={self.server};"
            f"Port={self.port};"
            f"UID={self.user};"
            f"PWD={self.password};"
            f"Database={self.database};"
        )
        self.connection = pyodbc.connect(conn_str)
        self.is_connected = True
        return self.connection

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be established; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)
        need_to_close = not self.is_connected

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Empress Embedded, {e}!')
            response.error_message = str(e)
            # A failed check must not leave the handler marked as connected.
            self.is_connected = False
        else:
            # Only close connections that this check opened itself.
            if need_to_close:
                self.disconnect()

        return response

    def disconnect(self):
        """
        Closes the connection to the Empress Embedded server.

        Returns:
            bool: always False, i.e. the value of ``is_connected`` afterwards.
        """
        if not self.is_connected:
            return False

        try:
            self.connection.close()
        finally:
            # Even if close() raises, the handle is no longer usable; drop it
            # so the next connect() opens a fresh connection.
            self.connection = None
            self.is_connected = False
        return self.is_connected

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        Statements that produce a result set return a TABLE response (also
        when the result set is empty, so column names are preserved); all
        other statements are committed and return an OK response. Execution
        errors are logged and returned as an ERROR response.

        Args:
            query (str): SQL query to execute.

        Returns:
            HandlerResponse
        """
        need_to_close = not self.is_connected

        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                try:
                    cursor.execute(query)
                    # ``description`` is None for statements without a result
                    # set; calling fetchall() on those raises in pyodbc, so
                    # branch on it instead of fetching unconditionally.
                    if cursor.description is not None:
                        column_names = [col[0] for col in cursor.description]
                        # Plain tuples, so pandas sees ordinary records
                        # instead of driver-specific Row objects.
                        rows = [tuple(row) for row in cursor.fetchall()]
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame.from_records(
                                rows,
                                columns=column_names
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                        connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query} on {self.database}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
        finally:
            # Close only a connection opened by this call, even if creating
            # the cursor failed.
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.

        The AST is rendered to a SQL string and passed to ``native_query``.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse
        """
        # NOTE(review): rendered with the 'sqlite' dialect although
        # ``self.dialect`` is 'empress' -- presumably the closest available
        # renderer; confirm.
        renderer = SqlalchemyRender('sqlite')

        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def _fetch_rows(self, sql: str, params: tuple = ()) -> list:
        """
        Run a parameterized SELECT and return all rows as plain tuples.

        Closes the cursor afterwards, and the connection too when it was
        opened by this call.

        Args:
            sql (str): statement using ``?`` placeholders.
            params (tuple): values bound to the placeholders, in order.

        Returns:
            list: one tuple per result row.
        """
        need_to_close = not self.is_connected
        connection = self.connect()
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(sql, *params)
                return [tuple(row) for row in cursor.fetchall()]
            finally:
                cursor.close()
        finally:
            if need_to_close:
                self.disconnect()

    def get_tables(self) -> Response:
        """
        Gets a list of table names in the database.

        Returns:
            HandlerResponse: TABLE response whose data frame has a single
            ``table_name`` column.
        """
        rows = self._fetch_rows(
            "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'"
        )

        # The query selects exactly one column, so the frame gets exactly one
        # column label.
        df = pd.DataFrame(rows, columns=['table_name'])

        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Gets the columns of the specified table.

        Args:
            table_name (str): The name of the table to get column names from.

        Returns:
            HandlerResponse: TABLE response whose data frame has the columns
            ``column_name`` and ``data_type``.
        """
        # The table name is bound as a parameter rather than formatted into
        # the SQL text, so it cannot alter the statement.
        rows = self._fetch_rows(
            "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=?",
            (table_name,)
        )

        df = pd.DataFrame(
            rows,
            columns=['column_name', 'data_type']
        )

        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )