Coverage for mindsdb / integrations / handlers / gmail_handler / gmail_handler.py: 0%

274 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2 

3from mindsdb.integrations.libs.response import ( 

4 HandlerStatusResponse as StatusResponse, 

5 HandlerResponse as Response 

6) 

7 

8from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

9from mindsdb.integrations.libs.api_handler import APIHandler, APITable 

10from mindsdb_sql_parser import ast 

11from mindsdb.utilities import log 

12from mindsdb_sql_parser import parse_sql 

13from mindsdb.utilities.config import Config 

14 

15import time 

16from typing import List 

17import pandas as pd 

18 

19from googleapiclient.discovery import build 

20from googleapiclient.errors import HttpError 

21from email.message import EmailMessage 

22 

23from base64 import urlsafe_b64encode, urlsafe_b64decode 

24 

25from mindsdb.integrations.utilities.handlers.auth_utilities.google import GoogleUserOAuth2Manager 

26from mindsdb.integrations.utilities.handlers.auth_utilities.exceptions import AuthException 

27 

# OAuth scopes requested when the connection data does not provide its own
# "scopes": compose/send, read-only access, and modify (labels, trash).
DEFAULT_SCOPES = [
    f'https://www.googleapis.com/auth/gmail.{scope_suffix}'
    for scope_suffix in ('compose', 'readonly', 'modify')
]

# Module-level logger shared by the table and handler classes below.
logger = log.getLogger(__name__)

35 

36 

