Coverage for mindsdb / integrations / handlers / sendinblue_handler / sendinblue_handler.py: 0%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import sib_api_v3_sdk 

2 

3from mindsdb.integrations.handlers.sendinblue_handler.sendinblue_tables import EmailCampaignsTable 

4from mindsdb.integrations.libs.api_handler import APIHandler 

5from mindsdb.integrations.libs.response import ( 

6 HandlerStatusResponse as StatusResponse, 

7) 

8 

9from mindsdb.utilities import log 

10from mindsdb_sql_parser import parse_sql 

11 

12logger = log.getLogger(__name__) 

13 

14 

class SendinblueHandler(APIHandler):
    """
    The Sendinblue handler implementation.

    Builds a ``sib_api_v3_sdk.ApiClient`` from the ``api_key`` entry of the
    connection data and registers the ``email_campaigns`` table.
    """

    name = 'sendinblue'

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.

        Args:
            name (str): name of particular handler instance
            **kwargs: arbitrary keyword arguments. The ``connection_data``
                entry, when present, is kept as the handler's connection data.
        """
        super().__init__(name)

        # Treat an explicit ``connection_data=None`` like a missing entry so
        # that connect() fails with a KeyError naming the missing key rather
        # than an opaque "NoneType is not subscriptable" TypeError.
        connection_data = kwargs.get("connection_data")
        if connection_data is None:
            connection_data = {}
        self.connection_data = connection_data
        self.kwargs = kwargs

        # The client is created lazily by connect().
        self.connection = None
        self.is_connected = False

        email_campaigns_data = EmailCampaignsTable(self)
        self._register_table("email_campaigns", email_campaigns_data)

    def connect(self):
        """
        Set up the connection required by the handler.

        Returns
        -------
        sib_api_v3_sdk.ApiClient
            The API client. The previously created client is returned as-is
            while the handler is flagged as connected.

        Raises
        ------
        KeyError
            If ``api_key`` is missing from the connection data.
        """
        if self.is_connected is True:
            return self.connection

        configuration = sib_api_v3_sdk.Configuration()
        configuration.api_key['api-key'] = self.connection_data['api_key']

        self.connection = sib_api_v3_sdk.ApiClient(configuration)
        self.is_connected = True

        return self.connection

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Connects and calls ``get_account()`` through ``AccountApi``; any
        exception raised along the way is reported in the response instead of
        being propagated.

        Returns:
            HandlerStatusResponse: ``success`` is True when the account call
            completed; otherwise ``error_message`` holds the exception text.
        """
        response = StatusResponse(False)

        try:
            connection = self.connect()
            api_instance = sib_api_v3_sdk.AccountApi(connection)
            api_instance.get_account()
            response.success = True
        except Exception as e:
            # Include the cause in the log line; a fixed message alone gives
            # no hint about why the connection attempt failed.
            logger.error(f'Error connecting to Sendinblue: {e}!')
            response.error_message = str(e)

        # A failed check clears the flag, so the next connect() builds a new
        # client instead of returning the cached one.
        self.is_connected = response.success

        return response

    def native_query(self, query: str) -> StatusResponse:
        """Receive and process a raw query.

        The query text is parsed with ``parse_sql`` and the resulting AST is
        handed to ``self.query``.

        Parameters
        ----------
        query : str
            query in a native format

        Returns
        -------
        The value returned by ``self.query`` for the parsed AST.
        NOTE(review): ``query`` is not defined in this class (presumably
        inherited from ``APIHandler``); confirm that the ``StatusResponse``
        annotation matches what it actually returns.
        """
        ast = parse_sql(query)
        return self.query(ast)