Coverage for mindsdb / integrations / handlers / symbl_handler / symbl_tables.py: 0%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2import json 

3import symbl 

4from typing import List 

5from mindsdb.integrations.libs.api_handler import APITable 

6from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor 

7from mindsdb.utilities import log 

8from mindsdb_sql_parser import ast 

9 

10logger = log.getLogger(__name__) 

11 

12 

13class GetConversationTable(APITable): 

14 """The Get Conversation Table implementation""" 

15 

16 def select(self, query: ast.Select) -> pd.DataFrame: 

17 """Get the conversation Id for the given audio file" API 

18 

19 Parameters 

20 ---------- 

21 query : ast.Select 

22 Given SQL SELECT query 

23 

24 Returns 

25 ------- 

26 pd.DataFrame 

27 conversation id 

28 

29 Raises 

30 ------ 

31 ValueError 

32 If the query contains an unsupported condition 

33 """ 

34 

35 select_statement_parser = SELECTQueryParser( 

36 query, 

37 'get_conversation_id', 

38 self.get_columns() 

39 ) 

40 

41 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

42 

43 search_params = {} 

44 subset_where_conditions = [] 

45 for op, arg1, arg2 in where_conditions: 

46 if arg1 == 'audio_url': 

47 if op == '=': 

48 search_params["audio_url"] = arg2 

49 else: 

50 raise NotImplementedError("Only '=' operator is supported for audio_url column.") 

51 

52 elif arg1 in self.get_columns(): 

53 subset_where_conditions.append([op, arg1, arg2]) 

54 

55 filter_flag = ("audio_url" in search_params) 

56 

57 if not filter_flag: 

58 raise NotImplementedError("audio_url column has to be present in where clause.") 

59 

60 df = pd.DataFrame(columns=self.get_columns()) 

61 

62 payload = {"url": search_params.get("audio_url")} 

63 conversation_object = symbl.Audio.process_url(payload=payload, credentials=self.handler.credentials) 

64 

65 df = pd.json_normalize({"conversation_id": conversation_object.get_conversation_id()}) 

66 

67 select_statement_executor = SELECTQueryExecutor( 

68 df, 

69 selected_columns, 

70 subset_where_conditions, 

71 order_by_conditions, 

72 result_limit 

73 ) 

74 

75 df = select_statement_executor.execute_query() 

76 

77 return df 

78 

79 def get_columns(self) -> List[str]: 

80 """Gets all columns to be returned in pandas DataFrame responses 

81 

82 Returns 

83 ------- 

84 List[str] 

85 List of columns 

86 """ 

87 

88 return [ 

89 "conversation_id" 

90 ] 

91 

92 

93class GetMessagesTable(APITable): 

94 """The Get Messages Table implementation""" 

95 

96 def select(self, query: ast.Select) -> pd.DataFrame: 

97 """Get the messages for the given conversation id" API 

98 

99 Parameters 

100 ---------- 

101 query : ast.Select 

102 Given SQL SELECT query 

103 

104 Returns 

105 ------- 

106 pd.DataFrame 

107 Messages 

108 

109 Raises 

110 ------ 

111 ValueError 

112 If the query contains an unsupported condition 

113 """ 

114 

115 select_statement_parser = SELECTQueryParser( 

116 query, 

117 'get_messages', 

118 self.get_columns() 

119 ) 

120 

121 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

122 

123 search_params = {} 

124 subset_where_conditions = [] 

125 for op, arg1, arg2 in where_conditions: 

126 if arg1 == 'conversation_id': 

127 if op == '=': 

128 search_params["conversation_id"] = arg2 

129 else: 

130 raise NotImplementedError("Only '=' operator is supported for conversation_id column.") 

131 

132 elif arg1 in self.get_columns(): 

133 subset_where_conditions.append([op, arg1, arg2]) 

134 

135 filter_flag = ("conversation_id" in search_params) 

136 

137 if not filter_flag: 

138 raise NotImplementedError("conversation_id column has to be present in where clause.") 

139 

140 df = pd.DataFrame(columns=self.get_columns()) 

141 

142 resp = symbl.Conversations.get_messages(conversation_id=search_params.get("conversation_id"), credentials=self.handler.credentials) 

143 

144 resp = self.parse_response(resp) 

145 

146 df = pd.json_normalize(resp["messages"]) 

147 

148 select_statement_executor = SELECTQueryExecutor( 

149 df, 

150 selected_columns, 

151 subset_where_conditions, 

152 order_by_conditions, 

153 result_limit 

154 ) 

155 

