Coverage for mindsdb / integrations / handlers / email_handler / email_tables.py: 0%
75 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime as dt
2import pytz
4import pandas as pd
6from mindsdb_sql_parser import ast
8from mindsdb.integrations.handlers.email_handler.email_ingestor import EmailIngestor
9from mindsdb.integrations.libs.api_handler import APITable
11from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor
12from mindsdb.integrations.utilities.handlers.query_utilities.insert_query_utilities import INSERTQueryParser
13from mindsdb.integrations.handlers.email_handler.settings import EmailSearchOptions
14from mindsdb.utilities import log
16logger = log.getLogger(__name__)
19class EmailsTable(APITable):
20 """The Emails Table implementation"""
22 def select(self, query: ast.Select) -> pd.DataFrame:
23 """Pulls email data from the connected account.
25 Parameters
26 ----------
27 query : ast.Select
28 Given SQL SELECT query
30 Returns
31 -------
32 pd.DataFrame
33 Emails matching the query
35 Raises
36 ------
37 ValueError
38 If the query contains an unsupported condition
39 """
41 select_statement_parser = SELECTQueryParser(
42 query,
43 'emails',
44 self.get_columns()
45 )
46 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()
48 search_params = {}
49 for op, arg1, arg2 in where_conditions:
50 if arg2 is None:
51 logger.warning(f"Skipping condition: {arg1} {op} {arg2}."
52 "Please ignore if this is intentional, e.g. 'id > last' on first query of job run."
53 )
54 continue
56 if arg1 == 'datetime':
57 date = self.parse_date(arg2)
58 if op == '>':
59 search_params['since_date'] = date
60 elif op == '<':
61 search_params['until_date'] = date
62 else:
63 raise NotImplementedError("Only > and < operators are supported for created_at column.")
64 continue
66 elif arg1 == 'id':
67 if op not in ['=', '>', '>=']:
68 raise NotImplementedError("Only =, > and >= operators are supported for id column.")
69 if op in ['=', '>=']:
70 search_params['since_email_id'] = int(arg2)
71 elif op == '>':
72 search_params['since_email_id'] = int(arg2) + 1
74 elif arg1 in ['mailbox', 'subject', 'to_field', 'from_field']:
75 if op != '=':
76 raise NotImplementedError("Only = operator is supported for mailbox, subject, to and from columns.")
77 else:
78 if arg1 == 'from_field':
79 search_params['from_field'] = arg2
80 else:
81 search_params[arg1] = arg2
83 else:
84 raise NotImplementedError(f"Unsupported column: {arg1}.")
86 self.handler.connect()
88 if search_params:
89 search_options = EmailSearchOptions(**search_params)
90 else:
91 search_options = EmailSearchOptions()
93 email_ingestor = EmailIngestor(self.handler.connection, search_options)
95 emails_df = email_ingestor.ingest()
97 # ensure all queries from query are applied to the dataframe
98 select_statement_executor = SELECTQueryExecutor(
99 emails_df,
100 selected_columns,
101 [],
102 order_by_conditions,
103 result_limit
104 )
105 return select_statement_executor.execute_query()
107 def insert(self, query: ast.Insert) -> None:
108 """Sends emails through the connected account.
110 Parameters
111 ----------
112 query : ast.Insert
113 Given SQL INSERT query
115 Returns
116 -------
117 None
119 Raises
120 ------
121 ValueError
122 If the query contains an unsupported condition
123 """
124 insert_statement_parser = INSERTQueryParser(
125 query,
126 supported_columns=['to_field', 'subject', 'body'],
127 mandatory_columns=['to_field', 'subject', 'body'],
128 all_mandatory=True
129 )
130 email_data = insert_statement_parser.parse_query()
132 for email in email_data:
133 connection = self.handler.connect()
134 to_addr = email['to_field']
135 del email['to_field']
136 connection.send_email(to_addr, **email)
138 def get_columns(self):
139 return ['id', 'body', 'subject', 'to_field', 'from_field', 'datetime']
141 @staticmethod
142 def parse_date(date_str) -> dt.datetime:
144 if isinstance(date_str, dt.datetime):
146 return date_str
147 date_formats = ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d']
148 date = None
149 for date_format in date_formats:
150 try:
151 date = dt.datetime.strptime(date_str, date_format)
152 except ValueError:
153 pass
154 if date is None:
155 raise ValueError(f"Can't parse date: {date_str}")
156 date = date.astimezone(pytz.utc)
158 return date