Coverage for mindsdb / integrations / handlers / access_handler / access_handler.py: 89%
96 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2import platform
4import pandas as pd
6from mindsdb_sql_parser import parse_sql
7from mindsdb_sql_parser.ast.base import ASTNode
8from mindsdb.utilities import log
9from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE,
15)
17try:
18 import pyodbc
19 from sqlalchemy_access.base import AccessDialect
21 IMPORT_ERROR = None
22except ImportError as e:
23 pyodbc = None
24 AccessDialect = None
25 IMPORT_ERROR = e
27logger = log.getLogger(__name__)
30class AccessHandler(DatabaseHandler):
31 """
32 This handler handles connection and execution of the Microsoft Access statements.
33 """
35 name = "access"
37 def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
38 """
39 Initialize the handler.
40 Args:
41 name (str): name of particular handler instance
42 connection_data (dict): parameters for connecting to the database
43 **kwargs: arbitrary keyword arguments.
44 """
45 super().__init__(name)
46 self.parser = parse_sql
47 self.dialect = "access"
48 self.connection_data = connection_data
49 self.kwargs = kwargs
51 self.connection = None
52 self.is_connected = False
54 def __del__(self):
55 if self.is_connected is True:
56 self.disconnect()
58 def connect(self):
59 """
60 Set up the connection required by the handler.
61 Returns:
62 pyodbc.Connection: A connection object to the Access database.
63 """
64 if self.is_connected is True:
65 return self.connection
67 if IMPORT_ERROR is not None: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 raise RuntimeError(
69 f"Microsoft Access handler requires pyodbc and sqlalchemy-access packages. "
70 f"Install them with: pip install pyodbc sqlalchemy-access. Error: {IMPORT_ERROR}"
71 )
73 if platform.system() != "Windows": 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 raise RuntimeError(
75 "Microsoft Access handler is only supported on Windows. "
76 "The Microsoft Access ODBC driver is not available on other operating systems."
77 )
79 self.connection = pyodbc.connect(
80 r"Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=" + self.connection_data["db_file"]
81 )
82 self.is_connected = True
84 return self.connection
86 def disconnect(self):
87 """
88 Close any existing connections.
89 """
90 if self.is_connected is False:
91 return
93 self.connection.close()
94 self.is_connected = False
95 return self.is_connected
97 def check_connection(self) -> StatusResponse:
98 """
99 Check connection to the handler.
100 Returns:
101 HandlerStatusResponse
102 """
103 response = StatusResponse(False)
104 need_to_close = self.is_connected is False
106 try:
107 self.connect()
108 response.success = True
109 except Exception as e:
110 logger.error(f"Error connecting to Microsoft Access database {self.connection_data['db_file']}, {e}!")
111 response.error_message = str(e)
112 finally:
113 if response.success is True and need_to_close:
114 self.disconnect()
115 if response.success is False and self.is_connected is True: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true
116 self.is_connected = False
118 return response
120 def native_query(self, query: str) -> StatusResponse:
121 """
122 Receive raw query and act upon it somehow.
123 Args:
124 query (str): query in native format
125 Returns:
126 HandlerResponse
127 """
128 need_to_close = self.is_connected is False
130 connection = self.connect()
131 with connection.cursor() as cursor:
132 try:
133 cursor.execute(query)
134 result = cursor.fetchall()
135 if result:
136 response = Response(
137 RESPONSE_TYPE.TABLE,
138 data_frame=pd.DataFrame.from_records(result, columns=[x[0] for x in cursor.description]),
139 )
141 else:
142 response = Response(RESPONSE_TYPE.OK)
143 connection.commit()
144 except Exception as e:
145 logger.error(f"Error running query: {query} on {self.connection_data['db_file']}!")
146 response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
148 if need_to_close is True: 148 ↛ 151line 148 didn't jump to line 151 because the condition on line 148 was always true
149 self.disconnect()
151 return response
153 def query(self, query: ASTNode) -> StatusResponse:
154 """
155 Receive query as AST (abstract syntax tree) and act upon it somehow.
156 Args:
157 query (ASTNode): sql query represented as AST. May be any kind
158 of query: SELECT, INTSERT, DELETE, etc
159 Returns:
160 HandlerResponse
161 """
162 if IMPORT_ERROR is not None: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 raise RuntimeError(
164 f"Microsoft Access handler requires pyodbc and sqlalchemy-access packages. "
165 f"Install them with: pip install pyodbc sqlalchemy-access. Error: {IMPORT_ERROR}"
166 )
168 renderer = SqlalchemyRender(AccessDialect)
169 query_str = renderer.get_string(query, with_failback=True)
170 return self.native_query(query_str)
172 def get_tables(self) -> StatusResponse:
173 """
174 Return list of entities that will be accessible as tables.
175 Returns:
176 HandlerResponse
177 """
178 connection = self.connect()
179 with connection.cursor() as cursor:
180 df = pd.DataFrame([table.table_name for table in cursor.tables(tableType="Table")], columns=["table_name"])
182 response = Response(RESPONSE_TYPE.TABLE, df)
184 return response
186 def get_columns(self, table_name: str) -> StatusResponse:
187 """
188 Returns a list of entity columns.
189 Args:
190 table_name (str): name of one of tables returned by self.get_tables()
191 Returns:
192 HandlerResponse
193 """
194 connection = self.connect()
195 with connection.cursor() as cursor:
196 df = pd.DataFrame(
197 [(column.column_name, column.type_name) for column in cursor.columns(table=table_name)],
198 columns=["column_name", "data_type"],
199 )
201 response = Response(RESPONSE_TYPE.TABLE, df)
203 return response