Coverage for mindsdb / integrations / handlers / gmail_handler / gmail_handler.py: 0%
274 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
import json
import os
import time
from base64 import urlsafe_b64encode, urlsafe_b64decode
from email.message import EmailMessage
from typing import List

import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from mindsdb_sql_parser import ast
from mindsdb_sql_parser import parse_sql

from mindsdb.integrations.libs.api_handler import APIHandler, APITable
from mindsdb.integrations.libs.response import (
    HandlerStatusResponse as StatusResponse,
    HandlerResponse as Response
)
from mindsdb.integrations.utilities.handlers.auth_utilities.exceptions import AuthException
from mindsdb.integrations.utilities.handlers.auth_utilities.google import GoogleUserOAuth2Manager
from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
from mindsdb.utilities import log
from mindsdb.utilities.config import Config
# OAuth2 scopes requested when the connection arguments do not supply their
# own 'scopes' entry.
DEFAULT_SCOPES = [
    'https://www.googleapis.com/auth/gmail.compose',
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.modify',
]
# Module-level logger shared by the table and handler classes in this file.
logger = log.getLogger(__name__)
class EmailsTable(APITable):
    """Implementation for the emails table for Gmail.

    Every operation is translated into a call to
    ``self.handler.call_gmail_api``.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls emails from Gmail "users.messages.list" API

        Only ``=`` comparisons on ``query``, ``label_ids`` (comma separated)
        and ``include_spam_trash`` are accepted in the WHERE clause; a LIMIT
        is forwarded as ``maxResults``.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Email matching the query

        Raises
        ------
        NotImplementedError
            If the query contains an unsupported operation or condition
        """
        conditions = extract_comparison_conditions(query.where)

        params = {}
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')

            if arg1 not in ('query', 'label_ids', 'include_spam_trash'):
                raise NotImplementedError(f'Unknown clause: {arg1}')
            if op != '=':
                raise NotImplementedError(f'Unknown op: {op}')

            if arg1 == 'query':
                params['q'] = arg2
            elif arg1 == 'label_ids':
                # Tolerate spaces after the commas ("INBOX, UNREAD").
                params['labelIds'] = [label.strip() for label in arg2.split(',')]
            else:
                params['includeSpamTrash'] = arg2

        if query.limit is not None:
            params['maxResults'] = query.limit.value

        result = self.handler.call_gmail_api(
            method_name='list_messages',
            params=params
        )

        # Resolve the requested columns; they are compared in lower case.
        columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                columns = self.get_columns()
                break
            elif isinstance(target, ast.Identifier):
                columns.append(target.parts[-1])
            else:
                raise NotImplementedError(f"Unknown query target {type(target)}")
        columns = [name.lower() for name in columns]

        if len(result) == 0:
            return pd.DataFrame([], columns=columns)

        # Add requested columns the API response did not provide.
        for col in [name for name in columns if name not in result.columns]:
            result[col] = None

        # Keep only the requested columns, in the requested order.
        result = result[columns]

        # Apply aliases. The frame's columns were lower-cased above, so the
        # lookup key must be lower-cased too or "SELECT Subject AS s" would
        # silently keep the original name. Only identifiers carry ``parts``.
        aliases = {
            target.parts[-1].lower(): str(target.alias)
            for target in query.targets
            if isinstance(target, ast.Identifier) and target.alias
        }
        if aliases:
            result = result.rename(columns=aliases)

        return result

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            'id',
            'message_id',
            'thread_id',
            'label_ids',
            'sender',
            'to',
            'date',
            'subject',
            'snippet',
            'history_id',
            'size_estimate',
            'body',
            'attachments',
        ]

    def insert(self, query: ast.Insert):
        """Sends emails (optionally as replies) using the Gmail "users.messages.send" API

        One email is sent per inserted row. Supported columns are
        ``message_id``, ``thread_id``, ``to_email``, ``subject`` and ``body``;
        ``to_email`` is mandatory.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Raises
        ------
        ValueError
            If the query contains an unsupported column or a row has no
            "to_email" value
        """
        columns = [col.name for col in query.columns]

        supported_columns = {"message_id", "thread_id", "to_email", "subject", "body"}
        unsupported_columns = set(columns).difference(supported_columns)
        if unsupported_columns:
            # Sorted so the error message is deterministic.
            raise ValueError(
                "Unsupported columns for create email: "
                + ", ".join(sorted(unsupported_columns))
            )

        for row in query.values:
            params = dict(zip(columns, row))

            if 'to_email' not in params:
                raise ValueError('"to_email" parameter is required to send an email')

            message = EmailMessage()
            message['To'] = params['to_email']
            message['Subject'] = params.get('subject', '')
            message.set_content(params.get('body', ''))

            # If threadId is present then add References and In-Reply-To headers
            # so that proper threading can happen
            if 'thread_id' in params and 'message_id' in params:
                message['In-Reply-To'] = params['message_id']
                message['References'] = params['message_id']

            request_body = {
                'raw': urlsafe_b64encode(message.as_bytes()).decode()
            }
            if 'thread_id' in params:
                request_body['threadId'] = params['thread_id']

            self.handler.call_gmail_api('send_message', {'body': request_body})

    def delete(self, query: ast.Delete):
        """Removes the emails selected by ``message_id = <id>`` conditions.

        Each matching condition issues one 'delete_message' call on the
        handler, passing the compared value as the message ``id``.

        Args:
            query (ast.Delete): SQL query to parse.

        Raises:
            NotImplementedError: If the query contains an unsupported
                operation or condition.
        """
        conditions = extract_comparison_conditions(query.where)
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')
            if arg1 != 'message_id':
                raise NotImplementedError(f'Unknown clause: {arg1}')
            if op != '=':
                raise NotImplementedError(f'Unknown op: {op}')
            self.handler.call_gmail_api('delete_message', {'id': arg2})

    def update(self, query: ast.Update) -> None:
        """Adds and/or removes labels on the message selected by ``id = <id>``.

        ``addLabel`` and ``removeLabel`` are the only accepted SET columns.

        Args:
            query (ast.Update): The SQL query to parse.

        Raises:
            NotImplementedError: If the query contains an unsupported condition
                or SET column.
            ValueError: If the WHERE clause does not provide the message id.
        """
        params = {}
        conditions = extract_comparison_conditions(query.where)
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')
            if arg1 != 'id':
                raise NotImplementedError(f'Unknown clause: {arg1}')
            if op != '=':
                raise NotImplementedError(f'Unknown op: {op}')
            params['id'] = arg2

        if 'id' not in params:
            # Fail early with a clear message instead of an opaque API error.
            raise ValueError('"id" condition is required to update a message')

        add_label = []
        remove_label = []
        for key, value in query.update_columns.items():
            if isinstance(value, ast.Constant):
                label = str(value.value)
            else:
                # Fallback: strip the surrounding quotes of the rendered node.
                label = str(value)[1:-1]

            if key == 'addLabel':
                add_label.append(label)
            elif key == 'removeLabel':
                remove_label.append(label)
            else:
                raise NotImplementedError(f'Unknown clause: {key}')

        request_body = {}
        if add_label:
            request_body['addLabelIds'] = add_label
        if remove_label:
            request_body['removeLabelIds'] = remove_label

        params['body'] = request_body
        self.handler.call_gmail_api('update_message', params)
