Coverage for mindsdb / integrations / handlers / email_handler / email_client.py: 0%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import imaplib 

2import email 

3import smtplib 

4from email.mime.multipart import MIMEMultipart 

5from email.mime.text import MIMEText 

6 

7from datetime import datetime, timedelta 

8 

9import pandas as pd 

10from mindsdb.integrations.handlers.email_handler.settings import EmailSearchOptions, EmailConnectionDetails 

11from mindsdb.utilities import log 

12 

13logger = log.getLogger(__name__) 

14 

15 

class EmailClient:
    '''Client that searches a mailbox over IMAP and sends messages over SMTP.

    Both server connections are opened in the constructor. Authentication is
    deferred: the IMAP login happens on the first select_mailbox() call and
    the SMTP STARTTLS + login on the first send_email() call. Each is done
    only once per connection so the client can be reused for several
    searches / sends.
    '''

    _DEFAULT_SINCE_DAYS = 10

    # IMAP dates always use English month abbreviations, so they must not be
    # produced with the locale-dependent strftime('%b').
    _IMAP_MONTHS = (
        'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
        'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
    )

    # Keys of every row produced by search_email, in column order.
    _RESULT_COLUMNS = (
        'id', 'to_field', 'from_field', 'subject', 'date', 'body', 'body_content_type',
    )

    # Session state. Class-level defaults keep the methods usable on instances
    # that were created without running __init__.
    _imap_logged_in = False
    _smtp_authenticated = False

    def __init__(
            self,
            connection_data: 'EmailConnectionDetails'
    ):
        '''Opens the IMAP (SSL) and SMTP connections described by connection_data.

        Parameters:
            connection_data (EmailConnectionDetails): Provides email, password,
                imap_server, smtp_server and smtp_port.
        '''
        self.email = connection_data.email
        self.password = connection_data.password
        self.imap_server = imaplib.IMAP4_SSL(connection_data.imap_server)
        try:
            self.smtp_server = smtplib.SMTP(connection_data.smtp_server, connection_data.smtp_port)
        except Exception:
            # Do not leak the already opened IMAP socket when SMTP setup fails.
            self.imap_server.shutdown()
            raise
        self._imap_logged_in = False
        self._smtp_authenticated = False

    def select_mailbox(self, mailbox: str = 'INBOX'):
        '''Logs in (first call only) & selects a mailbox from IMAP server.

        Parameters:
            mailbox (str): The name of the mailbox to select. Defaults to INBOX.

        Raises:
            ValueError: If the server answers the login or the select with a
                status other than OK.
        '''
        # LOGIN is only legal before authentication; imaplib raises when it is
        # issued again on an authenticated connection, so log in just once.
        if not self._imap_logged_in:
            ok, resp = self.imap_server.login(self.email, self.password)
            if ok != 'OK':
                raise ValueError(
                    f'Unable to login to mailbox {mailbox}. Please check your credentials: {str(resp)}')
            self._imap_logged_in = True

            logger.info(f'Logged in to mailbox {mailbox}')

        ok, resp = self.imap_server.select(mailbox)
        if ok != 'OK':
            raise ValueError(
                f'Unable to select mailbox {mailbox}. Please check the mailbox name: {str(resp)}')

        logger.info(f'Selected mailbox {mailbox}')

    def logout(self):
        '''Shuts down the connection to the IMAP and SMTP server.

        Best effort: failures are logged and never raised, so that a problem
        with one server does not prevent closing the other.
        '''

        try:
            ok, resp = self.imap_server.logout()
            if ok != 'BYE':
                logger.error(
                    f'Unable to logout of IMAP client: {str(resp)}')
            else:
                logger.info('Logged out of IMAP server')
        except Exception as e:
            logger.error(
                f'Exception occurred while logging out from IMAP server: {str(e)}')
        finally:
            self._imap_logged_in = False

        try:
            self.smtp_server.quit()
            logger.info('Logged out of SMTP server')
        except Exception as e:
            logger.error(
                f'Exception occurred while logging out from SMTP server: {str(e)}')
        finally:
            self._smtp_authenticated = False

    def send_email(self, to_addr: str, subject: str, body: str):
        '''
        Sends a plain text email to the given address.

        Parameters:
            to_addr (str): The email address to send the email to.
            subject (str): The subject of the email.
            body (str): The body of the email.
        '''

        msg = MIMEMultipart()
        msg['From'] = self.email
        msg['To'] = to_addr
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # STARTTLS and LOGIN can only be performed once per SMTP connection;
        # repeating them on every send makes the second send fail.
        if not self._smtp_authenticated:
            self.smtp_server.starttls()
            self.smtp_server.login(self.email, self.password)
            self._smtp_authenticated = True

        self.smtp_server.send_message(msg)
        logger.info(f'Email sent to {to_addr} with subject: {subject}')

    @staticmethod
    def _quote_search_value(value) -> str:
        '''Returns value as an IMAP quoted string (backslash and quote escaped).'''
        text = str(value)
        if '\r' in text or '\n' in text:
            # Line breaks cannot be represented in a quoted string and would
            # otherwise terminate the command early.
            raise ValueError('IMAP search values must not contain line breaks')
        escaped = text.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    @classmethod
    def _format_imap_date(cls, value) -> str:
        '''Formats a date/datetime as dd-Mon-yyyy independently of the locale.'''
        return f'{value.day:02d}-{cls._IMAP_MONTHS[value.month - 1]}-{value.year:04d}'

    @classmethod
    def _build_search_query(cls, options: 'EmailSearchOptions') -> str:
        '''Builds the IMAP SEARCH criteria string for the given options.

        A SINCE criterion is always present: when options.since_date is not
        set it defaults to _DEFAULT_SINCE_DAYS days before today.
        '''
        query_parts = []
        if options.subject is not None:
            query_parts.append(f'(SUBJECT {cls._quote_search_value(options.subject)})')

        if options.to_field is not None:
            query_parts.append(f'(TO {cls._quote_search_value(options.to_field)})')

        if options.from_field is not None:
            query_parts.append(f'(FROM {cls._quote_search_value(options.from_field)})')

        since_date = options.since_date
        if since_date is None:
            since_date = datetime.today() - timedelta(days=cls._DEFAULT_SINCE_DAYS)
        query_parts.append(f'(SINCE "{cls._format_imap_date(since_date)}")')

        if options.until_date is not None:
            query_parts.append(f'(BEFORE "{cls._format_imap_date(options.until_date)}")')

        if options.since_email_id is not None:
            # int() guarantees that only a number reaches the command string.
            query_parts.append(f'(UID {int(options.since_email_id)}:*)')

        return ' '.join(query_parts)

    @staticmethod
    def _extract_body(email_message):
        '''Picks the body of a parsed message.

        The first plain text part is preferred; when it is missing or empty
        the first html part is used instead.

        Returns:
            (payload, content_type): the decoded payload bytes and 'plain' or
            'html', or (None, None) when the message has no usable body.
        '''
        plain_payload = None
        html_payload = None
        for part in email_message.walk():
            subtype = part.get_content_subtype()
            if subtype == 'plain' and plain_payload is None:
                plain_payload = part.get_payload(decode=True)
                if plain_payload:
                    # Prioritize plain text payloads when present.
                    break
            elif subtype == 'html' and html_payload is None:
                html_payload = part.get_payload(decode=True)

        body = plain_payload or html_payload
        if body is None:
            return None, None
        # Report the type of the payload that is actually returned.
        return body, 'plain' if plain_payload else 'html'

    def search_email(self, options: 'EmailSearchOptions') -> pd.DataFrame:
        '''Searches emails based on the given options and returns a DataFrame.

        Parameters:
            options (EmailSearchOptions): Options to use when searching using IMAP.

        Returns:
            df (pd.DataFrame): One row per matching email with the columns id,
                to_field, from_field, subject, date, body (decoded payload
                bytes) and body_content_type ('plain' or 'html'). Messages
                without a plain text or html payload are left out.

        Raises:
            ValueError: If logging in or selecting the mailbox fails.
            Exception: 'Error searching email', chained to the original error,
                if building the query, searching or fetching fails.
        '''
        self.select_mailbox(options.mailbox)

        try:
            query = self._build_search_query(options)
            status, items = self.imap_server.uid('search', None, query)
            if status != 'OK':
                raise ValueError(f'IMAP search failed: {items}')

            # imaplib reports [None] when the server sent no SEARCH response.
            uids = (items[0] or b'').split() if items else []

            if options.since_email_id is not None:
                # The range "n:*" always matches the last message of the
                # mailbox, even when its UID is lower than n.
                min_uid = int(options.since_email_id)
                uids = [uid for uid in uids if int(uid) >= min_uid]

            ret = []
            for emailid in uids:
                status, data = self.imap_server.uid('fetch', emailid, '(RFC822)')
                # The message literal is the second element of the tuple item;
                # the response may also hold plain bytes items or None.
                raw_message = next(
                    (item[1] for item in (data or []) if isinstance(item, tuple)),
                    None
                )
                if status != 'OK' or raw_message is None:
                    # E.g. the message was removed between search and fetch.
                    logger.warning(f'Unable to fetch email with id {emailid.decode()}')
                    continue

                email_message = email.message_from_bytes(raw_message)
                body, content_type = self._extract_body(email_message)
                if body is None:
                    # Very rarely messages won't have plain text or html payloads.
                    continue

                ret.append({
                    'id': emailid.decode(),
                    'to_field': email_message.get('To'),
                    'from_field': email_message.get('From'),
                    'subject': email_message.get('Subject'),
                    'date': email_message.get('Date'),
                    'body': body,
                    'body_content_type': content_type,
                })
        except Exception as e:
            raise Exception('Error searching email') from e

        # Explicit columns keep the schema stable when nothing matched.
        return pd.DataFrame(ret, columns=list(self._RESULT_COLUMNS))