Coverage for mindsdb / integrations / handlers / eventbrite_handler / eventbrite_handler.py: 0%
62 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
3from eventbrite.client import Client
4from mindsdb.utilities import log
5from mindsdb.utilities.config import Config
6from mindsdb.integrations.libs.api_handler import APIHandler
7from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse
9from .eventbrite_tables import EventbriteUserTable
10from .eventbrite_tables import EventbriteOrganizationTable
11from .eventbrite_tables import EventbriteCategoryTable
12from .eventbrite_tables import EventbriteSubcategoryTable
13from .eventbrite_tables import EventbriteFormatTable
14from .eventbrite_tables import EventbriteEventDetailsTable
15from .eventbrite_tables import EventbriteEventsTable
17logger = log.getLogger(__name__)
20class EventbriteHandler(APIHandler):
21 """A class for handling connections and interactions with the Eventbrite API.
23 Attributes:
24 api (Client): The `Client` object for accessing Eventbrite API.
25 """
27 def __init__(self, name=None, **kwargs):
28 super().__init__(name)
30 args = kwargs.get("connection_data", {})
31 self.connection_args = {}
32 handler_config = Config().get("eventbrite_handler", {})
34 # Set up connection arguments
35 for k in ["access_token"]:
36 if k in args:
37 self.connection_args[k] = args[k]
38 elif f"EVENTBRITE_{k.upper()}" in os.environ:
39 self.connection_args[k] = os.environ[f"EVENTBRITE_{k.upper()}"]
40 elif k in handler_config:
41 self.connection_args[k] = handler_config[k]
43 self.api = None
44 self.is_connected = False
46 userTable = EventbriteUserTable(self)
47 self._register_table("user", userTable)
49 organizationTable = EventbriteOrganizationTable(self)
50 self._register_table("organization", organizationTable)
52 categoryTable = EventbriteCategoryTable(self)
53 self._register_table("category", categoryTable)
55 subcategoryTable = EventbriteSubcategoryTable(self)
56 self._register_table("subcategory", subcategoryTable)
58 formatTable = EventbriteFormatTable(self)
59 self._register_table("formats", formatTable)
61 eventDetailsTable = EventbriteEventDetailsTable(self)
62 self._register_table("event_details", eventDetailsTable)
64 eventsTable = EventbriteEventsTable(self)
65 self._register_table("events", eventsTable)
67 def connect(self):
68 """Initialize the Eventbrite API Client."""
69 if self.is_connected:
70 return self.api
72 self.api = Client(access_token=self.connection_args["access_token"])
74 self.is_connected = True
75 return self.api
77 def check_connection(self) -> StatusResponse:
78 """Check the connection to the Eventbrite API."""
79 response = StatusResponse(False)
81 try:
82 api = self.connect()
83 me = api.get_current_user()
84 logger.info(f"Connected to Eventbrite as {me['name']}")
85 response.success = True
87 except Exception as e:
88 response.error_message = f"Error connecting to Eventbrite API: {e}"
89 logger.error(response.error_message)
91 if not response.success and self.is_connected:
92 self.is_connected = False
94 return response