Coverage for mindsdb / integrations / handlers / gong_handler / gong_tables.py: 0%
284 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import List, Dict, Any, Callable, Union
2from datetime import datetime, timedelta, timezone
3import pandas as pd
5from mindsdb.integrations.libs.api_handler import APIResource
6from mindsdb.integrations.utilities.sql_utils import FilterCondition, FilterOperator, SortColumn
7from mindsdb.utilities import log
10logger = log.getLogger(__name__)
def normalize_datetime_to_iso8601(dt_value: Union[str, datetime, None]) -> str:
    """
    Normalize various datetime formats to ISO 8601 with UTC timezone (Z suffix).

    Naive values (no timezone information) are always interpreted as UTC,
    never as the machine's local time. Timezone-aware values are converted
    to UTC.

    Args:
        dt_value: Datetime value as string, datetime object, or None.
            None yields the current UTC time.

    Returns:
        ISO 8601 formatted string with Z suffix (e.g., "2024-01-15T10:30:00Z")

    Raises:
        ValueError: If the datetime string cannot be parsed, or if the value
            is of an unsupported type
    """
    output_format = "%Y-%m-%dT%H:%M:%SZ"

    def to_utc_string(dt_obj: datetime) -> str:
        # A naive datetime passed to astimezone() is treated as *local* time,
        # so naive values must be tagged as UTC explicitly instead.
        if dt_obj.tzinfo is None:
            dt_obj = dt_obj.replace(tzinfo=timezone.utc)
        else:
            dt_obj = dt_obj.astimezone(timezone.utc)
        return dt_obj.strftime(output_format)

    if dt_value is None:
        return datetime.now(timezone.utc).strftime(output_format)

    if isinstance(dt_value, datetime):
        return to_utc_string(dt_value)

    if isinstance(dt_value, str):
        dt_str = dt_value.strip()

        # Already in the target shape (contains "T", ends with "Z"): returned
        # unchanged, without further validation.
        if dt_str.endswith("Z") and "T" in dt_str:
            return dt_str

        # Timezone-less formats; all of them are interpreted as UTC.
        formats_to_try = (
            "%Y-%m-%dT%H:%M:%S",  # ISO without timezone
            "%Y-%m-%d %H:%M:%S",  # Common format
            "%Y-%m-%d",  # Date only (assume start of day UTC)
            "%Y/%m/%d",  # Alternative date format
            "%d-%m-%Y",  # European date format
            "%m/%d/%Y",  # US date format
        )
        for fmt in formats_to_try:
            try:
                parsed = datetime.strptime(dt_str, fmt)
            except ValueError:
                continue
            return to_utc_string(parsed)

        # Everything else (UTC offsets such as +05:30 / -05:00, fractional
        # seconds, ...) goes through the ISO parser. "Z" is rewritten because
        # older Python versions do not accept it in fromisoformat().
        try:
            parsed = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        except ValueError as err:
            raise ValueError(f"Unable to parse datetime string: {dt_str}") from err
        return to_utc_string(parsed)

    raise ValueError(f"Unsupported datetime type: {type(dt_value)}")
