Coverage for mindsdb / integrations / handlers / email_handler / email_tables.py: 0%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import datetime as dt 

2import pytz 

3 

4import pandas as pd 

5 

6from mindsdb_sql_parser import ast 

7 

8from mindsdb.integrations.handlers.email_handler.email_ingestor import EmailIngestor 

9from mindsdb.integrations.libs.api_handler import APITable 

10 

11from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor 

12from mindsdb.integrations.utilities.handlers.query_utilities.insert_query_utilities import INSERTQueryParser 

13from mindsdb.integrations.handlers.email_handler.settings import EmailSearchOptions 

14from mindsdb.utilities import log 

15 

16logger = log.getLogger(__name__) 

17 

18 

class EmailsTable(APITable):
    """The Emails Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls email data from the connected account.

        WHERE conditions are translated into ``EmailSearchOptions`` keyword
        arguments, the emails are fetched through ``EmailIngestor``, and the
        column selection, ordering and limit are then applied to the
        resulting DataFrame.

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Emails matching the query

        Raises
        ------
        NotImplementedError
            If the query filters on an unsupported column or uses an
            unsupported operator for a column
        ValueError
            If a ``datetime`` condition value cannot be parsed as a date
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'emails',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Validate and translate the conditions before opening a connection,
        # so an unsupported query fails without touching the mail server.
        search_params = self._build_search_params(where_conditions)

        self.handler.connect()

        # An empty dict expands to no keyword arguments, i.e. the default options.
        search_options = EmailSearchOptions(**search_params)
        email_ingestor = EmailIngestor(self.handler.connection, search_options)
        emails_df = email_ingestor.ingest()

        # The conditions were already turned into search options above; the
        # empty list is passed where the executor presumably expects WHERE
        # conditions, so nothing is filtered a second time on the DataFrame.
        select_statement_executor = SELECTQueryExecutor(
            emails_df,
            selected_columns,
            [],
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def _build_search_params(self, where_conditions) -> dict:
        """Translates parsed WHERE conditions into EmailSearchOptions keyword arguments.

        Parameters
        ----------
        where_conditions : iterable
            ``(operator, column, value)`` triples as returned by
            ``SELECTQueryParser.parse_query``

        Returns
        -------
        dict
            Keyword arguments for ``EmailSearchOptions``; empty when no
            condition applies

        Raises
        ------
        NotImplementedError
            If a condition uses an unsupported column or operator
        ValueError
            If a ``datetime`` condition value cannot be parsed as a date
        """
        search_params = {}
        for op, arg1, arg2 in where_conditions:
            if arg2 is None:
                logger.warning(
                    "Skipping condition: %s %s %s. "
                    "Please ignore if this is intentional, e.g. 'id > last' on first query of job run.",
                    arg1, op, arg2
                )
                continue

            if arg1 == 'datetime':
                # The value is parsed before the operator is checked, so a
                # malformed date is reported even for an unsupported operator.
                date = self.parse_date(arg2)
                if op == '>':
                    search_params['since_date'] = date
                elif op == '<':
                    search_params['until_date'] = date
                else:
                    raise NotImplementedError("Only > and < operators are supported for datetime column.")

            elif arg1 == 'id':
                if op not in ['=', '>', '>=']:
                    raise NotImplementedError("Only =, > and >= operators are supported for id column.")
                # NOTE(review): '=' is handled exactly like '>=' here, and no
                # WHERE conditions are forwarded to the executor in select(),
                # so `id = N` looks like it also returns later ids - confirm
                # against EmailIngestor before relying on equality filters.
                since_email_id = int(arg2)
                if op == '>':
                    # A strict comparison starts at the next id.
                    since_email_id += 1
                search_params['since_email_id'] = since_email_id

            elif arg1 in ['mailbox', 'subject', 'to_field', 'from_field']:
                if op != '=':
                    raise NotImplementedError("Only = operator is supported for mailbox, subject, to and from columns.")
                # The column name is used as the search option name unchanged.
                search_params[arg1] = arg2

            else:
                raise NotImplementedError(f"Unsupported column: {arg1}.")

        return search_params

    def insert(self, query: ast.Insert) -> None:
        """Sends emails through the connected account.

        Every inserted row must provide ``to_field``, ``subject`` and
        ``body``; one email is sent per row.

        Parameters
        ----------
        query : ast.Insert
           Given SQL INSERT query

        Returns
        -------
        None

        Notes
        -----
        Errors raised by ``INSERTQueryParser`` (presumably for missing or
        unsupported columns - verify against the parser) and by
        ``send_email`` are not caught here and propagate to the caller.
        """
        insert_statement_parser = INSERTQueryParser(
            query,
            supported_columns=['to_field', 'subject', 'body'],
            mandatory_columns=['to_field', 'subject', 'body'],
            all_mandatory=True
        )
        email_data = insert_statement_parser.parse_query()

        for email in email_data:
            connection = self.handler.connect()
            # The recipient is passed positionally; the remaining columns
            # (subject, body) are forwarded as keyword arguments.
            to_addr = email.pop('to_field')
            connection.send_email(to_addr, **email)

    def get_columns(self) -> list:
        """Returns the names of the columns exposed by the emails table."""
        return ['id', 'body', 'subject', 'to_field', 'from_field', 'datetime']

    @staticmethod
    def parse_date(date_str) -> dt.datetime:
        """Converts a date value from a query condition into a datetime.

        Parameters
        ----------
        date_str : str or dt.datetime
            Either a ``datetime`` instance, which is returned unchanged, or a
            string formatted as ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD``

        Returns
        -------
        dt.datetime
            The given datetime as-is, or the parsed string converted to UTC

        Raises
        ------
        ValueError
            If the string matches neither supported format
        """
        if isinstance(date_str, dt.datetime):
            return date_str

        for date_format in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
            try:
                parsed = dt.datetime.strptime(date_str, date_format)
            except ValueError:
                # Not this format; try the next one.
                continue
            # strptime yields a naive datetime here; astimezone presumes a
            # naive value is in the system local timezone when converting.
            return parsed.astimezone(pytz.utc)

        raise ValueError(f"Can't parse date: {date_str}")