156 df = select_statement_executor.execute_query() 

157 

158 return df 

159 

160 def parse_response(self, res): 

161 return json.loads(json.dumps(res.to_dict(), default=str)) 

162 

163 def get_columns(self) -> List[str]: 

164 """Gets all columns to be returned in pandas DataFrame responses 

165 

166 Returns 

167 ------- 

168 List[str] 

169 List of columns 

170 """ 

171 

172 return [ 

173 "conversation_id", 

174 "end_time", 

175 "id", 

176 "phrases", 

177 "sentiment", 

178 "start_time", 

179 "text", 

180 "words", 

181 "_from.email", 

182 "_from.id", 

183 "_from.name" 

184 ] 

185 

186 

187class GetTopicsTable(APITable): 

188 """The Get Topics Table implementation""" 

189 

190 def select(self, query: ast.Select) -> pd.DataFrame: 

191 """Get the topics for the given conversation id" API 

192 

193 Parameters 

194 ---------- 

195 query : ast.Select 

196 Given SQL SELECT query 

197 

198 Returns 

199 ------- 

200 pd.DataFrame 

201 Topics 

202 

203 Raises 

204 ------ 

205 ValueError 

206 If the query contains an unsupported condition 

207 """ 

208 

209 select_statement_parser = SELECTQueryParser( 

210 query, 

211 'get_topics', 

212 self.get_columns() 

213 ) 

214 

215 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

216 

217 search_params = {} 

218 subset_where_conditions = [] 

219 for op, arg1, arg2 in where_conditions: 

220 if arg1 == 'conversation_id': 

221 if op == '=': 

222 search_params["conversation_id"] = arg2 

223 else: 

224 raise NotImplementedError("Only '=' operator is supported for conversation_id column.") 

225 

226 elif arg1 in self.get_columns(): 

227 subset_where_conditions.append([op, arg1, arg2]) 

228 

229 filter_flag = ("conversation_id" in search_params) 

230 

231 if not filter_flag: 

232 raise NotImplementedError("conversation_id column has to be present in where clause.") 

233 

234 df = pd.DataFrame(columns=self.get_columns()) 

235 

236 resp = symbl.Conversations.get_topics(conversation_id=search_params.get("conversation_id"), credentials=self.handler.credentials) 

237 

238 resp = self.parse_response(resp) 

239 

240 df = pd.json_normalize(resp["topics"]) 

241 

242 select_statement_executor = SELECTQueryExecutor( 

243 df, 

244 selected_columns, 

245 subset_where_conditions, 

246 order_by_conditions, 

247 result_limit 

248 ) 

249 

250 df = select_statement_executor.execute_query() 

251 

252 return df 

253 

254 def parse_response(self, res): 

255 return json.loads(json.dumps(res.to_dict(), default=str)) 

256 

257 def get_columns(self) -> List[str]: 

258 """Gets all columns to be returned in pandas DataFrame responses 

259 

260 Returns 

261 ------- 

262 List[str] 

263 List of columns 

264 """ 

265 

266 return [ 

267 "id", 

268 "text", 

269 "type", 

270 "score", 

271 "message_ids", 

272 "entities", 

273 "sentiment", 

274 "parent_refs" 

275 ] 

276 

277 

278class GetQuestionsTable(APITable): 

279 """The Get Questions Table implementation""" 

280 

281 def select(self, query: ast.Select) -> pd.DataFrame: 

282 """Get the questions for the given conversation id" API 

283 

284 Parameters 

285 ---------- 

286 query : ast.Select 

287 Given SQL SELECT query 

288 

289 Returns 

290 ------- 

291 pd.DataFrame 

292 Questions 

293 

294 Raises 

295 ------ 

296 ValueError 

297 If the query contains an unsupported condition 

298 """ 

299 

300 select_statement_parser = SELECTQueryParser( 

301 query, 

302 'get_questions', 

303 self.get_columns() 

304 ) 

305 

306 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

307 

308 search_params = {} 

309 subset_where_conditions = [] 

310 for op, arg1, arg2 in where_conditions: 

311 if arg1 == 'conversation_id': 

312 if op == '=': 

313 search_params["conversation_id"] = arg2 

314 else: 

315 raise NotImplementedError("Only '=' operator is supported for conversation_id column.") 

316 

317 elif arg1 in self.get_columns(): 

318 subset_where_conditions.append([op, arg1, arg2]) 

319 

320 filter_flag = ("conversation_id" in search_params) 

321 

322 if not filter_flag: 

323 raise NotImplementedError("conversation_id column has to be present in where clause.") 

