Coverage for mindsdb / integrations / handlers / airtable_handler / airtable_handler.py: 0%
92 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4import requests
5import duckdb
7from mindsdb_sql_parser import parse_sql
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE
16)
18logger = log.getLogger(__name__)
class AirtableHandler(DatabaseHandler):
    """
    Handler that exposes a single Airtable table as a SQL-queryable table.

    On connect, every record of the configured table is downloaded through the
    Airtable REST API into a pandas DataFrame. Queries are then executed by a
    DuckDB connection against that DataFrame, which is registered under the
    Airtable table name.
    """

    name = 'airtable'

    # Seconds to wait for each Airtable HTTP request before giving up, so a
    # stalled connection cannot block the handler forever.
    request_timeout = 30

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database;
                the keys 'base_id', 'table_name' and 'api_key' are read by
                this handler
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'airtable'
        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False
        # DataFrame holding the downloaded Airtable records. It is kept on the
        # instance (not in module globals) so a table name can never shadow a
        # module-level name and handler instances cannot overwrite each other.
        self._table_df = None

    def __del__(self):
        # getattr guards against a partially constructed instance: if __init__
        # raised before `is_connected` was assigned, there is nothing to close.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def _fetch_records(self) -> list:
        """
        Download every record of the configured Airtable table.

        Pages are requested until a response no longer carries an 'offset'
        value. Any HTTP, network or payload error is raised instead of being
        swallowed, so the caller never receives a silently truncated table.

        Returns:
            list: the raw record dicts as returned by the API
        Raises:
            requests.RequestException: on network failure, timeout or a
                non-success HTTP status
            KeyError: if a response does not contain a 'records' entry
        """
        url = f"https://api.airtable.com/v0/{self.connection_data['base_id']}/{self.connection_data['table_name']}"
        headers = {"Authorization": "Bearer " + self.connection_data['api_key']}

        records = []
        params = None  # the first page is requested without an offset
        while True:
            reply = requests.get(url, params=params, headers=headers, timeout=self.request_timeout)
            # Turn HTTP errors (bad API key, unknown base, ...) into a clear
            # exception rather than a KeyError on the missing 'records' key.
            reply.raise_for_status()
            payload = reply.json()
            records.extend(payload['records'])

            offset = payload.get('offset')
            if not offset:
                return records
            params = {"offset": offset}

    def connect(self) -> duckdb.DuckDBPyConnection:
        """
        Set up the connection required by the handler.

        Downloads the Airtable table into a DataFrame and opens a DuckDB
        connection. An already established connection is reused as is, without
        downloading the data again.

        Returns:
            The DuckDB connection used to run queries.
        Raises:
            requests.RequestException: if the records cannot be downloaded
            KeyError: if a required connection parameter or the 'records' /
                'fields' entries of the API response are missing
        """
        if self.is_connected is True:
            return self.connection

        rows = [record['fields'] for record in self._fetch_records()]
        self._table_df = pd.DataFrame(rows)

        self.connection = duckdb.connect()
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.

        Returns:
            None when there was no open connection, otherwise the new value of
            `is_connected` (False). The downloaded DataFrame is kept.
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Returns:
            HandlerStatusResponse: `success` is True when connect() succeeded;
            otherwise `error_message` carries the text of the exception.
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Airtable base {self.connection_data["base_id"]}, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        The query is executed by DuckDB; the downloaded Airtable data is
        available in it as a table named after `connection_data['table_name']`.

        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: a TABLE response when the query produced rows, an
            OK response when it produced none, or an ERROR response carrying
            the exception text when execution failed.
        """
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()
        try:
            # Expose the DataFrame under the table name on the cursor that
            # runs the query. Registering here, inside the try, keeps a
            # registration failure reported as an ERROR response like any
            # other query failure.
            cursor.register(self.connection_data['table_name'], self._table_df)
            cursor.execute(query)
            result = cursor.fetchall()
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        result,
                        columns=[x[0] for x in cursor.description]
                    )
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
                connection.commit()
        except Exception as e:
            logger.error(f'Error running query: {query} on table {self.connection_data["table_name"]} in base {self.connection_data["base_id"]}!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            # A new cursor is created per query; release it so cursors do not
            # pile up on a long-lived connection.
            cursor.close()

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        Returns:
            HandlerResponse: a one-row table whose 'table_name' column holds
            the configured Airtable table name.
        """
        response = Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                [self.connection_data['table_name']],
                columns=['table_name']
            )
        )

        return response

    def get_columns(self, table_name: Optional[str] = None) -> Response:
        """
        Returns a list of entity columns.

        Args:
            table_name (str): name of one of tables returned by
                self.get_tables(). Optional and currently unused, because the
                handler serves exactly one table; it is accepted so the method
                can be called with or without a table name.
        Returns:
            HandlerResponse: a table with 'column_name' and 'data_type'
            columns describing the downloaded DataFrame.
        """
        if self._table_df is None:
            # The data has not been downloaded yet: fetch it now, and close
            # the connection again if this call was the one that opened it.
            need_to_close = self.is_connected is False
            self.connect()
            if need_to_close:
                self.disconnect()

        table = self._table_df
        response = Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                {
                    'column_name': list(table.columns),
                    'data_type': table.dtypes
                }
            )
        )

        return response