class EmailsTable(APITable):
    """Implementation for the emails table for Gmail"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls emails from Gmail "users.messages.list" API

        Supported WHERE conditions (equality only, combined with AND):
        ``query`` (Gmail search string), ``label_ids`` (comma-separated list)
        and ``include_spam_trash``.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Email matching the query

        Raises
        ------
        NotImplementedError
            If the query contains an unsupported operation or condition
        """

        conditions = extract_comparison_conditions(query.where)

        params = {}
        for op, arg1, arg2 in conditions:

            if op == 'or':
                raise NotImplementedError('OR is not supported')

            if arg1 in ['query', 'label_ids', 'include_spam_trash']:
                if op == '=':
                    if arg1 == 'query':
                        params['q'] = arg2
                    elif arg1 == 'label_ids':
                        # Tolerate spaces after commas, e.g. 'INBOX, UNREAD'.
                        params['labelIds'] = [
                            label.strip() for label in arg2.split(',') if label.strip()
                        ]
                    else:
                        params['includeSpamTrash'] = arg2
                else:
                    raise NotImplementedError(f'Unknown op: {op}')

            else:
                raise NotImplementedError(f'Unknown clause: {arg1}')

        if query.limit is not None:
            params['maxResults'] = query.limit.value

        result = self.handler.call_gmail_api(
            method_name='list_messages',
            params=params
        )

        # filter targets
        columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                columns = self.get_columns()
                break
            elif isinstance(target, ast.Identifier):
                columns.append(target.parts[-1])
            else:
                raise NotImplementedError(f"Unknown query target {type(target)}")

        # columns to lower case
        columns = [name.lower() for name in columns]

        if len(result) == 0:
            result = pd.DataFrame([], columns=columns)
        else:
            # add absent columns so the selection below cannot raise KeyError
            for col in columns:
                if col not in result.columns:
                    result[col] = None

            # filter by columns
            result = result[columns]

        # Rename aliased columns. The column names were lower-cased above, so
        # the lookup key must be lower-cased too, otherwise the rename is a no-op.
        aliases = {
            target.parts[-1].lower(): str(target.alias)
            for target in query.targets
            if isinstance(target, ast.Identifier) and target.alias
        }
        if aliases:
            result = result.rename(columns=aliases)

        return result

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            'id',
            'message_id',
            'thread_id',
            'label_ids',
            'sender',
            'to',
            'date',
            'subject',
            'snippet',
            'history_id',
            'size_estimate',
            'body',
            'attachments',
        ]

    def insert(self, query: ast.Insert):
        """Sends (reply) emails using the Gmail "users.messages.send" API

        Every inserted row becomes one email. ``to_email`` is required;
        ``subject`` and ``body`` default to empty strings. When both
        ``thread_id`` and ``message_id`` are given, In-Reply-To/References
        headers are set so the email is threaded as a reply.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Raises
        ------
        ValueError
            If the query contains an unsupported column or a row lacks "to_email"
        """
        columns = [col.name for col in query.columns]

        supported_columns = {"message_id", "thread_id", "to_email", "subject", "body"}
        unsupported_columns = set(columns) - supported_columns
        if unsupported_columns:
            raise ValueError(
                "Unsupported columns for create email: "
                + ", ".join(sorted(unsupported_columns))
            )

        for row in query.values:
            params = dict(zip(columns, row))

            # A NULL/empty recipient is as unusable as a missing one.
            if not params.get('to_email'):
                raise ValueError('"to_email" parameter is required to send an email')

            message = EmailMessage()
            message['To'] = params['to_email']

            # NULL values would make EmailMessage fail; treat them as empty.
            subject = params.get('subject')
            message['Subject'] = '' if subject is None else subject

            content = params.get('body')
            message.set_content('' if content is None else content)

            # If threadId is present then add References and In-Reply-To headers
            # so that proper threading can happen
            if 'thread_id' in params and 'message_id' in params:
                message['In-Reply-To'] = params['message_id']
                message['References'] = params['message_id']

            encoded_message = urlsafe_b64encode(message.as_bytes()).decode()

            request_body = {
                'raw': encoded_message
            }

            if 'thread_id' in params:
                request_body['threadId'] = params['thread_id']
            self.handler.call_gmail_api('send_message', {'body': request_body})

    def delete(self, query: ast.Delete):
        """Moves the emails matched by the WHERE clause to the trash.

        Only equality conditions on ``message_id`` (or its alias ``id``),
        combined with AND, are supported. Each value is passed as the ``id``
        of a ``call_gmail_api('delete_message', ...)`` call. All conditions
        are validated before any email is trashed, so an invalid condition
        never leaves a partial delete behind.

        Args:
            query (ast.Delete): SQL query to parse.

        Raises:
            NotImplementedError: If the query contains an unsupported operation or condition.
        """

        # Parse and validate all conditions first.
        message_ids = []
        conditions = extract_comparison_conditions(query.where)
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')
            if arg1 in ('message_id', 'id'):
                if op == '=':
                    message_ids.append(arg2)
                else:
                    raise NotImplementedError(f'Unknown op: {op}')
            else:
                raise NotImplementedError(f'Unknown clause: {arg1}')

        for message_id in message_ids:
            self.handler.call_gmail_api('delete_message', {'id': message_id})

    @staticmethod
    def _label_value(value) -> str:
        """Return the plain label id for a SET value from the UPDATE statement."""
        if isinstance(value, ast.Constant):
            return str(value.value)
        # Fallback for other node types: strip the surrounding quotes of the
        # rendered SQL literal.
        return str(value)[1:-1]

    def update(self, query: ast.Update) -> None:
        """Updates the labels of a message.

        The WHERE clause must identify the message with ``id = ...``. The SET
        clause accepts ``addLabel`` and/or ``removeLabel``, which are sent as
        ``addLabelIds`` / ``removeLabelIds`` to ``call_gmail_api('update_message', ...)``.

        Args:
            query (ast.Update): The SQL query to parse.

        Raises:
            NotImplementedError: If the query contains an unsupported condition.
            ValueError: If no ``id`` condition is given.
        """
        params = {}
        conditions = extract_comparison_conditions(query.where)
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')
            if arg1 == 'id':
                if op == '=':
                    params['id'] = arg2
                else:
                    raise NotImplementedError(f'Unknown op: {op}')
            else:
                raise NotImplementedError(f'Unknown clause: {arg1}')

        if 'id' not in params:
            raise ValueError('"id" condition is required to update an email')

        add_label = []
        remove_label = []
        for key, value in query.update_columns.items():
            if key == 'addLabel':
                add_label.append(self._label_value(value))
            elif key == 'removeLabel':
                remove_label.append(self._label_value(value))
            else:
                raise NotImplementedError(f'Unknown clause: {key}')

        request_body = {}
        if add_label:
            request_body['addLabelIds'] = add_label
        if remove_label:
            request_body['removeLabelIds'] = remove_label
        params['body'] = request_body
        self.handler.call_gmail_api('update_message', params)

258 

259 

