Coverage for mindsdb / integrations / handlers / gong_handler / gong_tables.py: 0%

284 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import List, Dict, Any, Callable, Union 

2from datetime import datetime, timedelta, timezone 

3import pandas as pd 

4 

5from mindsdb.integrations.libs.api_handler import APIResource 

6from mindsdb.integrations.utilities.sql_utils import FilterCondition, FilterOperator, SortColumn 

7from mindsdb.utilities import log 

8 

9 

10logger = log.getLogger(__name__) 

11 

12 

13def normalize_datetime_to_iso8601(dt_value: Union[str, datetime, None]) -> str: 

14 """ 

15 Normalize various datetime formats to ISO 8601 with UTC timezone (Z suffix). 

16 

17 Args: 

18 dt_value: Datetime value as string, datetime object, or None 

19 

20 Returns: 

21 ISO 8601 formatted string with Z suffix (e.g., "2024-01-15T10:30:00Z") 

22 

23 Raises: 

24 ValueError: If the datetime string cannot be parsed 

25 """ 

26 if dt_value is None: 

27 return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") 

28 

29 if isinstance(dt_value, datetime): 

30 # If naive (no timezone), assume UTC 

31 if dt_value.tzinfo is None: 

32 dt_value = dt_value.replace(tzinfo=timezone.utc) 

33 # Convert to UTC if not already 

34 elif dt_value.tzinfo != timezone.utc: 

35 dt_value = dt_value.astimezone(timezone.utc) 

36 return dt_value.strftime("%Y-%m-%dT%H:%M:%SZ") 

37 

38 if isinstance(dt_value, str): 

39 dt_str = dt_value.strip() 

40 

41 # Already in correct format with Z suffix 

42 if dt_str.endswith("Z") and "T" in dt_str: 

43 return dt_str 

44 

45 # Has timezone offset like +00:00 or -05:00 

46 if dt_str.endswith(("00", "30")) and ("+" in dt_str[-6:] or dt_str[-6:-3] == "-"): 

47 try: 

48 dt_obj = datetime.fromisoformat(dt_str) 

49 return dt_obj.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") 

50 except ValueError: 

51 pass 

52 

53 formats_to_try = [ 

54 "%Y-%m-%dT%H:%M:%S", # ISO without timezone 

55 "%Y-%m-%d %H:%M:%S", # Common format 

56 "%Y-%m-%d", # Date only (assume start of day UTC) 

57 "%Y/%m/%d", # Alternative date format 

58 "%d-%m-%Y", # European date format 

59 "%m/%d/%Y", # US date format 

60 ] 

61 

62 for fmt in formats_to_try: 

63 try: 

64 dt_obj = datetime.strptime(dt_str, fmt) 

65 dt_obj = dt_obj.replace(tzinfo=timezone.utc) 

66 return dt_obj.strftime("%Y-%m-%dT%H:%M:%SZ") 

67 except ValueError: 

68 continue 

69 

70 try: 

71 dt_obj = datetime.fromisoformat(dt_str.replace("Z", "+00:00")) 

72 return dt_obj.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") 

73 except ValueError: 

74 raise ValueError(f"Unable to parse datetime string: {dt_str}") 

75 

76 raise ValueError(f"Unsupported datetime type: {type(dt_value)}") 

77 

78 

79def paginate_api_call( 

80 api_call: Callable, 

81 result_key: str, 

82 limit: int = None, 

83 params: Dict[str, Any] = None, 

84 json_body: Dict[str, Any] = None, 

85 cursor_path: str = "records.cursor", 

86 cursor_param: str = "cursor", 

87 max_pages: int = 100, 

88) -> List[Dict]: 

89 """ 

90 Helper function to paginate through API responses. 

91 

92 Args: 

93 api_call: Function to call the API (should accept params and/or json) 

94 result_key: Key in response containing the data items 

95 limit: Maximum number of items to fetch 

96 params: Initial query parameters for GET requests 

97 json_body: Initial JSON body for POST requests 

98 cursor_path: Dot-notation path to cursor in response (e.g., "records.cursor") 

99 cursor_param: Parameter name for passing cursor to next request 

100 max_pages: Maximum number of pages to fetch (safety guard) 

101 

102 Returns: 

103 List of items from paginated API calls 

104 """ 

105 all_items = [] 

106 seen_cursors = set() 