324 

325 df = pd.DataFrame(columns=self.get_columns()) 

326 

327 resp = symbl.Conversations.get_questions(conversation_id=search_params.get("conversation_id"), credentials=self.handler.credentials) 

328 

329 resp = self.parse_response(resp) 

330 

331 df = pd.json_normalize(resp["questions"]) 

332 

333 select_statement_executor = SELECTQueryExecutor( 

334 df, 

335 selected_columns, 

336 subset_where_conditions, 

337 order_by_conditions, 

338 result_limit 

339 ) 

340 

341 df = select_statement_executor.execute_query() 

342 

343 return df 

344 

345 def parse_response(self, res): 

346 return json.loads(json.dumps(res.to_dict(), default=str)) 

347 

348 def get_columns(self) -> List[str]: 

349 """Gets all columns to be returned in pandas DataFrame responses 

350 

351 Returns 

352 ------- 

353 List[str] 

354 List of columns 

355 """ 

356 

357 return [ 

358 "id", 

359 "text", 

360 "type", 

361 "score", 

362 "message_ids", 

363 "_from.id", 

364 "_from.name", 

365 "_from.user_id" 

366 ] 

367 

368 

369class GetFollowUpsTable(APITable): 

370 """The Get FollowUps Table implementation""" 

371 

372 def select(self, query: ast.Select) -> pd.DataFrame: 

373 """Get the follow ups for the given conversation id" API 

374 

375 Parameters 

376 ---------- 

377 query : ast.Select 

378 Given SQL SELECT query 

379 

380 Returns 

381 ------- 

382 pd.DataFrame 

383 follow up Questions 

384 

385 Raises 

386 ------ 

387 ValueError 

388 If the query contains an unsupported condition 

389 """ 

390 

391 select_statement_parser = SELECTQueryParser( 

392 query, 

393 'get_follow_ups', 

394 self.get_columns() 

395 ) 

396 

397 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

398 

399 search_params = {} 

400 subset_where_conditions = [] 

401 for op, arg1, arg2 in where_conditions: 

402 if arg1 == 'conversation_id': 

403 if op == '=': 

404 search_params["conversation_id"] = arg2 

405 else: 

406 raise NotImplementedError("Only '=' operator is supported for conversation_id column.") 

407 

408 elif arg1 in self.get_columns(): 

409 subset_where_conditions.append([op, arg1, arg2]) 

410 

411 filter_flag = ("conversation_id" in search_params) 

412 

413 if not filter_flag: 

414 raise NotImplementedError("conversation_id column has to be present in where clause.") 

415 

416 df = pd.DataFrame(columns=self.get_columns()) 

417 

418 resp = symbl.Conversations.get_follow_ups(conversation_id=search_params.get("conversation_id"), credentials=self.handler.credentials) 

419 

420 resp = self.parse_response(resp) 

421 

422 df = pd.json_normalize(resp["follow_ups"]) 

423 

424 select_statement_executor = SELECTQueryExecutor( 

425 df, 

426 selected_columns, 

427 subset_where_conditions, 

428 order_by_conditions, 

429 result_limit 

430 ) 

431 

432 df = select_statement_executor.execute_query() 

433 

434 return df 

435 

436 def parse_response(self, res): 

437 return json.loads(json.dumps(res.to_dict(), default=str)) 

438 

439 def get_columns(self) -> List[str]: 

440 """Gets all columns to be returned in pandas DataFrame responses 

441 

442 Returns 

443 ------- 

444 List[str] 

445 List of columns 

446 """ 

447 

448 return [ 

449 "id", 

450 "text", 

451 "type", 

452 "score", 

453 "message_ids", 

454 "entities", 

455 "phrases", 

456 "definitive", 

457 "due_by", 

458 "_from.id", 

459 "_from.name", 

460 "_from.user_id", 

461 "assignee.id", 

462 "assignee.name", 

463 "assignee.email" 

464 ] 

465 

466 

467class GetActionItemsTable(APITable): 

468 """The Get Action items Table implementation""" 

469 

470 def select(self, query: ast.Select) -> pd.DataFrame: 

471 """Get the action items for the given conversation id" API 

472 

473 Parameters 

474 ---------- 

475 query : ast.Select 

476 Given SQL SELECT query 

477 

478 Returns 

479 ------- 

480 pd.DataFrame 

481 action items 

482 

483 Raises 

484 ------ 

485 ValueError 

486 If the query contains an unsupported condition 

487 """ 

488 