def paginate_api_call(
    api_call: Callable,
    result_key: str,
    limit: int = None,
    params: Dict[str, Any] = None,
    json_body: Dict[str, Any] = None,
    cursor_path: str = "records.cursor",
    cursor_param: str = "cursor",
    max_pages: int = 100,
) -> List[Dict]:
    """
    Helper function to paginate through cursor-based API responses.

    Items are de-duplicated across pages by their string representation.
    Pagination stops when a page is empty, the limit is reached, the response
    carries no next cursor, a cursor repeats, or max_pages pages were fetched.

    Args:
        api_call: Function to call the API. It is invoked with a single
            keyword argument: ``json=`` when a non-empty json_body is given,
            otherwise ``params=``.
        result_key: Key in response containing the data items
        limit: Maximum number of items to fetch. None means no limit; zero
            or a negative value returns an empty list without calling the API.
        params: Initial query parameters for GET requests
        json_body: Initial JSON body for POST requests
        cursor_path: Dot-notation path to cursor in response (e.g., "records.cursor")
        cursor_param: Parameter name for passing cursor to next request
        max_pages: Maximum number of pages to fetch (safety guard)

    Returns:
        List of items from paginated API calls
    """
    all_items = []

    # A non-positive limit can never be satisfied by fetching; answer at once
    # instead of treating 0 as "unlimited".
    if limit is not None and limit <= 0:
        return all_items

    params = params or {}
    json_body = json_body or {}

    # A non-empty JSON body selects the POST style; the cursor is then placed
    # in the body rather than in the query parameters.
    if json_body:
        request_kwarg, base_request = "json", json_body
    else:
        request_kwarg, base_request = "params", params

    # Kept separate so an item fingerprint can never be mistaken for a cursor.
    seen_items = set()
    seen_cursors = set()
    cursor = None

    for _ in range(max_pages):
        # Copy so the caller's dict is never mutated by the cursor injection.
        request = base_request.copy()
        if cursor:
            request[cursor_param] = cursor
        response = api_call(**{request_kwarg: request})

        items_batch = response.get(result_key, [])
        if not items_batch:
            break

        for item in items_batch:
            if limit is not None and len(all_items) >= limit:
                break

            item_key = str(sorted(item.items())) if isinstance(item, dict) else str(item)
            if item_key not in seen_items:
                seen_items.add(item_key)
                all_items.append(item)

        if limit is not None and len(all_items) >= limit:
            break

        # Walk the dot-separated path; any missing or non-dict level means
        # there is no further page.
        next_cursor = response
        for key in cursor_path.split("."):
            if not isinstance(next_cursor, dict):
                next_cursor = None
                break
            next_cursor = next_cursor.get(key)

        if not next_cursor:
            break

        if next_cursor == cursor:
            logger.warning(f"API returned identical cursor: {cursor}. Stopping pagination.")
            break

        cursor_str = str(next_cursor)
        if cursor_str in seen_cursors:
            logger.warning(f"Detected cursor cycle at: {cursor_str}. Stopping pagination.")
            break

        seen_cursors.add(cursor_str)
        cursor = next_cursor
    else:
        # Only reached when every allowed page was fetched and the API still
        # offered a next cursor, i.e. data may really have been truncated.
        logger.warning(f"Reached maximum page limit ({max_pages}). There may be more data available.")

    return all_items
