Coverage for mindsdb / integrations / handlers / sendinblue_handler / sendinblue_handler.py: 0%
41 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import sib_api_v3_sdk
3from mindsdb.integrations.handlers.sendinblue_handler.sendinblue_tables import EmailCampaignsTable
4from mindsdb.integrations.libs.api_handler import APIHandler
5from mindsdb.integrations.libs.response import (
6 HandlerStatusResponse as StatusResponse,
7)
9from mindsdb.utilities import log
10from mindsdb_sql_parser import parse_sql
12logger = log.getLogger(__name__)
15class SendinblueHandler(APIHandler):
16 """
17 The Sendinblue handler implementation.
18 """
20 name = 'sendinblue'
22 def __init__(self, name: str, **kwargs):
23 """
24 Initialize the handler.
25 Args:
26 name (str): name of particular handler instance
27 **kwargs: arbitrary keyword arguments.
28 """
29 super().__init__(name)
31 connection_data = kwargs.get("connection_data", {})
32 self.connection_data = connection_data
33 self.kwargs = kwargs
35 self.connection = None
36 self.is_connected = False
38 email_campaigns_data = EmailCampaignsTable(self)
39 self._register_table("email_campaigns", email_campaigns_data)
41 def connect(self):
42 """
43 Set up the connection required by the handler.
44 Returns
45 -------
46 StatusResponse
47 connection object
48 """
49 if self.is_connected is True:
50 return self.connection
52 configuration = sib_api_v3_sdk.Configuration()
53 configuration.api_key['api-key'] = self.connection_data['api_key']
55 self.connection = sib_api_v3_sdk.ApiClient(configuration)
57 self.is_connected = True
59 return self.connection
61 def check_connection(self) -> StatusResponse:
62 """
63 Check connection to the handler.
64 Returns:
65 HandlerStatusResponse
66 """
68 response = StatusResponse(False)
70 try:
71 connection = self.connect()
72 api_instance = sib_api_v3_sdk.AccountApi(connection)
73 api_instance.get_account()
74 response.success = True
75 except Exception as e:
76 logger.error('Error connecting to Sendinblue!')
77 response.error_message = str(e)
79 self.is_connected = response.success
81 return response
83 def native_query(self, query: str) -> StatusResponse:
84 """Receive and process a raw query.
85 Parameters
86 ----------
87 query : str
88 query in a native format
89 Returns
90 -------
91 StatusResponse
92 Request status
93 """
94 ast = parse_sql(query)
95 return self.query(ast)