Coverage for mindsdb / integrations / handlers / sendinblue_handler / sendinblue_tables.py: 0%
96 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import sib_api_v3_sdk
2import pandas as pd
5from typing import List, Dict, Text, Any
6from mindsdb.utilities import log
7from mindsdb.integrations.libs.api_handler import APITable
9from mindsdb_sql_parser import ast
10from sib_api_v3_sdk.rest import ApiException
13from mindsdb.integrations.utilities.handlers.query_utilities import (
14 SELECTQueryParser,
15 SELECTQueryExecutor,
16 UPDATEQueryExecutor,
17 UPDATEQueryParser,
18 DELETEQueryParser,
19 DELETEQueryExecutor,
20 INSERTQueryParser,
21)
23logger = log.getLogger(__name__)
26class EmailCampaignsTable(APITable):
27 """The Sendinblue Email Campaigns Table implementation"""
29 def select(self, query: ast.Select) -> pd.DataFrame:
30 """Pulls data from the Sendinblue "GET /emailCampaigns" API endpoint.
32 Parameters
33 ----------
34 query : ast.Select
35 Given SQL SELECT query
37 Returns
38 -------
39 pd.DataFrame
40 Sendinblue Email Campaigns matching the query
42 Raises
43 ------
44 ValueError
45 If the query contains an unsupported condition
46 """
48 select_statement_parser = SELECTQueryParser(
49 query, 'email_campaigns', self.get_columns()
50 )
51 (
52 selected_columns,
53 where_conditions,
54 order_by_conditions,
55 result_limit,
56 ) = select_statement_parser.parse_query()
58 email_campaigns_df = pd.json_normalize(
59 self.get_email_campaigns(limit=result_limit)
60 )
62 select_statement_executor = SELECTQueryExecutor(
63 email_campaigns_df, selected_columns, where_conditions, order_by_conditions
64 )
65 email_campaigns_df = select_statement_executor.execute_query()
67 return email_campaigns_df
69 def get_columns(self) -> List[str]:
70 return pd.json_normalize(self.get_email_campaigns(limit=1)).columns.tolist()
72 def get_email_campaigns(self, **kwargs):
73 connection = self.handler.connect()
74 email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(connection)
75 email_campaigns = email_campaigns_api_instance.get_email_campaigns(**kwargs)
76 return [email_campaign for email_campaign in email_campaigns.campaigns]
78 def delete(self, query: ast.Delete) -> None:
79 """
80 Deletes an email campaign from Sendinblue.
82 Parameters
83 ----------
84 query : ast.Delete
85 Given SQL DELETE query
87 Returns
88 -------
89 None
91 Raises
92 ------
93 RuntimeError
94 If an error occurs when calling Sendinblue's API
95 """
96 # this parses the DELETE statement to extract where conditions
97 delete_statement_parser = DELETEQueryParser(query)
98 where_conditions = delete_statement_parser.parse_query()
99 # this retrieves the current list of email campaigns and normalize the data into a DataFrame
100 email_campaigns_df = pd.json_normalize(self.get_email_campaigns())
101 # this execute the delete query to filter out the campaigns to be deleted
102 delete_query_executor = DELETEQueryExecutor(
103 email_campaigns_df, where_conditions
104 )
105 # this gets the updated DataFrame after executing delete conditions
106 email_campaigns_df = delete_query_executor.execute_query()
107 campaign_ids = email_campaigns_df['id'].tolist()
108 self.delete_email_campaigns(campaign_ids)
110 def delete_email_campaigns(self, campaign_ids: List[Text]) -> None:
111 # this establish a connection to Sendinblue API
112 connection = self.handler.connect()
113 email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(connection)
114 for campaign_id in campaign_ids:
115 try:
116 email_campaigns_api_instance.delete_email_campaign(campaign_id)
117 logger.info(f'Email Campaign {campaign_id} deleted')
118 except ApiException as e:
119 logger.error(
120 f"Exception when calling EmailCampaignsApi->delete_email_campaign: {e}\n"
121 )
122 raise RuntimeError(
123 f"Failed to execute the delete command for Email Campaign {campaign_id}"
124 ) from e
126 def update(self, query: 'ast.Update') -> None:
127 """
128 Updates data in Sendinblue "PUT /emailCampaigns/{campaignId}" API endpoint.
130 Parameters
131 ----------
132 query : ast.Update
133 Given SQL UPDATE query
135 Returns
136 -------
137 None
139 Raises
140 ------
141 ValueError
142 If the query contains an unsupported condition
143 RuntimeError
144 If an error occurs when calling Sendinblue's API
145 """
146 # this parse the UPDATE statement to extract the new values and the conditions for the update
147 update_statement_parser = UPDATEQueryParser(query)
148 values_to_update, where_conditions = update_statement_parser.parse_query()
150 email_campaigns_df = pd.json_normalize(self.get_email_campaigns())
151 update_query_executor = UPDATEQueryExecutor(
152 email_campaigns_df, where_conditions
153 )
154 # this retrieves the current list of email campaigns
155 email_campaigns_df = update_query_executor.execute_query()
156 # this extracts the IDs of the campaigns that have been updated
157 campaign_ids = email_campaigns_df['id'].tolist()
159 self.update_email_campaigns(campaign_ids, values_to_update)
161 def update_email_campaigns(
162 self, campaign_ids: List[int], values_to_update: Dict
163 ) -> None:
164 # this establish a connection to Sendinblue API
166 connection = self.handler.connect()
167 email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(connection)
169 for campaign_id in campaign_ids:
170 try:
171 email_campaigns_api_instance.update_email_campaign(
172 campaign_id, values_to_update
173 )
174 logger.info(f'Email Campaign {campaign_id} updated')
175 except ApiException as e:
176 logger.error(
177 f"Exception when calling EmailCampaignsApi->update_email_campaign: {e}\n"
178 )
179 raise RuntimeError(
180 f"Failed to execute the update command for Email Campaign {campaign_id}"
181 ) from e
183 def insert(self, query: 'ast.Insert') -> None:
184 """
185 Inserts new email campaigns into Sendinblue.
187 Parameters
188 ----------
189 query : ast.Insert
190 The SQL INSERT query to be parsed and executed.
192 Raises
193 ------
194 ValueError
195 If the necessary sender information is incomplete or incorrectly formatted.
196 Exception
197 For any unexpected errors during the email campaign creation.
198 """
199 # this defines columns that are supported and mandatory for an INSERT operation.
200 supported_columns = [
201 'name', 'subject', 'sender_name', 'sender_email',
202 'html_content', 'scheduled_at', 'recipients_lists', 'tag'
203 ]
204 mandatory_columns = ['name', 'subject', 'sender_name', 'sender_email', 'html_content']
206 # this Parse the INSERT query to extract data.
207 insert_statement_parser = INSERTQueryParser(
208 query, supported_columns=supported_columns,
209 mandatory_columns=mandatory_columns, all_mandatory=True
210 )
211 email_campaigns_data = insert_statement_parser.parse_query()
213 # this processes each campaign data extracted from the query.
214 for email_campaign_data in email_campaigns_data:
215 # this extracts and format sender information.
216 sender_info = {}
217 if 'sender_name' in email_campaign_data:
218 sender_info['name'] = email_campaign_data.pop('sender_name')
219 if 'sender_email' in email_campaign_data and email_campaign_data['sender_email'] is not None:
220 sender_info['email'] = email_campaign_data.pop('sender_email')
221 if 'sender_id' in email_campaign_data and email_campaign_data['sender_id'] is not None:
222 sender_info['id'] = email_campaign_data.pop('sender_id')
224 # this validates sender information.
225 if not sender_info.get('name') or (not sender_info.get('email') and not sender_info.get('id')):
226 raise ValueError("Sender information is incomplete or incorrectly formatted.")
228 email_campaign_data['sender'] = sender_info
230 # this creates each email campaign.
231 self.create_email_campaign(email_campaign_data)
233 def create_email_campaign(self, email_campaign_data: Dict[str, Any]) -> None:
234 """
235 Creates a new email campaign in Sendinblue.
237 Parameters
238 ----------
239 email_campaign_data : Dict[str, Any]
240 The data for the email campaign to be created.
242 Raises
243 ------
244 Exception
245 For any errors during the email campaign creation process.
246 """
247 # this establish a connection to the Sendinblue API.
248 api_session = self.handler.connect()
249 email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(api_session)
251 # this logs the data for the email campaign being created.
252 logger.info(f"Email campaign data before creating the object: {email_campaign_data}")
254 try:
255 # this creates the email campaign object and send it to Sendinblue.
256 email_campaign = sib_api_v3_sdk.CreateEmailCampaign(**email_campaign_data)
257 logger.info(f"Email campaign object after creation: {email_campaign}")
259 # this executes the API call to create the campaign.
260 created_campaign = email_campaigns_api_instance.create_email_campaign(email_campaign)
262 # this checks and log the response from the API.
263 if 'id' not in created_campaign.to_dict():
264 logger.error('Email campaign creation failed')
265 else:
266 logger.info(f'Email Campaign {created_campaign.to_dict()["id"]} created')
267 except ApiException as e:
268 # this handles API exceptions and log the detailed response.
269 logger.error(f"Exception when calling EmailCampaignsApi->create_email_campaign: {e}")
270 if hasattr(e, 'body'):
271 logger.error(f"Sendinblue API response body: {e.body}")
272 raise Exception(f'Failed to create Email Campaign with data: {email_campaign_data}') from e
273 except Exception as e:
274 # this handles any other unexpected exceptions.
275 logger.error(f"Unexpected error occurred: {e}")
276 raise Exception(f'Unexpected error during Email Campaign creation: {e}') from e