Coverage for mindsdb / integrations / handlers / eventbrite_handler / eventbrite_handler.py: 0%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2 

3from eventbrite.client import Client 

4from mindsdb.utilities import log 

5from mindsdb.utilities.config import Config 

6from mindsdb.integrations.libs.api_handler import APIHandler 

7from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

8 

9from .eventbrite_tables import EventbriteUserTable 

10from .eventbrite_tables import EventbriteOrganizationTable 

11from .eventbrite_tables import EventbriteCategoryTable 

12from .eventbrite_tables import EventbriteSubcategoryTable 

13from .eventbrite_tables import EventbriteFormatTable 

14from .eventbrite_tables import EventbriteEventDetailsTable 

15from .eventbrite_tables import EventbriteEventsTable 

16 

17logger = log.getLogger(__name__) 

18 

19 

class EventbriteHandler(APIHandler):
    """Handle connections to and interactions with the Eventbrite API.

    Attributes:
        connection_args (dict): Connection settings resolved in ``__init__``;
            currently only ``access_token``.
        api (Client | None): The Eventbrite ``Client``. ``None`` until
            ``connect`` builds it.
        is_connected (bool): ``True`` once ``connect`` has built a client;
            reset to ``False`` by a failed ``check_connection``.
    """

    def __init__(self, name=None, **kwargs):
        """Resolve connection settings and register the Eventbrite tables.

        Each connection argument is looked up, in priority order, in
        ``kwargs["connection_data"]``, then in the ``EVENTBRITE_<ARG>``
        environment variable, then in the ``eventbrite_handler`` section of
        the config. An argument found in none of them is left out of
        ``connection_args``.

        Args:
            name: Handler instance name, forwarded to ``APIHandler``.
            **kwargs: Only ``connection_data`` (a dict of connection
                arguments) is read here.
        """
        super().__init__(name)

        # ``or {}`` also covers an explicit ``connection_data=None``, which
        # would otherwise make the membership test below raise TypeError.
        args = kwargs.get("connection_data") or {}
        self.connection_args = {}
        handler_config = Config().get("eventbrite_handler", {})

        # Set up connection arguments: explicit args > environment > config.
        for k in ["access_token"]:
            env_key = f"EVENTBRITE_{k.upper()}"
            if k in args:
                self.connection_args[k] = args[k]
            elif env_key in os.environ:
                self.connection_args[k] = os.environ[env_key]
            elif k in handler_config:
                self.connection_args[k] = handler_config[k]

        self.api = None
        self.is_connected = False

        # Table name -> table class, registered in this order.
        tables = (
            ("user", EventbriteUserTable),
            ("organization", EventbriteOrganizationTable),
            ("category", EventbriteCategoryTable),
            ("subcategory", EventbriteSubcategoryTable),
            ("formats", EventbriteFormatTable),
            ("event_details", EventbriteEventDetailsTable),
            ("events", EventbriteEventsTable),
        )
        for table_name, table_cls in tables:
            self._register_table(table_name, table_cls(self))

    def connect(self):
        """Return the Eventbrite API client, creating it on first use.

        Returns:
            Client: The existing client when ``is_connected`` is set,
            otherwise a newly constructed one.

        Raises:
            KeyError: If no ``access_token`` was resolved in ``__init__``.
        """
        if self.is_connected:
            return self.api

        self.api = Client(access_token=self.connection_args["access_token"])

        self.is_connected = True
        return self.api

    def check_connection(self) -> StatusResponse:
        """Check the connection to the Eventbrite API.

        Builds the client if needed and requests the current user. Any
        failure is caught, logged, and reported through the response rather
        than raised.

        Returns:
            StatusResponse: ``success`` is ``True`` when the current user was
            fetched; otherwise ``error_message`` describes the failure.
        """
        response = StatusResponse(False)

        try:
            # Fail with a readable message instead of the bare
            # "'access_token'" text a KeyError from connect() would produce.
            if not self.connection_args.get("access_token"):
                raise ValueError(
                    "no access_token was provided (set it in connection_data, "
                    "the EVENTBRITE_ACCESS_TOKEN environment variable, or the "
                    "eventbrite_handler config section)"
                )

            api = self.connect()
            me = api.get_current_user()
            logger.info(f"Connected to Eventbrite as {me['name']}")
            response.success = True

        except Exception as e:
            response.error_message = f"Error connecting to Eventbrite API: {e}"
            logger.error(response.error_message)

        # A failed check invalidates the client so the next connect() rebuilds it.
        if not response.success:
            self.is_connected = False

        return response

92 self.is_connected = False 

93 

94 return response