Coverage for mindsdb / integrations / handlers / access_handler / access_handler.py: 89%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2import platform 

3 

4import pandas as pd 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb_sql_parser.ast.base import ASTNode 

8from mindsdb.utilities import log 

9from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE, 

15) 

16 

# Optional ODBC dependencies. They are imported defensively so that this module
# can still be imported when they are missing; the captured ImportError is kept
# in IMPORT_ERROR and both names are set to None in that case.
IMPORT_ERROR = None
try:
    import pyodbc
    from sqlalchemy_access.base import AccessDialect
except ImportError as import_failure:
    pyodbc = None
    AccessDialect = None
    IMPORT_ERROR = import_failure

# Module-level logger named after this module.
logger = log.getLogger(__name__)

28 

29 

class AccessHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Microsoft Access statements.

    The connection is opened lazily by ``connect()`` through ``pyodbc`` using the
    ``db_file`` entry of ``connection_data``, and is cached on the instance until
    ``disconnect()`` is called.
    """

    name = "access"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database;
                ``connect()`` reads the ``"db_file"`` key from it
            **kwargs: arbitrary keyword arguments (stored as ``self.kwargs``).
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "access"
        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # __init__ may have raised before `is_connected` was assigned, so do not
        # assume the attribute exists when the object is being finalized.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    @staticmethod
    def _ensure_dependencies() -> None:
        """
        Raise if the optional ODBC dependencies failed to import.

        Raises:
            RuntimeError: when the module-level IMPORT_ERROR is set.
        """
        if IMPORT_ERROR is not None:
            raise RuntimeError(
                f"Microsoft Access handler requires pyodbc and sqlalchemy-access packages. "
                f"Install them with: pip install pyodbc sqlalchemy-access. Error: {IMPORT_ERROR}"
            )

    def _db_file_for_log(self):
        """
        Return the configured ``db_file`` for log messages, or None if unavailable.

        Logging must never raise from inside an error handler, so a missing key
        or a ``connection_data`` that is not subscriptable (e.g. None) is tolerated.
        """
        try:
            return self.connection_data["db_file"]
        except (KeyError, TypeError):
            return None

    def connect(self):
        """
        Set up the connection required by the handler.

        An already-open connection is returned as is. Otherwise the optional
        dependencies and the operating system are checked before connecting.

        Returns:
            pyodbc.Connection: A connection object to the Access database.

        Raises:
            RuntimeError: if the optional packages are missing or the platform
                is not Windows.
        """
        if self.is_connected is True:
            return self.connection

        self._ensure_dependencies()

        if platform.system() != "Windows":
            raise RuntimeError(
                "Microsoft Access handler is only supported on Windows. "
                "The Microsoft Access ODBC driver is not available on other operating systems."
            )

        self.connection = pyodbc.connect(
            r"Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=" + self.connection_data["db_file"]
        )
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.

        Returns:
            None if the handler was not connected; otherwise the updated
            ``is_connected`` flag (False) after closing the connection.
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        A connection opened only for this check is closed again afterwards.
        Failures are logged and reported in the response rather than raised.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            # Use the safe lookup: a missing/invalid `db_file` is itself a likely
            # cause of the failure and must not raise again from this handler.
            logger.error(f"Error connecting to Microsoft Access database {self._db_file_for_log()}, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        Args:
            query (str): query in native format

        Returns:
            HandlerResponse: a TABLE response when the statement returned rows,
            an OK response when it returned none (or is not a row-returning
            statement), or an ERROR response carrying the exception text.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                try:
                    cursor.execute(query)
                    # `description` is None for statements that produce no result
                    # set (INSERT/UPDATE/DELETE/DDL); fetching from such a cursor
                    # raises, which used to turn successful writes into errors.
                    if cursor.description is None:
                        rows = []
                    else:
                        rows = cursor.fetchall()

                    if rows:
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame.from_records(
                                rows, columns=[col[0] for col in cursor.description]
                            ),
                        )
                    else:
                        # Empty result sets are reported as OK as well
                        # (existing behaviour, kept for compatibility).
                        response = Response(RESPONSE_TYPE.OK)
                        connection.commit()
                except Exception as e:
                    logger.error(f"Error running query: {query} on {self._db_file_for_log()}, {e}!")
                    response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            # Also release the connection when cursor creation or cursor exit
            # raises, so a connection opened just for this call is not leaked.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.

        The AST is rendered to a SQL string with the Access dialect and then
        executed through ``native_query``.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse

        Raises:
            RuntimeError: if the optional packages are missing.
        """
        self._ensure_dependencies()

        renderer = SqlalchemyRender(AccessDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        Returns:
            HandlerResponse: TABLE response with a single ``table_name`` column.
        """
        connection = self.connect()
        with connection.cursor() as cursor:
            table_names = [table.table_name for table in cursor.tables(tableType="Table")]
            df = pd.DataFrame(table_names, columns=["table_name"])

        response = Response(RESPONSE_TYPE.TABLE, df)

        return response

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.

        Args:
            table_name (str): name of one of tables returned by self.get_tables()

        Returns:
            HandlerResponse: TABLE response with ``column_name`` and
            ``data_type`` columns.
        """
        connection = self.connect()
        with connection.cursor() as cursor:
            column_info = [
                (column.column_name, column.type_name) for column in cursor.columns(table=table_name)
            ]
            df = pd.DataFrame(column_info, columns=["column_name", "data_type"])

        response = Response(RESPONSE_TYPE.TABLE, df)

        return response