class GongCallsTable(APIResource):
    """The Gong Calls Table implementation"""

    def list(
        self,
        conditions: List[FilterCondition] = None,
        limit: int = None,
        sort: List[SortColumn] = None,
        targets: List[str] = None,
    ) -> pd.DataFrame:
        """Pulls data from the Gong Calls API

        ``date > x`` / ``date < x`` conditions are translated into the
        ``fromDateTime`` / ``toDateTime`` request parameters. Equality
        conditions on known columns and the first applicable sort column are
        applied to the resulting DataFrame. ``targets`` is not used here.

        Returns
        -------
        pd.DataFrame
            Gong calls matching the query. The columns are always those of
            ``get_columns()``, even when no call was returned.

        Raises
        ------
        ValueError
            If a date condition holds a value that cannot be parsed
        """
        api_params = {}
        for condition in conditions or []:
            if condition.column == "date" and condition.op == FilterOperator.GREATER_THAN:
                api_params["fromDateTime"] = normalize_datetime_to_iso8601(condition.value)
                condition.applied = True
            elif condition.column == "date" and condition.op == FilterOperator.LESS_THAN:
                api_params["toDateTime"] = normalize_datetime_to_iso8601(condition.value)
                condition.applied = True

        try:
            all_calls = paginate_api_call(
                api_call=lambda params: self.handler.call_gong_api("/v2/calls", params=params),
                result_key="calls",
                limit=limit,
                params=api_params,
            )

            data = []
            for call in all_calls:
                # "started" is an ISO timestamp; only its date part is exposed.
                started = call.get("started") or ""
                date = str(started).split("T")[0] if started else ""

                # Tolerate a null participants list, non-dict entries and
                # null names instead of failing the whole query on join().
                participants = call.get("participants") or []
                participant_names = ",".join(
                    str(p.get("name") or "") for p in participants if isinstance(p, dict)
                )

                data.append(
                    {
                        "call_id": call.get("id"),
                        "title": call.get("title"),
                        "date": date,
                        "duration": call.get("duration"),
                        "recording_url": call.get("url", ""),
                        "call_type": call.get("system"),
                        "user_id": call.get("primaryUserId"),
                        "participants": participant_names,
                        "status": call.get("status"),
                    }
                )

            # Passing the columns keeps the schema stable for empty results.
            df = pd.DataFrame(data, columns=self.get_columns())

            for condition in conditions or []:
                if condition.applied or condition.column not in df.columns:
                    continue
                if condition.op == FilterOperator.EQUAL:
                    df = df[df[condition.column] == condition.value]
                    condition.applied = True

            # Only the first sortable column is handled here; the remaining
            # ones keep applied=False.
            for col in sort or []:
                if col.column in df.columns:
                    df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last")
                    col.applied = True
                    break

            # Same final cap as the sibling tables (also makes LIMIT 0 empty).
            if limit is not None:
                df = df.head(limit)

            return df

        except Exception as e:
            logger.error(f"Error fetching calls from Gong API: {e}")
            raise

    def get_columns(self) -> List[str]:
        """Returns the columns of the calls table"""
        return [
            "call_id",
            "title",
            "date",
            "duration",
            "recording_url",
            "call_type",
            "user_id",
            "participants",
            "status",
        ]
class GongUsersTable(APIResource):
    """The Gong Users Table implementation"""

    def list(
        self,
        conditions: List[FilterCondition] = None,
        limit: int = None,
        sort: List[SortColumn] = None,
        targets: List[str] = None,
    ) -> pd.DataFrame:
        """Pulls data from the Gong Users API

        No condition is sent to the API. Equality conditions on known columns
        and the first applicable sort column are applied to the resulting
        DataFrame. ``targets`` is not used here.

        Returns
        -------
        pd.DataFrame
            Gong users matching the query. The columns are always those of
            ``get_columns()``, even when no user was returned.
        """
        api_params = {}

        try:
            all_users = paginate_api_call(
                api_call=lambda params: self.handler.call_gong_api("/v2/users", params=params),
                result_key="users",
                limit=limit,
                params=api_params,
            )

            data = []
            for user in all_users:
                # Null name parts are treated as empty so the full name never
                # contains the literal text "None".
                first_name = user.get("firstName") or ""
                last_name = user.get("lastName") or ""

                # Tolerate a null permissions list and non-string entries
                # instead of failing the whole query on join().
                permissions = user.get("permissions") or []

                data.append(
                    {
                        "user_id": user.get("id"),
                        "name": f"{first_name} {last_name}".strip(),
                        "email": user.get("emailAddress") or "",
                        "role": user.get("title") or "",
                        "permissions": ",".join(str(permission) for permission in permissions),
                        "status": "active" if user.get("active", False) else "inactive",
                    }
                )

            # Passing the columns keeps the schema stable for empty results.
            df = pd.DataFrame(data, columns=self.get_columns())

            for condition in conditions or []:
                if condition.column not in df.columns:
                    continue
                if condition.op == FilterOperator.EQUAL:
                    df = df[df[condition.column] == condition.value]
                    condition.applied = True

            # Only the first sortable column is handled here; the remaining
            # ones keep applied=False.
            for col in sort or []:
                if col.column in df.columns:
                    df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last")
                    col.applied = True
                    break

            if limit is not None:
                df = df.head(limit)

            return df

        except Exception as e:
            logger.error(f"Error fetching users from Gong API: {e}")
            raise

    def get_columns(self) -> List[str]:
        """Returns the columns of the users table"""
        return ["user_id", "name", "email", "role", "permissions", "status"]
