Coverage for mindsdb / integrations / handlers / google_calendar_handler / google_calendar_tables.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2import datetime 

3from mindsdb_sql_parser import ast 

4from pandas import DataFrame 

5 

6from mindsdb.integrations.libs.api_handler import APITable 

7from mindsdb.integrations.utilities.date_utils import utc_date_str_to_timestamp_ms, parse_utc_date 

8from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

9 

10 

11class GoogleCalendarEventsTable(APITable): 

12 def select(self, query: ast.Select) -> DataFrame: 

13 """ 

14 Gets all events from the calendar. 

15 

16 Args: 

17 query (ast.Select): SQL query to parse. 

18 

19 Returns: 

20 Response: Response object containing the results. 

21 """ 

22 

23 # Parse the query to get the conditions. 

24 conditions = extract_comparison_conditions(query.where) 

25 # Get the start and end times from the conditions. 

26 params = {} 

27 for op, arg1, arg2 in conditions: 

28 if arg1 == "timeMax" or arg1 == "timeMin": 

29 date = parse_utc_date(arg2) 

30 if op == "=": 

31 params[arg1] = date 

32 else: 

33 raise NotImplementedError 

34 elif arg1 == "timeZone": 

35 params[arg1] = arg2 

36 elif arg1 == "maxAttendees": 

37 params[arg1] = arg2 

38 elif arg1 == "q": 

39 params[arg1] = arg2 

40 

41 # Get the order by from the query. 

42 if query.order_by is not None: 

43 if query.order_by[0].value == "start_time": 

44 params["orderBy"] = "startTime" 

45 elif query.order_by[0].value == "updated": 

46 params["orderBy"] = "updated" 

47 else: 

48 raise NotImplementedError 

49 

50 if query.limit is not None: 

51 params["maxResults"] = query.limit.value 

52 

53 # Get the events from the Google Calendar API. 

54 events = self.handler.call_application_api(method_name="get_events", params=params) 

55 

56 selected_columns = [] 

57 for target in query.targets: 

58 if isinstance(target, ast.Star): 

59 selected_columns = self.get_columns() 

60 break 

61 elif isinstance(target, ast.Identifier): 

62 selected_columns.append(target.parts[-1]) 

63 else: 

64 raise ValueError(f"Unknown query target {type(target)}") 

65 

66 if len(events) == 0: 

67 events = pd.DataFrame([], columns=selected_columns) 

68 else: 

69 events.columns = self.get_columns() 

70 for col in set(events.columns).difference(set(selected_columns)): 

71 events = events.drop(col, axis=1) 

72 return events 

73 

74 def insert(self, query: ast.Insert): 

75 """ 

76 Inserts an event into the calendar. 

77 

78 Args: 

79 query (ast.Insert): SQL query to parse. 

80 

81 Returns: 

82 Response: Response object containing the results. 

83 """ 

84 

85 # Get the values from the query. 

86 values = query.values[0] 

87 # Get the event data from the values. 

88 event_data = {} 

89 timestamp_columns = {"start_time", "end_time", "created", "updated"} 

90 regular_columns = { 

91 "summary", 

92 "description", 

93 "location", 

94 "status", 

95 "html_link", 

96 "creator", 

97 "organizer", 

98 "reminders", 

99 "timeZone", 

100 "calendar_id", 

101 "attendees", 

102 } 

103 

104 # TODO: check why query.columns is None 

105 for col, val in zip(query.columns, values): 

106 if col.name in timestamp_columns: 

107 event_data[col.name] = utc_date_str_to_timestamp_ms(val) 

108 elif col.name in regular_columns: 

109 event_data[col.name] = val 

110 else: 

111 raise NotImplementedError 

112 

113 st = datetime.datetime.fromtimestamp(event_data["start_time"] / 1000, datetime.timezone.utc).isoformat() + "Z" 

114 et = datetime.datetime.fromtimestamp(event_data["end_time"] / 1000, datetime.timezone.utc).isoformat() + "Z" 

