Coverage for mindsdb / integrations / handlers / twelve_labs_handler / twelve_labs_api_client.py: 0%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import requests 

3from typing import Dict, List, Optional 

4from requests_toolbelt.multipart.encoder import MultipartEncoder 

5 

6from mindsdb.utilities import log 

7from mindsdb.integrations.handlers.twelve_labs_handler.settings import twelve_labs_handler_config 

8 

9 

10logger = log.getLogger(__name__) 

11 

12 

class TwelveLabsAPIClient:
    """
    The Twelve Labs API client for the Twelve Labs handler.
    This client is used for accessing the Twelve Labs API endpoints.
    """

    def __init__(self, api_key: str, base_url: Optional[str] = None):
        """
        The initializer for the TwelveLabsAPIClient.

        Parameters
        ----------
        api_key : str
            The Twelve Labs API key.
        base_url : str, Optional
            The base URL for the Twelve Labs API. Defaults to the base URL in the Twelve Labs handler settings.
        """

        self.api_key = api_key
        # Default headers for JSON requests. Treated as read-only by the request helpers.
        self.headers = {
            'Content-Type': 'application/json',
            'x-api-key': self.api_key
        }
        self.base_url = base_url if base_url else twelve_labs_handler_config.BASE_URL

    def create_index(self, index_name: str, index_options: List[str], engine_id: Optional[str] = None, addons: Optional[List[str]] = None) -> str:
        """
        Create an index.

        Parameters
        ----------
        index_name : str
            Name of the index to be created.

        index_options : List[str]
            List of options that specifies how the platform will process the videos uploaded to this index.

        engine_id : str, Optional
            ID of the engine. If not provided, the default engine from the handler settings is used.

        addons : List[str], Optional
            List of addons that should be enabled for the index.

        Returns
        -------
        str
            ID of the created index.
        """

        # TODO: change index_options to engine_options?
        # TODO: support multiple engines per index?
        body = {
            "index_name": index_name,
            "engines": [{
                "engine_name": engine_id if engine_id else twelve_labs_handler_config.DEFAULT_ENGINE,
                "engine_options": index_options
            }],
            "addons": addons,
        }

        result = self._submit_request(
            method="POST",
            endpoint="/indexes",
            data=body,
        )

        logger.info(f"Index {index_name} successfully created.")
        return result['_id']

    def get_index_by_name(self, index_name: str) -> Optional[str]:
        """
        Get an index by name.

        Parameters
        ----------
        index_name : str
            Name of the index to be retrieved.

        Returns
        -------
        str or None
            ID of the first index returned for this name, or None if the API returned no indexes.
        """

        result = self._submit_request(
            method="GET",
            endpoint="/indexes",
            data={"index_name": index_name},
        )

        matches = result['data']
        return matches[0]['_id'] if matches else None

    def list_videos_in_index(self, index_name: str) -> List[Dict]:
        """
        List videos in an index.

        Parameters
        ----------
        index_name : str
            Name of the index.

        Returns
        -------
        List[Dict]
            List of videos in the index, collected across all result pages.

        Raises
        ------
        Exception
            If no index with the given name is found or an API request fails.
        """

        index_id = self.get_index_by_name(index_name=index_name)
        if index_id is None:
            # Without this guard the request below would target "/indexes/None/videos".
            logger.error(f"Index {index_name} does not exist.")
            # TODO: update Exception to be more specific
            raise Exception(f"Index {index_name} does not exist.")

        result = self._submit_request(
            method="GET",
            endpoint=f"/indexes/{index_id}/videos",
        )
        videos = list(result['data'])

        # Keep following the page token until the last page has been consumed.
        while result['page_info']['page'] < result['page_info']['total_page']:
            result = self._submit_request(
                method="GET",
                endpoint=f"/indexes/{index_id}/videos?page_token={result['page_info']['next_page_token']}",
            )
            videos.extend(result['data'])

        logger.info(f"Retrieved videos in index {index_id} successfully.")

        return videos

    def _update_video_metadata(self, index_id: str, video_id: str, video_title: str = None, metadata: Dict = None) -> None:
        """
        Update the metadata of a video that has already been indexed.

        Parameters
        ----------
        index_id : str
            ID of the index that contains the video.

        video_id : str
            ID of the video.

        video_title : str, Optional
            Title of the video. Only sent when provided.

        metadata : Dict, Optional
            Metadata to be updated. Only sent when provided.

        Returns
        -------
        None
        """

        body = {}

        if video_title:
            body['video_title'] = video_title

        if metadata:
            body['metadata'] = metadata

        self._submit_request(
            method="PUT",
            endpoint=f"/indexes/{index_id}/videos/{video_id}",
            data=body,
        )

        logger.info(f"Updated metadata for video {video_id} successfully.")

    def create_video_indexing_tasks(self, index_id: str, video_urls: List[str] = None, video_files: List[str] = None) -> List[str]:
        """
        Create video indexing tasks.

        Parameters
        ----------
        index_id : str
            ID of the index.

        video_urls : List[str], Optional
            List of video urls to be indexed. Either video_urls or video_files should be provided. This validation is handled by TwelveLabsHandlerModel.

        video_files : List[str], Optional
            List of video files to be indexed. Either video_urls or video_files should be provided. This validation is handled by TwelveLabsHandlerModel.

        Returns
        -------
        List[str]
            List of task IDs created. Empty when neither video_urls nor video_files is provided.
        """

        if video_urls:
            logger.info("video_urls has been set, therefore, it will be given precedence.")
            logger.info("Creating video indexing tasks for video urls.")
            return [
                self._create_video_indexing_task(index_id=index_id, video_url=video_url)
                for video_url in video_urls
            ]

        if video_files:
            logger.info("video_urls has not been set, therefore, video_files will be used.")
            logger.info("Creating video indexing tasks for video files.")
            return [
                self._create_video_indexing_task(index_id=index_id, video_file=video_file)
                for video_file in video_files
            ]

        return []

    def _create_video_indexing_task(self, index_id: str, video_url: str = None, video_file: str = None) -> str:
        """
        Create a video indexing task.

        Parameters
        ----------
        index_id : str
            ID of the index.

        video_url : str, Optional
            URL of the video to be indexed. Either video_url or video_file should be provided. This validation is handled by TwelveLabsHandlerModel.

        video_file : str, Optional
            Path to the video file to be indexed. Either video_url or video_file should be provided. This validation is handled by TwelveLabsHandlerModel.

        Returns
        -------
        str
            ID of the created task.
        """

        body = {
            "index_id": index_id,
        }

        open_file = None
        try:
            if video_url:
                body['video_url'] = video_url

            elif video_file:
                import mimetypes
                # The file must stay open for the duration of the request because the
                # multipart encoder reads from the handle while the body is being sent.
                open_file = open(video_file, 'rb')
                mime_type, _ = mimetypes.guess_type(video_file)
                body['video_file'] = (open_file.name, open_file, mime_type)

            result = self._submit_multi_part_request(
                method="POST",
                endpoint="/tasks",
                data=body,
            )
        finally:
            # Close the handle even when the upload raises, so failures do not leak it.
            if open_file is not None:
                open_file.close()

        task_id = result['_id']
        video_reference = video_url if video_url else video_file
        logger.info(f"Created video indexing task {task_id} for {video_reference} successfully.")

        # Record where the video came from in its metadata; the video ID is read from the task.
        task = self._get_video_indexing_task(task_id=task_id)
        self._update_video_metadata(
            index_id=index_id,
            video_id=task['video_id'],
            metadata={
                "video_reference": video_reference
            }
        )

        return task_id

    def poll_for_video_indexing_tasks(self, task_ids: List[str]) -> None:
        """
        Poll for video indexing tasks to complete.

        Tasks are polled one after another; each is polled until it reaches the 'ready' state.

        Parameters
        ----------
        task_ids : List[str]
            List of task IDs to be polled.

        Returns
        -------
        None

        Raises
        ------
        Exception
            If a task reports a status other than 'pending', 'indexing', 'validating' or 'ready'.
        """

        for task_id in task_ids:
            logger.info(f"Polling status of video indexing task {task_id}.")

            while True:
                task = self._get_video_indexing_task(task_id=task_id)
                status = task['status']
                logger.info(f"Task {task_id} is in the {status} state.")

                if status == 'ready':
                    logger.info(f"Task {task_id} completed successfully.")
                    break

                if status not in ('pending', 'indexing', 'validating'):
                    logger.error(f"Task {task_id} failed with status {status}.")
                    # TODO: update Exception to be more specific
                    raise Exception(f"Task {task_id} failed with status {status}.")

                # Only look up the wait time while the task is still running, so a terminal
                # state without progress details cannot fail on a missing key.
                if 'process' in task:
                    wait_duration = task['process']['remain_seconds']
                else:
                    wait_duration = twelve_labs_handler_config.DEFAULT_WAIT_DURATION

                logger.info(f"Task {task_id} will be polled again in {wait_duration} seconds.")
                time.sleep(wait_duration)

        logger.info("All videos indexed successffully.")

    def _get_video_indexing_task(self, task_id: str) -> Dict:
        """
        Get a video indexing task.

        Parameters
        ----------
        task_id : str
            ID of the task.

        Returns
        -------
        Dict
            Video indexing task.
        """

        result = self._submit_request(
            method="GET",
            endpoint=f"/tasks/{task_id}",
        )

        logger.info(f"Retrieved video indexing task {task_id} successfully.")
        return result

    def search_index(self, index_id: str, query: str, search_options: List[str]) -> List[Dict]:
        """
        Search an index.

        Parameters
        ----------
        index_id : str
            ID of the index.

        query : str
            Query to be searched.

        search_options : List[str]
            List of search options to be used.

        Returns
        -------
        List[Dict]
            Search results, collected across all result pages.
        """

        body = {
            "index_id": index_id,
            "query": query,
            "search_options": search_options
        }

        result = self._submit_request(
            method="POST",
            endpoint="/search",
            data=body,
        )
        matches = list(result['data'])

        # Follow-up pages are fetched by token for as long as the response provides one.
        while 'next_page_token' in result['page_info']:
            result = self._submit_request(
                method="GET",
                endpoint=f"/search/{result['page_info']['next_page_token']}"
            )
            matches.extend(result['data'])

        logger.info(f"Search for index {index_id} completed successfully.")
        return matches

    def summarize_videos(self, video_ids: List[str], summarization_type: str, prompt: str) -> List[Dict]:
        """
        Summarize videos.

        Parameters
        ----------
        video_ids : List[str]
            List of video IDs.

        summarization_type : str
            Type of the summary to be generated. Supported types are 'summary', 'chapter' and 'highlight'.

        prompt: str
            Prompt to be used for the Summarize task

        Returns
        -------
        List[Dict]
            One summary response per video, in the order of video_ids.
        """

        results = [self.summarize_video(video_id, summarization_type, prompt) for video_id in video_ids]

        logger.info(f"Summarized videos {video_ids} successfully.")
        return results

    def summarize_video(self, video_id: str, summarization_type: str, prompt: str) -> Dict:
        """
        Summarize a video.

        Parameters
        ----------
        video_id : str
            ID of the video.

        summarization_type : str
            Type of the summary to be generated. Supported types are 'summary', 'chapter' and 'highlight'.

        prompt: str
            Prompt to be used for the Summarize task

        Returns
        -------
        Dict
            Summary of the video.
        """

        body = {
            "video_id": video_id,
            "type": summarization_type,
            "prompt": prompt
        }

        result = self._submit_request(
            method="POST",
            endpoint="/summarize",
            data=body,
        )

        logger.info(f"Video {video_id} summarized successfully.")
        return result

    def _submit_request(self, endpoint: str, headers: Dict = None, data: Dict = None, method: str = "GET") -> Dict:
        """
        Submit a request to the Twelve Labs API.

        Parameters
        ----------
        endpoint : str
            API endpoint, appended to the base URL.

        headers : Dict, Optional
            Headers to be used in the request. Defaults to the client's headers.

        data : Dict, Optional
            Data to be used in the request. Sent as query parameters for GET and as a JSON body for POST and PUT.

        method : str, Optional
            HTTP method to be used in the request. Defaults to GET. GET, POST and PUT are supported.

        Returns
        -------
        Dict
            Response from the API.

        Raises
        ------
        Exception
            If the method is not supported or the API request fails.
        """

        if method not in ("GET", "POST", "PUT"):
            raise Exception(f"Method {method} not supported yet.")

        request_headers = headers if headers else self.headers
        url = self.base_url + endpoint
        payload = data if data else {}

        if method == "GET":
            response = requests.get(url=url, headers=request_headers, params=payload)
        elif method == "POST":
            response = requests.post(url=url, headers=request_headers, json=payload)
        else:
            response = requests.put(url=url, headers=request_headers, json=payload)

        return self._handle_response(response)

    def _submit_multi_part_request(self, endpoint: str, headers: Dict = None, data: Dict = None, method: str = "POST") -> Dict:
        """
        Submit a multi-part request to the Twelve Labs API.

        Parameters
        ----------
        endpoint : str
            API endpoint, appended to the base URL.

        headers : Dict, Optional
            Headers to be used in the request. Defaults to the client's headers. The given dictionary is not modified.

        data : Dict, Optional
            Fields to be encoded as the multi-part body.

        method : str, Optional
            HTTP method to be used in the request. Defaults to POST, which is the only supported method.

        Returns
        -------
        Dict
            Response from the API.

        Raises
        ------
        Exception
            If the method is not supported or the API request fails.
        """

        if method != "POST":
            raise Exception(f"Method {method} not supported yet.")

        multipart_data = MultipartEncoder(fields=data)

        # Work on a copy: writing the multipart content type into self.headers (or the
        # caller's dictionary) would make every later JSON request carry the wrong Content-Type.
        request_headers = dict(headers if headers else self.headers)
        request_headers['Content-Type'] = multipart_data.content_type

        response = requests.post(
            url=self.base_url + endpoint,
            headers=request_headers,
            data=multipart_data,
        )

        return self._handle_response(response)

    def _handle_response(self, response) -> Dict:
        """
        Convert an API response into a result dictionary or raise on failure.

        Parameters
        ----------
        response
            Response object returned by the requests library.

        Returns
        -------
        Dict
            Parsed JSON body for a 200 or 201 response, or an empty dictionary if the body is empty.

        Raises
        ------
        Exception
            If the status code is anything other than 200 or 201.
        """

        if response.status_code not in (200, 201):
            if response.content:
                logger.error(f"API request has failed: {response.content}")
                # TODO: update Exception to be more specific
                raise Exception(f"API request has failed: {response.content}")

            logger.error("API request has failed. No content returned.")
            raise Exception("API request has failed. No content returned.")

        if not response.content:
            logger.info("API request was successful. No content returned.")
            return {}

        result = response.json()
        logger.info("API request was successful.")
        return result