class GongAnalyticsTable(APIResource):
    """The Gong Analytics Table implementation"""

    def list(
        self,
        conditions: List[FilterCondition] = None,
        limit: int = None,
        sort: List[SortColumn] = None,
        targets: List[str] = None,
    ) -> pd.DataFrame:
        """Pulls data from the Gong Analytics API

        Calls are fetched from ``/v2/calls/extensive`` with a POST request.
        The date window defaults to the last 7 days and can be overridden by
        ``date > x`` / ``date < x`` conditions. Equality conditions on known
        columns and the first applicable sort column are applied to the
        resulting DataFrame. ``targets`` is not used here.

        Returns
        -------
        pd.DataFrame
            Gong analytics matching the query. The columns are always those
            of ``get_columns()``, even when no call was returned.
        """
        try:
            # Default to last 7 days if no date filters provided
            now = datetime.now(timezone.utc)

            payload = {
                "filter": {
                    "fromDateTime": normalize_datetime_to_iso8601(now - timedelta(days=7)),
                    "toDateTime": normalize_datetime_to_iso8601(now),
                },
                "contentSelector": {
                    "exposedFields": {
                        "content": {
                            "brief": True,
                            "outline": True,
                            "highlights": True,
                            "callOutcome": True,
                            "topics": True,
                            "trackers": True,
                        },
                        "interaction": {"personInteractionStats": True, "questions": True},
                    }
                },
            }

            for condition in conditions or []:
                if condition.column == "date" and condition.op == FilterOperator.GREATER_THAN:
                    payload["filter"]["fromDateTime"] = normalize_datetime_to_iso8601(condition.value)
                    condition.applied = True
                elif condition.column == "date" and condition.op == FilterOperator.LESS_THAN:
                    payload["filter"]["toDateTime"] = normalize_datetime_to_iso8601(condition.value)
                    condition.applied = True

            calls_data = paginate_api_call(
                api_call=lambda **kwargs: self.handler.call_gong_api("/v2/calls/extensive", method="POST", **kwargs),
                result_key="calls",
                limit=limit,
                json_body=payload,
                cursor_path="records.cursor",
                cursor_param="cursor",
                max_pages=100,
            )

            all_analytics = [self._build_analytics_row(call) for call in calls_data]

            # Passing the columns keeps the schema stable for empty results.
            df = pd.DataFrame(all_analytics, columns=self.get_columns())

            # Apply non-date filtering at DataFrame level
            for condition in conditions or []:
                if condition.applied or condition.column not in df.columns:
                    continue
                if condition.op == FilterOperator.EQUAL:
                    df = df[df[condition.column] == condition.value]
                    condition.applied = True

            # Only the first sortable column is handled here; the remaining
            # ones keep applied=False.
            for col in sort or []:
                if col.column in df.columns:
                    df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last")
                    col.applied = True
                    break

            if limit is not None:
                df = df.head(limit)

            return df

        except Exception as e:
            logger.error(f"Error fetching analytics from Gong API: {e}")
            raise

    @staticmethod
    def _as_number(value, default=0):
        """Returns value when it is an int or float, otherwise default (guards null/odd API fields)."""
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return default
        return value

    @classmethod
    def _build_analytics_row(cls, call: dict) -> dict:
        """Builds one analytics table row from a single extensive-call record."""
        # "or {}" also covers keys that are present but null.
        content = call.get("content") or {}
        interaction = call.get("interaction") or {}
        metadata = call.get("metaData") or {}

        # Sentiment and emotions are derived from the interaction stats.
        sentiment_score = 0
        emotions = ""
        interaction_stats = interaction.get("interactionStats") or []
        if interaction_stats:
            stats = {
                stat.get("name"): stat.get("value") for stat in interaction_stats if isinstance(stat, dict)
            }
            talk_ratio = cls._as_number(stats.get("Talk Ratio"))
            patience = cls._as_number(stats.get("Patience"))
            interactivity = cls._as_number(stats.get("Interactivity"))
            # Interactivity is divided by 10 and capped at 1.0 before averaging.
            sentiment_score = (talk_ratio + patience + min(interactivity / 10, 1.0)) / 3
            emotions = f"Talk:{talk_ratio}, Patience:{patience}, Interactivity:{interactivity}"

        # Only topics with a positive duration are listed by name.
        valid_topics = [topic for topic in (content.get("topics") or []) if isinstance(topic, dict)]
        topic_names = [
            str(topic.get("name") or "") for topic in valid_topics if cls._as_number(topic.get("duration")) > 0
        ]

        # Only trackers that were counted at least once become key phrases.
        key_phrases = [
            str(tracker.get("name") or "")
            for tracker in (content.get("trackers") or [])
            if isinstance(tracker, dict) and cls._as_number(tracker.get("count")) > 0
        ]

        # Topic score: average topic duration relative to the call duration.
        # Both divisions are guarded against zero.
        topic_score = 0
        call_duration = cls._as_number(metadata.get("duration"))
        if valid_topics and call_duration > 0:
            total_topic_duration = sum(cls._as_number(topic.get("duration")) for topic in valid_topics)
            topic_score = (total_topic_duration / len(valid_topics)) / call_duration

        return {
            "call_id": metadata.get("id"),
            "sentiment_score": round(sentiment_score, 3),
            "topic_score": round(topic_score, 3),
            "key_phrases": ", ".join(key_phrases),
            "topics": ", ".join(topic_names),
            "emotions": emotions,
            "confidence_score": "",
        }

    def get_columns(self) -> List[str]:
        """Returns the columns of the analytics table"""
        return ["call_id", "sentiment_score", "topic_score", "key_phrases", "topics", "emotions", "confidence_score"]