107 cursor = None 

108 page_count = 0 

109 

110 params = params or {} 

111 json_body = json_body or {} 

112 

113 while page_count < max_pages and (not limit or len(all_items) < limit): 

114 page_count += 1 

115 

116 if cursor: 

117 if json_body: 

118 current_json = json_body.copy() 

119 current_json[cursor_param] = cursor 

120 response = api_call(json=current_json) 

121 else: 

122 current_params = params.copy() 

123 current_params[cursor_param] = cursor 

124 response = api_call(params=current_params) 

125 else: 

126 if json_body: 

127 response = api_call(json=json_body.copy()) 

128 else: 

129 response = api_call(params=params.copy()) 

130 

131 items_batch = response.get(result_key, []) 

132 

133 if not items_batch: 

134 break 

135 

136 items_added_this_batch = 0 

137 for item in items_batch: 

138 if limit and len(all_items) >= limit: 

139 break 

140 

141 item_str = str(sorted(item.items())) if isinstance(item, dict) else str(item) 

142 if item_str not in seen_cursors: 

143 all_items.append(item) 

144 seen_cursors.add(item_str) 

145 items_added_this_batch += 1 

146 

147 if limit and len(all_items) >= limit: 

148 break 

149 

150 next_cursor = response 

151 for key in cursor_path.split("."): 

152 next_cursor = next_cursor.get(key, {}) if isinstance(next_cursor, dict) else None 

153 if next_cursor is None: 

154 break 

155 

156 if not next_cursor: 

157 break 

158 

159 if next_cursor == cursor: 

160 logger.warning(f"API returned identical cursor: {cursor}. Stopping pagination.") 

161 break 

162 

163 cursor_str = str(next_cursor) 

164 if cursor_str in seen_cursors: 

165 logger.warning(f"Detected cursor cycle at: {cursor_str}. Stopping pagination.") 

166 break 

167 

168 seen_cursors.add(cursor_str) 

169 cursor = next_cursor 

170 

171 if page_count >= max_pages: 

172 logger.warning(f"Reached maximum page limit ({max_pages}). There may be more data available.") 

173 

174 return all_items 

175 

176 

177class GongCallsTable(APIResource): 

178 """The Gong Calls Table implementation""" 

179 

180 def list( 

181 self, 

182 conditions: List[FilterCondition] = None, 

183 limit: int = None, 

184 sort: List[SortColumn] = None, 

185 targets: List[str] = None, 

186 ) -> pd.DataFrame: 

187 """Pulls data from the Gong Calls API 

188 

189 Returns 

190 ------- 

191 pd.DataFrame 

192 Gong calls matching the query 

193 

194 Raises 

195 ------ 

196 ValueError 

197 If the query contains an unsupported condition 

198 """ 

199 

200 api_params = {} 

201 if conditions: 

202 for condition in conditions: 

203 if condition.column == "date" and condition.op == FilterOperator.GREATER_THAN: 

204 api_params["fromDateTime"] = normalize_datetime_to_iso8601(condition.value) 

205 condition.applied = True 

206 elif condition.column == "date" and condition.op == FilterOperator.LESS_THAN: 

207 api_params["toDateTime"] = normalize_datetime_to_iso8601(condition.value) 

208 condition.applied = True 

209 

210 try: 

211 all_calls = paginate_api_call( 

212 api_call=lambda params: self.handler.call_gong_api("/v2/calls", params=params), 

213 result_key="calls", 

214 limit=limit, 

215 params=api_params, 

216 ) 

217 

218 data = [] 

219 for call in all_calls: 

220 started = call.get("started", "") 

221 date = started.split("T")[0] if started else "" 

222 

223 item = { 

224 "call_id": call.get("id"), 

225 "title": call.get("title"), 

226 "date": date, 

227 "duration": call.get("duration"), 

228 "recording_url": call.get("url", ""), 

229 "call_type": call.get("system"), 

230 "user_id": call.get("primaryUserId"), 

231 "participants": ",".join([p.get("name", "") for p in call.get("participants", [])]), 

232 "status": call.get("status"), 

233 } 

234 data.append(item) 

235 

236 df = pd.DataFrame(data) 

237 

238 if conditions: 

239 for condition in conditions: 

240 if not condition.applied and condition.column in df.columns: 

241 if condition.op == FilterOperator.EQUAL: 