115 

116 event_data["start"] = {"dateTime": st, "timeZone": event_data["timeZone"]} 

117 

118 event_data["end"] = {"dateTime": et, "timeZone": event_data["timeZone"]} 

119 

120 event_data["attendees"] = event_data["attendees"].split(",") 

121 event_data["attendees"] = [{"email": attendee} for attendee in event_data["attendees"]] 

122 

123 # Insert the event into the Google Calendar API. 

124 self.handler.call_application_api(method_name="create_event", params=event_data) 

125 

126 def update(self, query: ast.Update): 

127 """ 

128 Updates an event or events in the calendar. 

129 

130 Args: 

131 query (ast.Update): SQL query to parse. 

132 

133 Returns: 

134 Response: Response object containing the results. 

135 """ 

136 

137 # Get the values from the query. 

138 values = query.values[0] 

139 # Get the event data from the values. 

140 event_data = {} 

141 for col, val in zip(query.update_columns, values): 

142 if col == "start_time" or col == "end_time" or col == "created" or col == "updated": 

143 event_data[col] = utc_date_str_to_timestamp_ms(val) 

144 elif ( 

145 col == "summary" 

146 or col == "description" 

147 or col == "location" 

148 or col == "status" 

149 or col == "html_link" 

150 or col == "creator" 

151 or col == "organizer" 

152 or col == "reminders" 

153 or col == "timeZone" 

154 or col == "calendar_id" 

155 or col == "attendees" 

156 ): 

157 event_data[col] = val 

158 else: 

159 raise NotImplementedError 

160 

161 event_data["start"] = {"dateTime": event_data["start_time"], "timeZone": event_data["timeZone"]} 

162 

163 event_data["end"] = {"dateTime": event_data["end_time"], "timeZone": event_data["timeZone"]} 

164 

165 event_data["attendees"] = event_data.get("attendees").split(",") 

166 event_data["attendees"] = [{"email": attendee} for attendee in event_data["attendees"]] 

167 

168 conditions = extract_comparison_conditions(query.where) 

169 for op, arg1, arg2 in conditions: 

170 if arg1 == "event_id": 

171 if op == "=": 

172 event_data["event_id"] = arg2 

173 elif op == ">": 

174 event_data["start_id"] = arg2 

175 elif op == "<": 

176 event_data["end_id"] = arg2 

177 else: 

178 raise NotImplementedError 

179 else: 

180 raise NotImplementedError 

181 

182 # Update the event in the Google Calendar API. 

183 self.handler.call_application_api(method_name="update_event", params=event_data) 

184 

185 def delete(self, query: ast.Delete): 

186 """ 

187 Deletes an event or events in the calendar. 

188 

189 Args: 

190 query (ast.Delete): SQL query to parse. 

191 

192 Returns: 

193 Response: Response object containing the results. 

194 """ 

195 

196 # Parse the query to get the conditions. 

197 conditions = extract_comparison_conditions(query.where) 

198 # Get the start and end times from the conditions. 

199 params = {} 

200 for op, arg1, arg2 in conditions: 

201 if arg1 == "event_id": 

202 if op == "=": 

203 params[arg1] = arg2 

204 elif op == ">": 

205 params["start_id"] = arg2 

206 elif op == "<": 

207 params["end_id"] = arg2 

208 else: 

209 raise NotImplementedError 

210 

211 # Delete the events in the Google Calendar API. 

212 self.handler.call_application_api(method_name="delete_event", params=params) 

213 

214 def get_columns(self) -> list: 

215 """Gets all columns to be returned in pandas DataFrame responses""" 

216 return [ 

217 "etag", 

218 "id", 

219 "status", 

220 "htmlLink", 

221 "created", 

222 "updated", 

223 "summary", 

224 "creator", 

225 "organizer", 

226 "start", 

227 "end", 

228 "timeZone", 

229 "iCalUID", 

230 "sequence", 

231 "reminders", 

232 "eventType", 

233 ]