Coverage for mindsdb / integrations / handlers / sendinblue_handler / sendinblue_tables.py: 0%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import sib_api_v3_sdk 

2import pandas as pd 

3 

4 

5from typing import List, Dict, Text, Any 

6from mindsdb.utilities import log 

7from mindsdb.integrations.libs.api_handler import APITable 

8 

9from mindsdb_sql_parser import ast 

10from sib_api_v3_sdk.rest import ApiException 

11 

12 

13from mindsdb.integrations.utilities.handlers.query_utilities import ( 

14 SELECTQueryParser, 

15 SELECTQueryExecutor, 

16 UPDATEQueryExecutor, 

17 UPDATEQueryParser, 

18 DELETEQueryParser, 

19 DELETEQueryExecutor, 

20 INSERTQueryParser, 

21) 

22 

# Module-level logger named after this module; the table methods below use it for info/error reporting.
23logger = log.getLogger(__name__) 

24 

25 

class EmailCampaignsTable(APITable):
    """The Sendinblue Email Campaigns Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the Sendinblue "GET /emailCampaigns" API endpoint.

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Sendinblue Email Campaigns matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        select_statement_parser = SELECTQueryParser(
            query, 'email_campaigns', self.get_columns()
        )
        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        # The parsed LIMIT is forwarded to the API; the remaining clauses are
        # applied locally on the normalized DataFrame.
        email_campaigns_df = pd.json_normalize(
            self.get_email_campaigns(limit=result_limit)
        )

        select_statement_executor = SELECTQueryExecutor(
            email_campaigns_df, selected_columns, where_conditions, order_by_conditions
        )
        return select_statement_executor.execute_query()

    def get_columns(self) -> List[str]:
        """Returns the column names of the table.

        The names are derived by fetching a single campaign (``limit=1``) and
        flattening it with ``pd.json_normalize``; the list is empty when no
        campaign is returned.
        """
        return pd.json_normalize(self.get_email_campaigns(limit=1)).columns.tolist()

    def get_email_campaigns(self, **kwargs) -> List[Any]:
        """Fetches email campaigns through the Sendinblue SDK.

        Parameters
        ----------
        **kwargs
            Forwarded unchanged to ``EmailCampaignsApi.get_email_campaigns``
            (for example ``limit``).

        Returns
        -------
        List[Any]
            The ``campaigns`` entries of the API response, or an empty list
            when the response carries none.
        """
        connection = self.handler.connect()
        email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(connection)
        email_campaigns = email_campaigns_api_instance.get_email_campaigns(**kwargs)
        # NOTE(review): presumably `campaigns` can be None when the account has
        # no campaigns; fall back to an empty list instead of failing on
        # iteration -- confirm against the SDK response model.
        return list(email_campaigns.campaigns or [])

    def delete(self, query: ast.Delete) -> None:
        """
        Deletes an email campaign from Sendinblue.

        Parameters
        ----------
        query : ast.Delete
           Given SQL DELETE query

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            If an error occurs when calling Sendinblue's API
        """
        # Parse the DELETE statement to extract the where conditions.
        delete_statement_parser = DELETEQueryParser(query)
        where_conditions = delete_statement_parser.parse_query()

        # Retrieve the current campaigns and normalize them into a DataFrame.
        email_campaigns_df = pd.json_normalize(self.get_email_campaigns())
        if email_campaigns_df.empty:
            # Nothing exists, so nothing can match; an empty frame has no 'id'
            # column and indexing it below would raise KeyError.
            return

        # Apply the where conditions; the resulting rows are the campaigns to delete.
        delete_query_executor = DELETEQueryExecutor(
            email_campaigns_df, where_conditions
        )
        email_campaigns_df = delete_query_executor.execute_query()

        campaign_ids = email_campaigns_df['id'].tolist()
        self.delete_email_campaigns(campaign_ids)

    def delete_email_campaigns(self, campaign_ids: List[int]) -> None:
        """Deletes the given campaigns one by one.

        Parameters
        ----------
        campaign_ids : List[int]
            Values of the ``id`` column of the campaigns to delete.

        Raises
        ------
        RuntimeError
            If the API call fails for a campaign; campaigns later in the list
            are not processed.
        """
        connection = self.handler.connect()
        email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(connection)
        for campaign_id in campaign_ids:
            try:
                email_campaigns_api_instance.delete_email_campaign(campaign_id)
                logger.info(f'Email Campaign {campaign_id} deleted')
            except ApiException as e:
                logger.error(
                    f"Exception when calling EmailCampaignsApi->delete_email_campaign: {e}\n"
                )
                raise RuntimeError(
                    f"Failed to execute the delete command for Email Campaign {campaign_id}"
                ) from e

    def update(self, query: 'ast.Update') -> None:
        """
        Updates data in Sendinblue "PUT /emailCampaigns/{campaignId}" API endpoint.

        Parameters
        ----------
        query : ast.Update
           Given SQL UPDATE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        RuntimeError
            If an error occurs when calling Sendinblue's API
        """
        # Parse the UPDATE statement to extract the new values and the where conditions.
        update_statement_parser = UPDATEQueryParser(query)
        values_to_update, where_conditions = update_statement_parser.parse_query()

        # Retrieve the current campaigns and normalize them into a DataFrame.
        email_campaigns_df = pd.json_normalize(self.get_email_campaigns())
        if email_campaigns_df.empty:
            # Nothing exists, so nothing can match; an empty frame has no 'id'
            # column and indexing it below would raise KeyError.
            return

        # Apply the where conditions; the resulting rows are the campaigns to update.
        update_query_executor = UPDATEQueryExecutor(
            email_campaigns_df, where_conditions
        )
        email_campaigns_df = update_query_executor.execute_query()

        # Collect the IDs of the campaigns that are about to be updated.
        campaign_ids = email_campaigns_df['id'].tolist()

        self.update_email_campaigns(campaign_ids, values_to_update)

    def update_email_campaigns(
        self, campaign_ids: List[int], values_to_update: Dict
    ) -> None:
        """Applies the same set of values to each of the given campaigns.

        Parameters
        ----------
        campaign_ids : List[int]
            Values of the ``id`` column of the campaigns to update.
        values_to_update : Dict
            Parsed values of the UPDATE statement, passed as the request body.

        Raises
        ------
        RuntimeError
            If the API call fails for a campaign; campaigns later in the list
            are not processed.
        """
        connection = self.handler.connect()
        email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(connection)

        for campaign_id in campaign_ids:
            try:
                email_campaigns_api_instance.update_email_campaign(
                    campaign_id, values_to_update
                )
                logger.info(f'Email Campaign {campaign_id} updated')
            except ApiException as e:
                logger.error(
                    f"Exception when calling EmailCampaignsApi->update_email_campaign: {e}\n"
                )
                raise RuntimeError(
                    f"Failed to execute the update command for Email Campaign {campaign_id}"
                ) from e

    def insert(self, query: 'ast.Insert') -> None:
        """
        Inserts new email campaigns into Sendinblue.

        Parameters
        ----------
        query : ast.Insert
            The SQL INSERT query to be parsed and executed.

        Raises
        ------
        ValueError
            If the necessary sender information is incomplete or incorrectly formatted.
        Exception
            For any unexpected errors during the email campaign creation.
        """
        # Columns that are supported and mandatory for an INSERT operation.
        supported_columns = [
            'name', 'subject', 'sender_name', 'sender_email',
            'html_content', 'scheduled_at', 'recipients_lists', 'tag'
        ]
        mandatory_columns = ['name', 'subject', 'sender_name', 'sender_email', 'html_content']

        # Parse the INSERT query to extract one data dict per row.
        insert_statement_parser = INSERTQueryParser(
            query, supported_columns=supported_columns,
            mandatory_columns=mandatory_columns, all_mandatory=True
        )
        email_campaigns_data = insert_statement_parser.parse_query()

        for email_campaign_data in email_campaigns_data:
            # Fold the flat sender_* columns into the nested 'sender' entry.
            sender_info = {}
            if 'sender_name' in email_campaign_data:
                sender_info['name'] = email_campaign_data.pop('sender_name')
            if 'sender_email' in email_campaign_data and email_campaign_data['sender_email'] is not None:
                sender_info['email'] = email_campaign_data.pop('sender_email')
            # NOTE(review): 'sender_id' is not listed in supported_columns above,
            # so this branch may be unreachable if the parser rejects
            # unsupported columns -- confirm.
            if 'sender_id' in email_campaign_data and email_campaign_data['sender_id'] is not None:
                sender_info['id'] = email_campaign_data.pop('sender_id')

            # A sender needs a name plus at least one of email or id.
            if not sender_info.get('name') or (not sender_info.get('email') and not sender_info.get('id')):
                raise ValueError("Sender information is incomplete or incorrectly formatted.")

            email_campaign_data['sender'] = sender_info

            # Rows are created one at a time; a failure stops the remaining rows.
            self.create_email_campaign(email_campaign_data)

    def create_email_campaign(self, email_campaign_data: Dict[str, Any]) -> None:
        """
        Creates a new email campaign in Sendinblue.

        Parameters
        ----------
        email_campaign_data : Dict[str, Any]
            The data for the email campaign to be created.

        Raises
        ------
        Exception
            For any errors during the email campaign creation process.
        """
        api_session = self.handler.connect()
        email_campaigns_api_instance = sib_api_v3_sdk.EmailCampaignsApi(api_session)

        logger.info(f"Email campaign data before creating the object: {email_campaign_data}")

        try:
            # Build the request object and send it to Sendinblue.
            email_campaign = sib_api_v3_sdk.CreateEmailCampaign(**email_campaign_data)
            logger.info(f"Email campaign object after creation: {email_campaign}")

            created_campaign = email_campaigns_api_instance.create_email_campaign(email_campaign)

            # Convert the response once and report whether an id was returned.
            created_campaign_dict = created_campaign.to_dict()
            if 'id' not in created_campaign_dict:
                logger.error('Email campaign creation failed')
            else:
                logger.info(f'Email Campaign {created_campaign_dict["id"]} created')
        except ApiException as e:
            # Log the API error, including the response body when available.
            logger.error(f"Exception when calling EmailCampaignsApi->create_email_campaign: {e}")
            if hasattr(e, 'body'):
                logger.error(f"Sendinblue API response body: {e.body}")
            raise Exception(f'Failed to create Email Campaign with data: {email_campaign_data}') from e
        except Exception as e:
            # Any other failure (e.g. while building the request object).
            logger.error(f"Unexpected error occurred: {e}")
            raise Exception(f'Unexpected error during Email Campaign creation: {e}') from e

273 except Exception as e: 

274 # this handles any other unexpected exceptions. 

275 logger.error(f"Unexpected error occurred: {e}") 

276 raise Exception(f'Unexpected error during Email Campaign creation: {e}') from e