489 select_statement_parser = SELECTQueryParser( 

490 query, 

491 'get_action_items', 

492 self.get_columns() 

493 ) 

494 

495 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

496 

497 search_params = {} 

498 subset_where_conditions = [] 

499 for op, arg1, arg2 in where_conditions: 

500 if arg1 == 'conversation_id': 

501 if op == '=': 

502 search_params["conversation_id"] = arg2 

503 else: 

504 raise NotImplementedError("Only '=' operator is supported for conversation_id column.") 

505 

506 elif arg1 in self.get_columns(): 

507 subset_where_conditions.append([op, arg1, arg2]) 

508 

509 filter_flag = ("conversation_id" in search_params) 

510 

511 if not filter_flag: 

512 raise NotImplementedError("conversation_id column has to be present in where clause.") 

513 

514 df = pd.DataFrame(columns=self.get_columns()) 

515 

516 resp = symbl.Conversations.get_action_items(conversation_id=search_params.get("conversation_id"), credentials=self.handler.credentials) 

517 

518 resp = self.parse_response(resp) 

519 

520 df = pd.json_normalize(resp["action_items"]) 

521 

522 select_statement_executor = SELECTQueryExecutor( 

523 df, 

524 selected_columns, 

525 subset_where_conditions, 

526 order_by_conditions, 

527 result_limit 

528 ) 

529 

530 df = select_statement_executor.execute_query() 

531 

532 return df 

533 

534 def parse_response(self, res): 

535 return json.loads(json.dumps(res.to_dict(), default=str)) 

536 

537 def get_columns(self) -> List[str]: 

538 """Gets all columns to be returned in pandas DataFrame responses 

539 

540 Returns 

541 ------- 

542 List[str] 

543 List of columns 

544 """ 

545 

546 return [ 

547 "id", 

548 "text", 

549 "type", 

550 "score", 

551 "message_ids", 

552 "entities", 

553 "phrases", 

554 "definitive", 

555 "due_by", 

556 "_from.id", 

557 "_from.name", 

558 "_from.user_id", 

559 "assignee.id", 

560 "assignee.name", 

561 "assignee.email" 

562 ] 

563 

564 

565class GetAnalyticsTable(APITable): 

566 """The Get Analytics Table implementation""" 

567 

568 def select(self, query: ast.Select) -> pd.DataFrame: 

569 """Get the analytics for the given conversation id" API 

570 

571 Parameters 

572 ---------- 

573 query : ast.Select 

574 Given SQL SELECT query 

575 

576 Returns 

577 ------- 

578 pd.DataFrame 

579 metrics 

580 

581 Raises 

582 ------ 

583 ValueError 

584 If the query contains an unsupported condition 

585 """ 

586 

587 select_statement_parser = SELECTQueryParser( 

588 query, 

589 'get_analytics', 

590 self.get_columns() 

591 ) 

592 

593 selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() 

594 

595 search_params = {} 

596 subset_where_conditions = [] 

597 for op, arg1, arg2 in where_conditions: 

598 if arg1 == 'conversation_id': 

599 if op == '=': 

600 search_params["conversation_id"] = arg2 

601 else: 

602 raise NotImplementedError("Only '=' operator is supported for conversation_id column.") 

603 

604 elif arg1 in self.get_columns(): 

605 subset_where_conditions.append([op, arg1, arg2]) 

606 

607 filter_flag = ("conversation_id" in search_params) 

608 

609 if not filter_flag: 

610 raise NotImplementedError("conversation_id column has to be present in where clause.") 

611 

612 df = pd.DataFrame(columns=self.get_columns()) 

613 

614 resp = symbl.Conversations.get_analytics(conversation_id=search_params.get("conversation_id"), credentials=self.handler.credentials) 

615 

616 resp = self.parse_response(resp) 

617 

618 df = pd.json_normalize(resp["metrics"]) 

619 

620 select_statement_executor = SELECTQueryExecutor( 

621 df, 

622 selected_columns, 

623 subset_where_conditions, 

624 order_by_conditions, 

625 result_limit 

626 ) 

627 

628 df = select_statement_executor.execute_query() 

629 

630 return df 

631 

632 def parse_response(self, res): 

633 return json.loads(json.dumps(res.to_dict(), default=str)) 

634 

635 def get_columns(self) -> List[str]: 

636 """Gets all columns to be returned in pandas DataFrame responses 

637 

638 Returns 

639 ------- 

640 List[str] 

641 List of columns 

642 """ 

643 

644 return [ 

645 "type", 

646 "percent", 

647 "seconds" 

648 ]