class GongTranscriptsTable(APIResource):
    """The Gong Transcripts Table implementation"""

    def list(
        self,
        conditions: List[FilterCondition] = None,
        limit: int = None,
        sort: List[SortColumn] = None,
        targets: List[str] = None,
    ) -> pd.DataFrame:
        """Pulls data from the Gong Transcripts API

        Call ids are fetched from ``/v2/calls`` first (at most ``limit``
        calls, or 100 when no limit is given), then their transcripts are
        requested from ``/v2/calls/transcript``. Each sentence becomes one
        row. ``date > x`` / ``date < x`` conditions are sent to the calls
        request; equality conditions and text matching on the ``text``
        column are applied to the resulting DataFrame. ``targets`` is not
        used here.

        Returns
        -------
        pd.DataFrame
            Gong transcripts matching the query. The columns are always
            those of ``get_columns()``, even when nothing was returned.
        """
        try:
            calls_api_params = {}

            for condition in conditions or []:
                if condition.column == "date" and condition.op == FilterOperator.GREATER_THAN:
                    calls_api_params["fromDateTime"] = normalize_datetime_to_iso8601(condition.value)
                    condition.applied = True
                elif condition.column == "date" and condition.op == FilterOperator.LESS_THAN:
                    calls_api_params["toDateTime"] = normalize_datetime_to_iso8601(condition.value)
                    condition.applied = True

            # Fetch call IDs using pagination helper
            calls_fetch_limit = limit if limit else 100
            all_calls = paginate_api_call(
                api_call=lambda params: self.handler.call_gong_api("/v2/calls", params=params),
                result_key="calls",
                limit=calls_fetch_limit,
                params=calls_api_params,
            )
            all_call_ids = [call.get("id") for call in all_calls if call.get("id")]

            if not all_call_ids:
                # Keep the schema stable even when there is nothing to return.
                return pd.DataFrame(columns=self.get_columns())
            call_ids_to_fetch = all_call_ids[:limit] if limit else all_call_ids

            payload = {"filter": {"callIds": call_ids_to_fetch}}
            call_transcripts = paginate_api_call(
                api_call=lambda **kwargs: self.handler.call_gong_api("/v2/calls/transcript", method="POST", **kwargs),
                result_key="callTranscripts",
                limit=None,  # Get all transcripts for the specified calls
                json_body=payload,
                cursor_path="records.cursor",
                cursor_param="cursor",
                max_pages=50,
            )

            all_transcript_data = []
            for call_transcript in call_transcripts:
                call_id = call_transcript.get("callId")

                # segment_id numbers the sentences of one call, starting at 1.
                segment_counter = 0

                # "or []" also covers keys that are present but null.
                for speaker_block in call_transcript.get("transcript") or []:
                    speaker_id = speaker_block.get("speakerId")

                    for sentence in speaker_block.get("sentences") or []:
                        segment_counter += 1
                        all_transcript_data.append(
                            {
                                "call_id": call_id,
                                "speaker": speaker_id,
                                "timestamp": sentence.get("start"),
                                "text": sentence.get("text"),
                                "confidence": sentence.get("confidence"),
                                "segment_id": f"{call_id}_{segment_counter}",
                            }
                        )

            df = pd.DataFrame(all_transcript_data, columns=self.get_columns())

            # NOTE(review): CONTAINS may not be a member of every
            # FilterOperator version - looked up defensively so that a
            # missing member cannot raise AttributeError. Confirm and drop
            # the getattr if the member is guaranteed.
            contains_op = getattr(FilterOperator, "CONTAINS", None)

            for condition in conditions or []:
                if condition.applied or condition.column not in df.columns:
                    continue

                if condition.op == FilterOperator.EQUAL:
                    df = df[df[condition.column] == condition.value]
                    condition.applied = True
                    continue

                is_like = condition.op == FilterOperator.LIKE
                is_contains = contains_op is not None and condition.op == contains_op
                if not (is_like or is_contains):
                    continue
                if condition.column != "text" or not isinstance(condition.value, str):
                    continue

                pattern = condition.value
                if is_like and ("%" in pattern or "_" in pattern):
                    # SQL LIKE wildcards are translated to an anchored regex.
                    mask = df["text"].str.contains(
                        self._like_pattern_to_regex(pattern), case=False, na=False, regex=True
                    )
                else:
                    # Wildcard-free patterns keep the previous substring
                    # behaviour, but the value is matched literally rather
                    # than being interpreted as a regular expression.
                    mask = df["text"].str.contains(pattern, case=False, na=False, regex=False)
                df = df[mask]
                condition.applied = True

            # Only the first sortable column is handled here; the remaining
            # ones keep applied=False.
            for col in sort or []:
                if col.column in df.columns:
                    df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last")
                    col.applied = True
                    break

            if limit is not None:
                df = df.head(limit)
            return df

        except Exception as e:
            logger.error(f"Error fetching transcripts from Gong API: {e}")
            raise

    @staticmethod
    def _like_pattern_to_regex(pattern: str) -> str:
        """Translates a SQL LIKE pattern ('%' = any run, '_' = one char) into an anchored regex string."""
        import re  # local import: not needed anywhere else in this class

        translated = []
        for char in pattern:
            if char == "%":
                translated.append(".*")
            elif char == "_":
                translated.append(".")
            else:
                translated.append(re.escape(char))
        # (?s) lets the wildcards span line breaks; ^ and \Z anchor the match
        # to the whole text, as LIKE does.
        return "(?s)^" + "".join(translated) + r"\Z"

    def get_columns(self) -> List[str]:
        """Returns the columns of the transcripts table"""
        return ["call_id", "speaker", "timestamp", "text", "confidence", "segment_id"]