Coverage for mindsdb / integrations / handlers / luma_handler / luma_tables.py: 0%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from mindsdb.integrations.libs.api_handler import APITable 

3from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor, INSERTQueryParser 

4from mindsdb_sql_parser import ast 

5 

6 

class LumaEventsTable(APITable):
    """Luma events table: SELECT reads events, INSERT creates them."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pull events from the Luma API and apply the SQL query to them.

        A ``WHERE event_id = <id>`` condition fetches that single event
        (https://docs.lu.ma/reference/get-event-1); otherwise all events are
        listed (https://docs.lu.ma/reference/calendar-list-events).
        Conditions on the columns returned by ``get_columns()`` are handed to
        ``SELECTQueryExecutor``; conditions on any other column are ignored.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Luma events

        Raises
        ------
        NotImplementedError
            If ``event_id`` is compared with an operator other than ``=``
        """
        columns = self.get_columns()
        parser = SELECTQueryParser(query, 'events', columns)
        selected_columns, where_conditions, order_by_conditions, result_limit = parser.parse_query()

        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            if arg1 == 'event_id':
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for event_id column.")
                search_params["event_id"] = arg2
            elif arg1 in columns:
                subset_where_conditions.append([op, arg1, arg2])

        luma_client = self.handler.luma_client
        if "event_id" in search_params:
            response = luma_client.get_event(search_params["event_id"])
            df = pd.json_normalize(response["content"]["event"])
        else:
            response = luma_client.list_events()
            events_only = [entry["event"] for entry in response["content"]["entries"]]
            df = pd.json_normalize(events_only)

        # json_normalize only creates columns for keys present in the payload,
        # so an empty listing (or events without nested geo_address_json
        # fields) would otherwise lack declared columns that the query may
        # select or filter on. Add them as empty columns.
        missing_columns = [name for name in columns if name not in df.columns]
        if missing_columns:
            df = df.reindex(columns=[*df.columns, *missing_columns])

        executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return executor.execute_query()

    def get_columns(self) -> list:
        """Return the column names exposed by the events table.

        Dotted names (``geo_address_json.*``) match the flattened column
        names that ``pd.json_normalize`` produces for nested objects.
        """
        return [
            "api_id",
            "cover_url",
            "name",
            "description",
            "series_api_id",
            "start_at",
            "duration_interval",
            "end_at",
            "geo_latitude",
            "geo_longitude",
            "url",
            "timezone",
            "event_type",
            "user_api_id",
            "visibility",
            "geo_address_json.city",
            "geo_address_json.type",
            "geo_address_json.region",
            "geo_address_json.address",
            "geo_address_json.country",
            "geo_address_json.latitude",
            "geo_address_json.place_id",
            "geo_address_json.longitude",
            "geo_address_json.city_state",
            "geo_address_json.description",
            "geo_address_json.full_address",
        ]

    def _parse_event_insert_data(self, event):
        """Build the ``create_event`` payload from one parsed INSERT row.

        ``name``, ``start_at`` and ``timezone`` are required (a missing one
        raises ``KeyError``); the remaining fields are copied only when
        present. The flat ``geo_address_json_*`` columns are folded into a
        nested ``geo_address_json`` dict, which is always included even when
        it ends up empty.
        """
        data = {key: event[key] for key in ("name", "start_at", "timezone")}

        optional_keys = (
            "end_at",
            "require_rsvp_approval",
            "geo_latitude",
            "geo_longitude",
            "meeting_url",
        )
        for key in optional_keys:
            if key in event:
                data[key] = event[key]

        geo_address = {}
        for field in ("type", "place_id", "description"):
            column = "geo_address_json_" + field
            if column in event:
                geo_address[field] = event[column]
        data["geo_address_json"] = geo_address

        return data

    def insert(self, query: ast.ASTNode) -> None:
        """Create one Luma event per row of the INSERT query.

        Column validation (supported and mandatory columns) is delegated to
        ``INSERTQueryParser``; the exceptions it raises are not visible from
        this module.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None
        """
        insert_statement_parser = INSERTQueryParser(
            query,
            supported_columns=[
                "name",
                "start_at",
                "timezone",
                "end_at",
                "require_rsvp_approval",
                "geo_address_json_type",
                "geo_address_json_place_id",
                "geo_address_json_description",
                "geo_latitude",
                "geo_longitude",
                "meeting_url",
            ],
            mandatory_columns=["name", "start_at", "timezone"],
            all_mandatory=False
        )

        for event in insert_statement_parser.parse_query():
            self.handler.luma_client.create_event(self._parse_event_insert_data(event))

164 self.handler.luma_client.create_event(parsed_event_data)