242 df = df[df[condition.column] == condition.value] 

243 condition.applied = True 

244 

245 if sort: 

246 for col in sort: 

247 if col.column in df.columns: 

248 df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last") 

249 col.applied = True 

250 break 

251 return df 

252 

253 except Exception as e: 

254 logger.error(f"Error fetching calls from Gong API: {e}") 

255 raise 

256 

257 def get_columns(self) -> List[str]: 

258 """Returns the columns of the calls table""" 

259 return [ 

260 "call_id", 

261 "title", 

262 "date", 

263 "duration", 

264 "recording_url", 

265 "call_type", 

266 "user_id", 

267 "participants", 

268 "status", 

269 ] 

270 

271 

272class GongUsersTable(APIResource): 

273 """The Gong Users Table implementation""" 

274 

275 def list( 

276 self, 

277 conditions: List[FilterCondition] = None, 

278 limit: int = None, 

279 sort: List[SortColumn] = None, 

280 targets: List[str] = None, 

281 ) -> pd.DataFrame: 

282 """Pulls data from the Gong Users API 

283 

284 Returns 

285 ------- 

286 pd.DataFrame 

287 Gong users matching the query 

288 """ 

289 

290 api_params = {} 

291 

292 try: 

293 # Use pagination helper to fetch users 

294 all_users = paginate_api_call( 

295 api_call=lambda params: self.handler.call_gong_api("/v2/users", params=params), 

296 result_key="users", 

297 limit=limit, 

298 params=api_params, 

299 ) 

300 

301 # Process the limited data 

302 data = [] 

303 for user in all_users: 

304 # Safely concatenate names - handle None values 

305 first_name = user.get("firstName") or "" 

306 last_name = user.get("lastName") or "" 

307 full_name = f"{first_name} {last_name}".strip() 

308 

309 item = { 

310 "user_id": user.get("id"), 

311 "name": full_name, 

312 "email": user.get("emailAddress", ""), 

313 "role": user.get("title", ""), 

314 "permissions": ",".join(user.get("permissions", [])), 

315 "status": "active" if user.get("active", False) else "inactive", 

316 } 

317 data.append(item) 

318 

319 df = pd.DataFrame(data) 

320 

321 if conditions: 

322 for condition in conditions: 

323 if condition.column in df.columns: 

324 if condition.op == FilterOperator.EQUAL: 

325 df = df[df[condition.column] == condition.value] 

326 condition.applied = True 

327 

328 if sort: 

329 for col in sort: 

330 if col.column in df.columns: 

331 df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last") 

332 col.applied = True 

333 break 

334 

335 if limit is not None: 

336 df = df.head(limit) 

337 

338 return df 

339 

340 except Exception as e: 

341 logger.error(f"Error fetching users from Gong API: {e}") 

342 raise 

343 

344 def get_columns(self) -> List[str]: 

345 """Returns the columns of the users table""" 

346 return ["user_id", "name", "email", "role", "permissions", "status"] 

347 

348 

349class GongAnalyticsTable(APIResource): 

350 """The Gong Analytics Table implementation""" 

351 

352 def list( 

353 self, 

354 conditions: List[FilterCondition] = None, 

355 limit: int = None, 

356 sort: List[SortColumn] = None, 

357 targets: List[str] = None, 

358 ) -> pd.DataFrame: 

359 """Pulls data from the Gong Analytics API 

360 

361 Returns 

362 ------- 

363 pd.DataFrame 

364 Gong analytics matching the query 

365 """ 

366 

367 try: 

368 # Default to last 7 days if no date filters provided 

369 default_from = datetime.now(timezone.utc) - timedelta(days=7) 

370 default_to = datetime.now(timezone.utc) 

371 

372 payload = { 

373 "filter": { 

374 "fromDateTime": normalize_datetime_to_iso8601(default_from), 

375 "toDateTime": normalize_datetime_to_iso8601(default_to), 

376 }, 

377 "contentSelector": { 

378 "exposedFields": { 

379 "content": { 

380 "brief": True, 

381 "outline": True, 

382 "highlights": True, 

383 "callOutcome": True, 

384 "topics": True, 

385 "trackers": True, 

386 }, 

387 "interaction": {"personInteractionStats": True, "questions": True}, 

388 } 

389 }, 

390 } 

391 

392 if conditions: 

393 for condition in conditions: 