class GmailHandler(APIHandler):
    """A class for handling connections and interactions with the Gmail API.

    Attributes:
        credentials_file (str): The path to the Google Auth Credentials file for authentication
            and interacting with the Gmail API on behalf of the user.
        credentials_url (str): Alternative location of the credentials, used when no file is given.
        scopes (List[str], Optional): The scopes to use when authenticating with the Gmail API.
    """

    # Maps the logical method names accepted by ``call_gmail_api`` to the
    # attribute of ``service.users().messages()`` that implements them.
    _API_METHODS = {
        'list_messages': 'list',
        'send_message': 'send',
        'delete_message': 'trash',
        'update_message': 'modify',
    }

    def __init__(self, name=None, **kwargs):
        """Stores connection settings and registers the ``emails`` table.

        Args:
            name (str): Handler name, forwarded to the base class.
            **kwargs: Must contain ``handler_storage``; may contain
                ``connection_data`` with ``credentials_url``,
                ``credentials_file``, ``credentials``, ``scopes`` and ``code``.
        """
        super().__init__(name)
        # ``or {}`` also covers an explicit ``connection_data=None``.
        self.connection_args = kwargs.get('connection_data') or {}

        self.token_file = None
        self.max_page_size = 500
        self.max_batch_size = 100
        self.service = None
        self.is_connected = False

        self.handler_storage = kwargs['handler_storage']

        self.credentials_url = self.connection_args.get('credentials_url', None)
        self.credentials_file = self.connection_args.get('credentials_file', None)
        if self.connection_args.get('credentials'):
            self.credentials_file = self.connection_args.pop('credentials')
        if not self.credentials_file and not self.credentials_url:
            # try to get from config
            gm_config = Config().get('handlers', {}).get('gmail', {})
            secret_file = gm_config.get('credentials_file')
            secret_url = gm_config.get('credentials_url')
            if secret_file:
                self.credentials_file = secret_file
            elif secret_url:
                self.credentials_url = secret_url

        self.scopes = self.connection_args.get('scopes', DEFAULT_SCOPES)

        emails = EmailsTable(self)
        self.emails = emails
        self._register_table('emails', emails)

    def connect(self):
        """Authenticate with the Gmail API using the configured credentials.

        The service object is cached; repeated calls reuse it while the
        handler is marked as connected.

        Returns
        -------
        service: object
            The authenticated Gmail API service object.
        """
        if self.is_connected and self.service is not None:
            return self.service

        google_oauth2_manager = GoogleUserOAuth2Manager(
            self.handler_storage,
            self.scopes,
            self.credentials_file,
            self.credentials_url,
            self.connection_args.get('code'),
        )
        creds = google_oauth2_manager.get_oauth2_credentials()

        self.service = build('gmail', 'v1', credentials=creds)

        self.is_connected = True
        return self.service

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        Connects and fetches the profile of user 'me'; the check succeeds
        when the profile contains an ``emailAddress``.

        Returns
        -------
        StatusResponse
            Status confirmation. On an authentication error it carries the
            error text and the ``redirect_url`` taken from the exception.
        """
        response = StatusResponse(False)

        try:
            # Call the Gmail API
            service = self.connect()

            result = service.users().getProfile(userId='me').execute()

            if result and result.get('emailAddress', None) is not None:
                response.success = True
                response.copy_storage = True
        except AuthException as error:
            response.error_message = str(error)
            response.redirect_url = error.auth_url
            return response

        except HttpError as error:
            response.error_message = f'Error connecting to Gmail api: {error}.'
            logger.error(response.error_message)

        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def native_query(self, query_string: str = None) -> Response:
        """Parses a raw SQL string and runs it through ``self.query``.

        Args:
            query_string (str): SQL text to execute.

        Returns:
            Response: Whatever ``self.query`` returns for the parsed statement.
        """
        # Named ``parsed_query`` so the imported ``ast`` module is not shadowed.
        parsed_query = parse_sql(query_string)

        return self.query(parsed_query)

    @staticmethod
    def _decode_base64url(data: str) -> bytes:
        """Decodes a base64url string, tolerating missing ``=`` padding."""
        return urlsafe_b64decode(data + '=' * (-len(data) % 4))

    def _parse_parts(self, parts, attachments):
        """Collects plain-text body and attachment descriptors from MIME parts.

        Args:
            parts (list): Message part dicts; may be None or empty.
            attachments (list): Output list; a dict with ``filename``,
                ``mimeType`` and ``attachmentId`` is appended per attachment.

        Returns:
            str: Concatenated text of every ``text/plain`` part found,
            including those nested inside container parts.
        """
        if not parts:
            return ''

        body_chunks = []
        for part in parts:
            mime_type = part.get('mimeType', '')
            part_body = part.get('body') or {}

            if part.get('filename') and part_body.get('attachmentId'):
                # Checked first so text/plain file attachments are recorded
                # instead of being treated as (empty) inline body text.
                # For now just store the attachment details
                attachments.append({
                    'filename': part['filename'],
                    'mimeType': mime_type,
                    'attachmentId': part_body['attachmentId']
                })
            elif mime_type == 'text/plain':
                raw_text = self._decode_base64url(part_body.get('data', ''))
                # A body in another charset must not abort the whole batch.
                body_chunks.append(raw_text.decode('utf-8', errors='replace'))
            elif mime_type == 'multipart/alternative' or 'parts' in part:
                # Recursively iterate over nested parts to find the plain text body
                body_chunks.append(self._parse_parts(part.get('parts'), attachments))
            else:
                logger.debug(f"Unhandled mimeType: {mime_type}")

        return ''.join(body_chunks)

    def _parse_message(self, data, message, exception):
        """Batch callback: converts one full message into a row of ``data``.

        Args:
            data (list): Output list of row dicts.
            message (dict): Message resource returned by the API.
            exception (Exception): Error reported for this request, if any;
                the message is then logged and skipped.
        """
        if exception:
            logger.error(f'Exception in getting full email: {exception}')
            return

        payload = message['payload']
        headers = payload.get("headers", [])
        parts = payload.get("parts")

        row = {
            'id': message['id'],
            'thread_id': message['threadId'],
            'label_ids': message.get('labelIds', []),
            'snippet': message.get('snippet', ''),
            'history_id': message['historyId'],
            'size_estimate': message.get('sizeEstimate', 0),
        }

        for header in headers:
            key = header['name'].lower()
            value = header['value']

            if key in ['to', 'subject', 'date']:
                row[key] = value
            elif key == 'from':
                row['sender'] = value
            elif key == 'message-id':
                row['message_id'] = value

        attachments = []
        # A message without child parts keeps its content on the payload
        # itself, so the payload is parsed as the single part in that case.
        row['body'] = self._parse_parts(parts if parts else [payload], attachments)
        row['attachments'] = json.dumps(attachments)
        data.append(row)

    def _get_messages(self, data, messages):
        """Fetches the given messages in one batch request.

        Args:
            data (list): Output list filled by ``_parse_message``.
            messages (list): Dicts holding at least the message ``id``.
        """
        batch_req = self.service.new_batch_http_request(
            lambda _request_id, response, exception: self._parse_message(data, response, exception))
        for message in messages:
            batch_req.add(self.service.users().messages().get(userId='me', id=message['id']))

        batch_req.execute()

    def get_attachments(self, result):
        """Downloads every attachment listed in ``result`` to the working directory.

        Args:
            result (pd.DataFrame): Rows with an ``id`` column and an
                ``attachments`` column holding the JSON produced by
                ``_parse_message``.
        """
        service = self.connect()
        for _, email in result.iterrows():
            for attachment in json.loads(email['attachments']):
                attachment_id = attachment['attachmentId']
                # The file name comes from the email and is untrusted: drop
                # any directory components so it cannot escape the target dir.
                filename = os.path.basename(attachment['filename'].replace('\\', '/'))
                if filename in ('', '.', '..'):
                    filename = attachment_id

                attachment_data = service.users().messages().attachments().get(
                    userId='me', messageId=email['id'], id=attachment_id).execute()
                file_data = self._decode_base64url(attachment_data['data'])
                with open(filename, 'wb') as f:
                    f.write(file_data)

    def _handle_list_messages_response(self, data, messages):
        """Fetches listed messages in chunks of at most ``max_batch_size``.

        Args:
            data (list): Output list of row dicts.
            messages (list): Message stubs from a list response.
        """
        for start in range(0, len(messages), self.max_batch_size):
            self._get_messages(data, messages[start:start + self.max_batch_size])

    def call_gmail_api(self, method_name: str = None, params: dict = None) -> pd.DataFrame:
        """Call Gmail API and map the data to pandas DataFrame

        When ``params`` contains ``maxResults`` the call is paginated until
        that many rows are collected; otherwise a single request is made.

        Args:
            method_name (str): one of 'list_messages', 'send_message',
                'delete_message', 'update_message'
            params (dict): query parameters; not modified by this method
        Returns:
            DataFrame
        Raises:
            NotImplementedError: if ``method_name`` is unknown
            RuntimeError: if paging takes longer than 60 seconds
        """
        service = self.connect()
        if method_name not in self._API_METHODS:
            raise NotImplementedError(f'Unknown method_name: {method_name}')
        method = getattr(service.users().messages(), self._API_METHODS[method_name])

        # Work on a copy: paging keys are added below and the caller's dict
        # must stay untouched. Also accepts the default ``params=None``.
        params = dict(params) if params else {}
        count_results = params.get('maxResults')
        params['userId'] = 'me'
        is_listing = method_name == 'list_messages'

        data = []
        limit_exec_time = time.time() + 60

        while True:
            if time.time() > limit_exec_time:
                raise RuntimeError('Handler request timeout error')

            if count_results is not None:
                left = count_results - len(data)
                if left <= 0:
                    break
                params['maxResults'] = min(left, self.max_page_size)

            logger.debug(f'Calling Gmail API: {method_name} with params ({params})')

            resp = method(**params).execute()

            if is_listing:
                # A list response without 'messages' means "no matches"; it
                # must not be stored as a data row.
                self._handle_list_messages_response(data, resp.get('messages', []))
            elif isinstance(resp, dict):
                data.append(resp)

            if count_results is not None and 'nextPageToken' in resp:
                params['pageToken'] = resp['nextPageToken']
            else:
                break

        if count_results is not None:
            # got more results than we need
            data = data[:count_results]

        return pd.DataFrame(data)