Coverage for mindsdb / integrations / handlers / luma_handler / luma_tables.py: 0%
62 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2from mindsdb.integrations.libs.api_handler import APITable
3from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor, INSERTQueryParser
4from mindsdb_sql_parser import ast
7class LumaEventsTable(APITable):
8 """The Luma Get Event Table implementation"""
10 def select(self, query: ast.Select) -> pd.DataFrame:
11 """Pulls data from the https://docs.lu.ma/reference/get-event-1 and https://docs.lu.ma/reference/calendar-list-events" API
13 Parameters
14 ----------
15 query : ast.Select
16 Given SQL SELECT query
18 Returns
19 -------
20 pd.DataFrame
21 Luma events
23 Raises
24 ------
25 ValueError
26 If the query contains an unsupported condition
27 """
29 select_statement_parser = SELECTQueryParser(
30 query,
31 'events',
32 self.get_columns()
33 )
35 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()
37 search_params = {}
38 subset_where_conditions = []
39 filter_flag = False
40 for op, arg1, arg2 in where_conditions:
41 if arg1 == 'event_id':
42 if op == '=':
43 search_params["event_id"] = arg2
44 filter_flag = True
45 else:
46 raise NotImplementedError("Only '=' operator is supported for event_id column.")
47 elif arg1 in self.get_columns():
48 subset_where_conditions.append([op, arg1, arg2])
50 df = pd.DataFrame(columns=self.get_columns())
52 if filter_flag:
53 response = self.handler.luma_client.get_event(search_params["event_id"])
54 event = response["content"]["event"]
55 df = pd.json_normalize(event)
56 else:
57 response = self.handler.luma_client.list_events()
58 content = response["content"]["entries"]
59 events_only = [event["event"] for event in content]
60 df = pd.json_normalize(events_only)
62 select_statement_executor = SELECTQueryExecutor(
63 df,
64 selected_columns,
65 subset_where_conditions,
66 order_by_conditions,
67 result_limit
68 )
70 df = select_statement_executor.execute_query()
72 return df
74 def get_columns(self) -> list:
75 return ["api_id",
76 "cover_url",
77 "name",
78 "description",
79 "series_api_id",
80 "start_at",
81 "duration_interval",
82 "end_at",
83 "geo_latitude",
84 "geo_longitude",
85 "url",
86 "timezone",
87 "event_type",
88 "user_api_id",
89 "visibility",
90 "geo_address_json.city",
91 "geo_address_json.type",
92 "geo_address_json.region",
93 "geo_address_json.address",
94 "geo_address_json.country",
95 "geo_address_json.latitude",
96 "geo_address_json.place_id",
97 "geo_address_json.longitude",
98 "geo_address_json.city_state",
99 "geo_address_json.description",
100 "geo_address_json.full_address"]
102 def _parse_event_insert_data(self, event):
103 data = {}
104 data["name"] = event["name"]
105 data["start_at"] = event["start_at"]
106 data["timezone"] = event["timezone"]
108 if "end_at" in event:
109 data["end_at"] = event["end_at"]
111 if "require_rsvp_approval" in event:
112 data["require_rsvp_approval"] = event["require_rsvp_approval"]
114 if "geo_latitude" in event:
115 data["geo_latitude"] = event["geo_latitude"]
117 if "geo_longitude" in event:
118 data["geo_longitude"] = event["geo_longitude"]
120 if "meeting_url" in event:
121 data["meeting_url"] = event["meeting_url"]
123 data["geo_address_json"] = {}
125 if "geo_address_json_type" in event:
126 data["geo_address_json"]["type"] = event["geo_address_json_type"]
128 if "geo_address_json_place_id" in event:
129 data["geo_address_json"]["place_id"] = event["geo_address_json_place_id"]
131 if "geo_address_json_description" in event:
132 data["geo_address_json"]["description"] = event["geo_address_json_description"]
134 return data
136 def insert(self, query: ast.ASTNode) -> None:
137 """Inserts data into the API endpoint.
139 Parameters
140 ----------
141 query : ast.Insert
142 Given SQL INSERT query
144 Returns
145 -------
146 None
148 Raises
149 ------
150 ValueError
151 If the query contains an unsupported condition
152 """
153 insert_statement_parser = INSERTQueryParser(
154 query,
155 supported_columns=["name", "start_at", "timezone", "end_at", "require_rsvp_approval", "geo_address_json_type", "geo_address_json_place_id", "geo_address_json_description", "geo_latitude", "geo_longitude", "meeting_url"],
156 mandatory_columns=["name", "start_at", "timezone"],
157 all_mandatory=False
158 )
160 event_data = insert_statement_parser.parse_query()
162 for event in event_data:
163 parsed_event_data = self._parse_event_insert_data(event)
164 self.handler.luma_client.create_event(parsed_event_data)