394 if condition.column == "date" and condition.op == FilterOperator.GREATER_THAN: 

395 payload["filter"]["fromDateTime"] = normalize_datetime_to_iso8601(condition.value) 

396 condition.applied = True 

397 elif condition.column == "date" and condition.op == FilterOperator.LESS_THAN: 

398 payload["filter"]["toDateTime"] = normalize_datetime_to_iso8601(condition.value) 

399 condition.applied = True 

400 

401 # Fetch calls using improved pagination helper with POST support 

402 calls_data = paginate_api_call( 

403 api_call=lambda **kwargs: self.handler.call_gong_api("/v2/calls/extensive", method="POST", **kwargs), 

404 result_key="calls", 

405 limit=limit, 

406 json_body=payload, 

407 cursor_path="records.cursor", 

408 cursor_param="cursor", 

409 max_pages=100, 

410 ) 

411 

412 # Process each call to extract analytics 

413 all_analytics = [] 

414 for call in calls_data: 

415 # Extract analytics from extensive call data 

416 content = call.get("content", {}) 

417 interaction = call.get("interaction", {}) 

418 metadata = call.get("metaData", {}) 

419 

420 # Sentiment and Emotion from InteractionStats 

421 person_stats = interaction.get("interactionStats", []) 

422 sentiment_score = 0 

423 emotions = "" # Initialize emotions 

424 

425 if person_stats: 

426 stats_dict = {stat["name"]: stat["value"] for stat in interaction.get("interactionStats", [])} 

427 sentiment_score = ( 

428 stats_dict.get("Talk Ratio", 0) 

429 + stats_dict.get("Patience", 0) 

430 + min(stats_dict.get("Interactivity", 0) / 10, 1.0) 

431 ) / 3 

432 emotions = f"Talk:{stats_dict.get('Talk Ratio', 0)}, Patience:{stats_dict.get('Patience', 0)}, Interactivity:{stats_dict.get('Interactivity', 0)}" 

433 

434 # Topics from AI analysis 

435 topics = content.get("topics", []) 

436 topic_names = [ 

437 topic.get("name", "") 

438 for topic in topics 

439 if isinstance(topic, dict) and topic.get("duration", 0) > 0 

440 ] 

441 

442 # Key phrases from AI 

443 trackers = content.get("trackers", []) 

444 key_phrases = [tracker.get("name", "") for tracker in trackers if tracker.get("count", 0) > 0] 

445 

446 # Topic scoring based on relevance - prevent division by zero 

447 topic_score = 0 

448 if topics: 

449 valid_topics = [topic for topic in topics if isinstance(topic, dict)] 

450 if valid_topics: 

451 total_topic_duration = sum([topic.get("duration", 0) for topic in valid_topics]) 

452 avg_topic_duration = total_topic_duration / len(valid_topics) 

453 call_duration = metadata.get("duration", 0) 

454 if call_duration > 0: 

455 topic_score = avg_topic_duration / call_duration 

456 

457 item = { 

458 "call_id": metadata.get("id"), 

459 "sentiment_score": round(sentiment_score, 3), 

460 "topic_score": round(topic_score, 3), 

461 "key_phrases": ", ".join(key_phrases), 

462 "topics": ", ".join(topic_names), 

463 "emotions": emotions, 

464 "confidence_score": "", 

465 } 

466 all_analytics.append(item) 

467 

468 df = pd.DataFrame(all_analytics) 

469 

470 # Apply non-date filtering at DataFrame level 

471 if conditions: 

472 for condition in conditions: 

473 if not condition.applied and condition.column in df.columns: 

474 if condition.op == FilterOperator.EQUAL: 

475 df = df[df[condition.column] == condition.value] 

476 condition.applied = True 

477 

478 # Apply sorting at DataFrame level 

479 if sort: 

480 for col in sort: 

481 if col.column in df.columns: 

482 df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last") 

483 col.applied = True 

484 break 

485 

486 if limit is not None: 

487 df = df.head(limit) 

488 

489 return df 

490 

491 except Exception as e: 

492 logger.error(f"Error fetching analytics from Gong API: {e}") 

493 raise 

494 

495 def get_columns(self) -> List[str]: 

496 """Returns the columns of the analytics table""" 

497 return ["call_id", "sentiment_score", "topic_score", "key_phrases", "topics", "emotions", "confidence_score"] 

