Coverage for mindsdb / integrations / handlers / airtable_handler / airtable_handler.py: 0%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4import requests 

5import duckdb 

6 

7from mindsdb_sql_parser import parse_sql 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE 

16) 

17 

# Module-level logger obtained from mindsdb's logging utility and named after
# this module, so handler log records can be traced back to this file.
logger = log.getLogger(__name__)

19 

20 

class AirtableHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Airtable statements.

    The configured Airtable table is downloaded through the Airtable REST API
    into a pandas DataFrame; SQL is then executed against that DataFrame
    through a DuckDB connection.

    ``connection_data`` is read for the keys ``base_id``, ``table_name`` and
    ``api_key``.
    """

    name = 'airtable'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'airtable'
        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False
        # Snapshot of the Airtable table, filled in by connect(). It is kept
        # on the instance (not in module globals) so that handlers for
        # different bases/tables cannot overwrite each other or module names.
        self._table_frame = None

    def __del__(self):
        # getattr guards against __init__ having failed before the flag was
        # set, in which case there is nothing to disconnect.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def _fetch_table(self) -> pd.DataFrame:
        """
        Download every record of the configured table, following pagination.
        Returns:
            pandas.DataFrame built from the ``fields`` mapping of each record
        Raises:
            requests.HTTPError: if any page request returns an error status
            KeyError: if a successful response carries no ``records`` entry
        """
        url = f"https://api.airtable.com/v0/{self.connection_data['base_id']}/{self.connection_data['table_name']}"
        headers = {"Authorization": "Bearer " + self.connection_data['api_key']}

        records = []
        params = None  # the first page is requested without an offset
        while True:
            response = requests.get(url, params=params, headers=headers)
            # Fail loudly on API errors instead of tripping over a missing
            # 'records' key or silently returning a partial table.
            response.raise_for_status()
            payload = response.json()

            records.extend(payload['records'])

            # The API includes 'offset' only while more pages remain.
            offset = payload.get('offset')
            if not offset:
                break
            params = {"offset": offset}

        return pd.DataFrame([record['fields'] for record in records])

    def connect(self) -> duckdb.DuckDBPyConnection:
        """
        Set up the connection required by the handler.

        Downloads the Airtable table and opens a DuckDB connection. If the
        handler is already connected the existing connection is returned and
        the table is not downloaded again.
        Returns:
            the DuckDB connection used to run queries
        """

        if self.is_connected is True:
            return self.connection

        self._table_frame = self._fetch_table()

        self.connection = duckdb.connect()
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        Returns:
            None when the handler was not connected, otherwise the
            connection flag (False) after closing.
        """

        if self.is_connected is False:
            return

        self.connection.close()
        # Drop the closed connection so it cannot be reused by accident.
        self.connection = None
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """

        response = StatusResponse(False)
        # Only close afterwards if this check is what opened the connection.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Airtable base {self.connection_data["base_id"]}, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        The query is executed by DuckDB against the downloaded table, which is
        exposed under the configured ``table_name``.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse
        """

        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()
        try:
            # Expose the table snapshot to this cursor by name; registering on
            # the cursor itself keeps the view visible to the statement below.
            cursor.register(self.connection_data['table_name'], self._table_frame)
            cursor.execute(query)
            result = cursor.fetchall()
            # NOTE: an empty result list is falsy, so a query that returns no
            # rows is reported as OK rather than as an empty table.
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        result,
                        columns=[x[0] for x in cursor.description]
                    )
                )

            else:
                response = Response(RESPONSE_TYPE.OK)
            connection.commit()
        except Exception as e:
            logger.error(f'Error running query: {query} on table {self.connection_data["table_name"]} in base {self.connection_data["base_id"]}!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """

        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        The handler exposes exactly one table: the configured ``table_name``.
        Returns:
            HandlerResponse
        """

        response = Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                [self.connection_data['table_name']],
                columns=['table_name']
            )
        )

        return response

    def get_columns(self, table_name: Optional[str] = None) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables().
                Optional and accepted for interface compatibility; the handler
                exposes a single table, so that table is always described.
        Returns:
            HandlerResponse
        """

        if self._table_frame is None:
            # The table has not been downloaded yet: connect to load it, and
            # close again if the connection was opened only for this call.
            need_to_close = self.is_connected is False
            self.connect()
            if need_to_close is True:
                self.disconnect()

        frame = self._table_frame

        response = Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                {
                    'column_name': list(frame.columns),
                    'data_type': frame.dtypes
                }
            )
        )

        return response