class GmailHandler(APIHandler):
    """A class for handling connections and interactions with the Gmail API.

    Attributes:
        credentials_file (str): The path to the Google Auth Credentials file for authentication
            and interacting with the Gmail API on behalf of the user.

        scopes (List[str], Optional): The scopes to use when authenticating with the Gmail API.
    """

    def __init__(self, name=None, **kwargs):
        """Store connection settings, resolve credentials and register the ``emails`` table.

        Args:
            name (str, optional): Name of the handler instance.
            **kwargs: Must contain ``handler_storage``. May contain ``connection_data``
                (dict) with ``credentials_url``, ``credentials_file``, ``credentials``,
                ``scopes`` and ``code``.

        Raises:
            KeyError: If ``handler_storage`` is not supplied.
        """
        super().__init__(name)
        # ``or {}`` also covers an explicit ``connection_data=None``.
        self.connection_args = kwargs.get('connection_data') or {}

        self.token_file = None
        self.max_page_size = 500
        self.max_batch_size = 100
        self.service = None
        self.is_connected = False

        self.handler_storage = kwargs['handler_storage']

        self.credentials_url = self.connection_args.get('credentials_url', None)
        self.credentials_file = self.connection_args.get('credentials_file', None)
        # "credentials" takes precedence over "credentials_file" and is popped
        # from connection_args.
        if self.connection_args.get('credentials'):
            self.credentials_file = self.connection_args.pop('credentials')
        if not self.credentials_file and not self.credentials_url:
            # try to get from config
            gm_config = Config().get('handlers', {}).get('gmail', {})
            secret_file = gm_config.get('credentials_file')
            secret_url = gm_config.get('credentials_url')
            if secret_file:
                self.credentials_file = secret_file
            elif secret_url:
                self.credentials_url = secret_url

        self.scopes = self.connection_args.get('scopes', DEFAULT_SCOPES)

        emails = EmailsTable(self)
        self.emails = emails
        self._register_table('emails', emails)

    def connect(self):
        """Authenticate with the Gmail API using the configured credentials.

        The service object is cached; later calls return it without re-authenticating.

        Returns
        -------
        service: object
            The authenticated Gmail API service object.
        """
        if self.is_connected and self.service is not None:
            return self.service

        google_oauth2_manager = GoogleUserOAuth2Manager(
            self.handler_storage,
            self.scopes,
            self.credentials_file,
            self.credentials_url,
            self.connection_args.get('code'),
        )
        creds = google_oauth2_manager.get_oauth2_credentials()

        self.service = build('gmail', 'v1', credentials=creds)

        self.is_connected = True
        return self.service

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        Calls ``users.getProfile`` for the authenticated user. If OAuth is not
        completed yet, the returned response carries the ``redirect_url`` from
        the AuthException.

        Returns
        -------
        StatusResponse
            Status confirmation
        """
        response = StatusResponse(False)

        try:
            # Call the Gmail API
            service = self.connect()

            result = service.users().getProfile(userId='me').execute()

            if result and result.get('emailAddress', None) is not None:
                response.success = True
                response.copy_storage = True
        except AuthException as error:
            response.error_message = str(error)
            response.redirect_url = error.auth_url
            return response

        except HttpError as error:
            response.error_message = f'Error connecting to Gmail api: {error}.'
            logger.error(response.error_message)

        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def native_query(self, query_string: str = None) -> Response:
        """Parse a raw SQL string and execute it against this handler.

        Args:
            query_string (str): SQL text to parse.

        Returns:
            Response: Result of ``self.query`` for the parsed statement.
        """
        # Use a local name that does not shadow the module-level ``ast`` import.
        parsed_query = parse_sql(query_string)

        return self.query(parsed_query)

    @staticmethod
    def _decode_part_data(data: str) -> str:
        """Decode a base64url ``body.data`` value of a message part into text."""
        if not data:
            return ''
        # urlsafe_b64decode requires '=' padding; restore it if it was stripped.
        padded = data + '=' * (-len(data) % 4)
        # errors='replace' keeps one part with a non-UTF-8 charset from aborting
        # the parsing of the whole message.
        return urlsafe_b64decode(padded).decode('utf-8', errors='replace')

    def _parse_parts(self, parts, attachments):
        """Collect the text/plain body of MIME ``parts``, recording attachments.

        Args:
            parts (list | None): Message parts as returned by the Gmail API.
            attachments (list): Output list; one dict with ``filename``,
                ``mimeType`` and ``attachmentId`` is appended per attachment.

        Returns:
            str: Concatenated text of all text/plain parts (recursing into nested parts).
        """
        if not parts:
            return ''

        body = ''
        for part in parts:
            mime_type = part.get('mimeType', '')
            part_body = part.get('body') or {}
            if part.get('filename') and part_body.get('attachmentId'):
                # Checked first so that a text/plain *attachment* is not mistaken
                # for (empty) body text. For now just store the attachment details.
                attachments.append({
                    'filename': part['filename'],
                    'mimeType': mime_type,
                    'attachmentId': part_body['attachmentId']
                })
            elif mime_type == 'text/plain':
                body += self._decode_part_data(part_body.get('data', ''))
            elif mime_type == 'multipart/alternative' or 'parts' in part:
                # Recursively iterate over nested parts to find the plain text body.
                # .get(): a multipart/alternative part may lack a "parts" key.
                body += self._parse_parts(part.get('parts'), attachments)
            else:
                logger.debug(f"Unhandled mimeType: {mime_type}")

        return body

    def _parse_message(self, data, message, exception):
        """Batch callback: convert one full Gmail message into a row appended to ``data``.

        Args:
            data (list): Output list of row dicts.
            message (dict): Message resource returned by ``users.messages.get``.
            exception (Exception | None): Error raised for this batch entry, if any;
                such entries are logged and skipped.
        """
        if exception:
            logger.error(f'Exception in getting full email: {exception}')
            return

        payload = message['payload']
        headers = payload.get("headers", [])
        # Single-part (non-multipart) emails carry their content on the payload
        # itself instead of in a "parts" list; treat the payload as the only part.
        parts = payload.get("parts") or [payload]

        row = {
            'id': message['id'],
            'thread_id': message['threadId'],
            'label_ids': message.get('labelIds', []),
            'snippet': message.get('snippet', ''),
            'history_id': message['historyId'],
            'size_estimate': message.get('sizeEstimate', 0),
        }

        for header in headers:
            key = header['name'].lower()
            value = header['value']

            if key in ['to', 'subject', 'date']:
                row[key] = value
            elif key == 'from':
                row['sender'] = value
            elif key == 'message-id':
                row['message_id'] = value

        attachments = []
        row['body'] = self._parse_parts(parts, attachments)
        row['attachments'] = json.dumps(attachments)
        data.append(row)

    def _get_messages(self, data, messages):
        """Fetch the full content of ``messages`` in one batch request, appending rows to ``data``."""
        batch_req = self.service.new_batch_http_request(
            lambda request_id, response, exception: self._parse_message(data, response, exception))
        for message in messages:
            batch_req.add(self.service.users().messages().get(userId='me', id=message['id']))

        batch_req.execute()

    def get_attachments(self, result):
        """Download the attachments of every email in ``result`` to the working directory.

        Args:
            result (pd.DataFrame): Rows with ``id`` and ``attachments`` (JSON list)
                columns, as produced by the ``emails`` table.
        """
        import os

        service = self.connect()
        for _, email_row in result.iterrows():
            attachments = json.loads(email_row['attachments'])
            for attachment in attachments:
                # The filename comes from the (untrusted) email; keep only its
                # final component so it cannot escape the working directory.
                filename = os.path.basename(attachment['filename'])
                if filename in ('', '.', '..'):
                    logger.warning(f"Skipping attachment with unsafe filename: {attachment['filename']!r}")
                    continue
                attachment_data = service.users().messages().attachments().get(
                    userId='me', messageId=email_row['id'], id=attachment['attachmentId']).execute()
                # urlsafe_b64decode already maps '-'/'_' to '+'/'.
                file_data = urlsafe_b64decode(attachment_data['data'])
                with open(filename, 'wb') as f:
                    f.write(file_data)

    def _handle_list_messages_response(self, data, messages):
        """Fetch full messages in batches of at most ``max_batch_size`` ids."""
        for start in range(0, len(messages), self.max_batch_size):
            self._get_messages(data, messages[start:start + self.max_batch_size])

    def call_gmail_api(self, method_name: str = None, params: dict = None) -> pd.DataFrame:
        """Call Gmail API and map the data to pandas DataFrame

        For ``list_messages`` the listed ids are expanded into full messages.
        Pagination is followed only when ``maxResults`` is given. The whole
        call is limited to 60 seconds.

        Args:
            method_name (str): one of 'list_messages', 'send_message',
                'delete_message' (moves to trash) or 'update_message'
            params (dict): query parameters; not modified by this method
        Returns:
            DataFrame
        Raises:
            NotImplementedError: for an unknown ``method_name``.
            RuntimeError: if fetching takes longer than the time limit.
        """
        service = self.connect()
        if method_name == 'list_messages':
            method = service.users().messages().list
        elif method_name == 'send_message':
            method = service.users().messages().send
        elif method_name == "delete_message":
            method = service.users().messages().trash
        elif method_name == 'update_message':
            method = service.users().messages().modify
        else:
            raise NotImplementedError(f'Unknown method_name: {method_name}')

        # Work on a copy: accepts None and keeps paging state out of the caller's dict.
        params = dict(params) if params else {}

        count_results = params.get('maxResults')

        params['userId'] = 'me'

        data = []
        limit_exec_time = time.time() + 60

        while True:
            if time.time() > limit_exec_time:
                raise RuntimeError('Handler request timeout error')

            if count_results is not None:
                left = count_results - len(data)
                if left == 0:
                    break
                elif left < 0:
                    # got more results that we need
                    data = data[:left]
                    break

                params['maxResults'] = min(left, self.max_page_size)

            logger.debug(f'Calling Gmail API: {method_name} with params ({params})')

            resp = method(**params).execute()

            if method_name == 'list_messages':
                # A page without matches has no "messages" key (only
                # "resultSizeEstimate"); it must not become a data row.
                self._handle_list_messages_response(data, resp.get('messages', []))
            elif isinstance(resp, dict):
                data.append(resp)

            if count_results is not None and 'nextPageToken' in resp:
                params['pageToken'] = resp['nextPageToken']
            else:
                break

        return pd.DataFrame(data)