498 

499 

500class GongTranscriptsTable(APIResource): 

501 """The Gong Transcripts Table implementation""" 

502 

503 def list( 

504 self, 

505 conditions: List[FilterCondition] = None, 

506 limit: int = None, 

507 sort: List[SortColumn] = None, 

508 targets: List[str] = None, 

509 ) -> pd.DataFrame: 

510 """Pulls data from the Gong Transcripts API 

511 

512 Returns 

513 ------- 

514 pd.DataFrame 

515 Gong transcripts matching the query 

516 """ 

517 

518 try: 

519 calls_api_params = {} 

520 

521 if conditions: 

522 for condition in conditions: 

523 if condition.column == "date" and condition.op == FilterOperator.GREATER_THAN: 

524 calls_api_params["fromDateTime"] = normalize_datetime_to_iso8601(condition.value) 

525 condition.applied = True 

526 elif condition.column == "date" and condition.op == FilterOperator.LESS_THAN: 

527 calls_api_params["toDateTime"] = normalize_datetime_to_iso8601(condition.value) 

528 condition.applied = True 

529 

530 # Fetch call IDs using pagination helper 

531 calls_fetch_limit = limit if limit else 100 

532 all_calls = paginate_api_call( 

533 api_call=lambda params: self.handler.call_gong_api("/v2/calls", params=params), 

534 result_key="calls", 

535 limit=calls_fetch_limit, 

536 params=calls_api_params, 

537 ) 

538 all_call_ids = [call.get("id") for call in all_calls if call.get("id")] 

539 

540 if not all_call_ids: 

541 return pd.DataFrame() 

542 call_ids_to_fetch = all_call_ids[:limit] if limit else all_call_ids 

543 

544 # Fetch transcripts using improved pagination helper with POST support 

545 payload = {"filter": {"callIds": call_ids_to_fetch}} 

546 call_transcripts = paginate_api_call( 

547 api_call=lambda **kwargs: self.handler.call_gong_api("/v2/calls/transcript", method="POST", **kwargs), 

548 result_key="callTranscripts", 

549 limit=None, # Get all transcripts for the specified calls 

550 json_body=payload, 

551 cursor_path="records.cursor", 

552 cursor_param="cursor", 

553 max_pages=50, 

554 ) 

555 

556 # Process transcripts 

557 all_transcript_data = [] 

558 for call_transcript in call_transcripts: 

559 call_id = call_transcript.get("callId") 

560 transcript_segments = call_transcript.get("transcript", []) 

561 

562 segment_counter = 0 

563 

564 for speaker_block in transcript_segments: 

565 speaker_id = speaker_block.get("speakerId") 

566 sentences = speaker_block.get("sentences", []) 

567 

568 for sentence in sentences: 

569 segment_counter += 1 

570 

571 item = { 

572 "call_id": call_id, 

573 "speaker": speaker_id, 

574 "timestamp": sentence.get("start"), 

575 "text": sentence.get("text"), 

576 "confidence": sentence.get("confidence"), 

577 "segment_id": f"{call_id}_{segment_counter}", 

578 } 

579 all_transcript_data.append(item) 

580 

581 df = pd.DataFrame(all_transcript_data) 

582 

583 if conditions: 

584 for condition in conditions: 

585 if not condition.applied and condition.column in df.columns: 

586 if condition.op == FilterOperator.EQUAL: 

587 df = df[df[condition.column] == condition.value] 

588 condition.applied = True 

589 elif condition.op == FilterOperator.LIKE or condition.op == FilterOperator.CONTAINS: 

590 if condition.column == "text": 

591 df = df[df[condition.column].str.contains(condition.value, case=False, na=False)] 

592 condition.applied = True 

593 

594 if sort: 

595 for col in sort: 

596 if col.column in df.columns: 

597 df = df.sort_values(by=col.column, ascending=col.ascending, na_position="last") 

598 col.applied = True 

599 break 

600 

601 if limit is not None: 

602 df = df.head(limit) 

603 return df 

604 

605 except Exception as e: 

606 logger.error(f"Error fetching transcripts from Gong API: {e}") 

607 raise 

608 

609 def get_columns(self) -> List[str]: 

610 """Returns the columns of the transcripts table""" 

611 return ["call_id", "speaker", "timestamp", "text", "confidence", "segment_id"]