Coverage for mindsdb / integrations / handlers / twelve_labs_handler / twelve_labs_api_client.py: 0%
145 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import requests
3from typing import Dict, List, Optional
4from requests_toolbelt.multipart.encoder import MultipartEncoder
6from mindsdb.utilities import log
7from mindsdb.integrations.handlers.twelve_labs_handler.settings import twelve_labs_handler_config
10logger = log.getLogger(__name__)
13class TwelveLabsAPIClient:
14 """
15 The Twelve Labs API client for the Twelve Labs handler.
16 This client is used for accessing the Twelve Labs API endpoints.
17 """
19 def __init__(self, api_key: str, base_url: str = None):
20 """
21 The initializer for the TwelveLabsAPIClient.
23 Parameters
24 ----------
25 api_key : str
26 The Twelve Labs API key.
27 base_url : str, Optional
28 The base URL for the Twelve Labs API. Defaults to the base URL in the Twelve Labs handler settings.
29 """
31 self.api_key = api_key
32 self.headers = {
33 'Content-Type': 'application/json',
34 'x-api-key': self.api_key
35 }
36 self.base_url = base_url if base_url else twelve_labs_handler_config.BASE_URL
38 def create_index(self, index_name: str, index_options: List[str], engine_id: Optional[str] = None, addons: Optional[List[str]] = None) -> str:
39 """
40 Create an index.
42 Parameters
43 ----------
44 index_name : str
45 Name of the index to be created.
47 index_options : List[str]
48 List of that specifies how the platform will process the videos uploaded to this index.
50 engine_id : str, Optional
51 ID of the engine. If not provided, the default engine is used.
53 addons : List[str], Optional
54 List of addons that should be enabled for the index.
56 Returns
57 -------
58 str
59 ID of the created index.
60 """
62 # TODO: change index_options to engine_options?
63 # TODO: support multiple engines per index?
64 body = {
65 "index_name": index_name,
66 "engines": [{
67 "engine_name": engine_id if engine_id else twelve_labs_handler_config.DEFAULT_ENGINE,
68 "engine_options": index_options
69 }],
70 "addons": addons,
71 }
73 result = self._submit_request(
74 method="POST",
75 endpoint="/indexes",
76 data=body,
77 )
79 logger.info(f"Index {index_name} successfully created.")
80 return result['_id']
82 def get_index_by_name(self, index_name: str) -> str:
83 """
84 Get an index by name.
86 Parameters
87 ----------
88 index_name : str
89 Name of the index to be retrieved.
91 Returns
92 -------
93 str
94 ID of the index.
95 """
97 params = {
98 "index_name": index_name,
99 }
101 result = self._submit_request(
102 method="GET",
103 endpoint="/indexes",
104 data=params,
105 )
107 data = result['data']
108 return data[0]['_id'] if data else None
110 def list_videos_in_index(self, index_name: str) -> List[Dict]:
111 """
112 List videos in an index.
114 Parameters
115 ----------
116 index_name : str
117 Name of the index.
119 Returns
120 -------
121 List[Dict]
122 List of videos in the index.
123 """
125 index_id = self.get_index_by_name(index_name=index_name)
127 data = []
128 result = self._submit_request(
129 method="GET",
130 endpoint=f"/indexes/{index_id}/videos",
131 )
132 data.extend(result['data'])
134 while result['page_info']['page'] < result['page_info']['total_page']:
135 result = self._submit_request(
136 method="GET",
137 endpoint=f"/indexes/{index_id}/videos?page_token={result['page_info']['next_page_token']}",
138 )
139 data.extend(result['data'])
141 logger.info(f"Retrieved videos in index {index_id} successfully.")
143 return data
145 def _update_video_metadata(self, index_id: str, video_id: str, video_title: str = None, metadata: Dict = None) -> None:
146 """
147 Update the metadata of a video that has already been indexed.
149 Parameters
150 ----------
151 video_id : str
152 ID of the video.
154 video_title : str
155 Title of the video.
157 metadata : Dict, Optional
158 Metadata to be updated.
160 Returns
161 -------
162 None
163 """
165 body = {}
167 if video_title:
168 body['video_title'] = video_title
170 if metadata:
171 body['metadata'] = metadata
173 self._submit_request(
174 method="PUT",
175 endpoint=f"/indexes/{index_id}/videos/{video_id}",
176 data=body,
177 )
179 logger.info(f"Updated metadata for video {video_id} successfully.")
181 def create_video_indexing_tasks(self, index_id: str, video_urls: List[str] = None, video_files: List[str] = None) -> List[str]:
182 """
183 Create video indexing tasks.
185 Parameters
186 ----------
187 index_id : str
188 ID of the index.
190 video_urls : List[str], Optional
191 List of video urls to be indexed. Either video_urls or video_files should be provided. This validation is handled by TwelveLabsHandlerModel.
193 video_files : List[str], Optional
194 List of video files to be indexed. Either video_urls or video_files should be provided. This validation is handled by TwelveLabsHandlerModel.
196 Returns
197 -------
198 List[str]
199 List of task IDs created.
200 """
202 task_ids = []
204 if video_urls:
205 logger.info("video_urls has been set, therefore, it will be given precedence.")
206 logger.info("Creating video indexing tasks for video urls.")
208 for video_url in video_urls:
209 task_ids.append(
210 self._create_video_indexing_task(
211 index_id=index_id,
212 video_url=video_url
213 )
214 )
216 elif video_files:
217 logger.info("video_urls has not been set, therefore, video_files will be used.")
218 logger.info("Creating video indexing tasks for video files.")
219 for video_file in video_files:
220 task_ids.append(
221 self._create_video_indexing_task(
222 index_id=index_id,
223 video_file=video_file
224 )
225 )
227 return task_ids
229 def _create_video_indexing_task(self, index_id: str, video_url: str = None, video_file: str = None) -> str:
230 """
231 Create a video indexing task.
233 Parameters
234 ----------
235 index_id : str
236 ID of the index.
238 video_url : str, Optional
239 URL of the video to be indexed. Either video_url or video_file should be provided. This validation is handled by TwelveLabsHandlerModel.
241 video_file : str, Optional
242 Path to the video file to be indexed. Either video_url or video_file should be provided. This validation is handled by TwelveLabsHandlerModel.
244 Returns
245 -------
246 str
247 ID of the created task.
248 """
250 body = {
251 "index_id": index_id,
252 }
254 file_to_close = None
255 if video_url:
256 body['video_url'] = video_url
258 elif video_file:
259 import mimetypes
260 # WE need the file open for the duration of the request. Maybe simplify it with context manager later, but needs _create_video_indexing_task re-written
261 file_to_close = open(video_file, 'rb')
262 mime_type, _ = mimetypes.guess_type(video_file)
263 body['video_file'] = (file_to_close.name, file_to_close, mime_type)
265 result = self._submit_multi_part_request(
266 method="POST",
267 endpoint="/tasks",
268 data=body,
269 )
271 if file_to_close:
272 file_to_close.close()
274 task_id = result['_id']
275 logger.info(f"Created video indexing task {task_id} for {video_url if video_url else video_file} successfully.")
277 # update the video title
278 video_reference = video_url if video_url else video_file
279 task = self._get_video_indexing_task(task_id=task_id)
280 self._update_video_metadata(
281 index_id=index_id,
282 video_id=task['video_id'],
283 metadata={
284 "video_reference": video_reference
285 }
286 )
288 return task_id
290 def poll_for_video_indexing_tasks(self, task_ids: List[str]) -> None:
291 """
292 Poll for video indexing tasks to complete.
294 Parameters
295 ----------
296 task_ids : List[str]
297 List of task IDs to be polled.
299 Returns
300 -------
301 None
302 """
304 for task_id in task_ids:
305 logger.info(f"Polling status of video indexing task {task_id}.")
306 is_task_running = True
308 while is_task_running:
309 task = self._get_video_indexing_task(task_id=task_id)
310 status = task['status']
311 logger.info(f"Task {task_id} is in the {status} state.")
313 wait_durtion = task['process']['remain_seconds'] if 'process' in task else twelve_labs_handler_config.DEFAULT_WAIT_DURATION
315 if status in ('pending', 'indexing', 'validating'):
316 logger.info(f"Task {task_id} will be polled again in {wait_durtion} seconds.")
317 time.sleep(wait_durtion)
319 elif status == 'ready':
320 logger.info(f"Task {task_id} completed successfully.")
321 is_task_running = False
323 else:
324 logger.error(f"Task {task_id} failed with status {task['status']}.")
325 # TODO: update Exception to be more specific
326 raise Exception(f"Task {task_id} failed with status {task['status']}.")
328 logger.info("All videos indexed successffully.")
330 def _get_video_indexing_task(self, task_id: str) -> Dict:
331 """
332 Get a video indexing task.
334 Parameters
335 ----------
336 task_id : str
337 ID of the task.
339 Returns
340 -------
341 Dict
342 Video indexing task.
343 """
345 result = self._submit_request(
346 method="GET",
347 endpoint=f"/tasks/{task_id}",
348 )
350 logger.info(f"Retrieved video indexing task {task_id} successfully.")
351 return result
353 def search_index(self, index_id: str, query: str, search_options: List[str]) -> Dict:
354 """
355 Search an index.
357 Parameters
358 ----------
359 index_id : str
360 ID of the index.
362 query : str
363 Query to be searched.
365 search_options : List[str]
366 List of search options to be used.
368 Returns
369 -------
370 Dict
371 Search results.
372 """
374 body = {
375 "index_id": index_id,
376 "query": query,
377 "search_options": search_options
378 }
380 data = []
381 result = self._submit_request(
382 method="POST",
383 endpoint="/search",
384 data=body,
385 )
386 data.extend(result['data'])
388 while 'next_page_token' in result['page_info']:
389 result = self._submit_request(
390 method="GET",
391 endpoint=f"/search/{result['page_info']['next_page_token']}"
392 )
393 data.extend(result['data'])
395 logger.info(f"Search for index {index_id} completed successfully.")
396 return data
398 def summarize_videos(self, video_ids: List[str], summarization_type: str, prompt: str) -> Dict:
399 """
400 Summarize videos.
402 Parameters
403 ----------
404 video_ids : List[str]
405 List of video IDs.
407 summarization_type : str
408 Type of the summary to be generated. Supported types are 'summary', 'chapter' and 'highlight'.
410 prompt: str
411 Prompt to be used for the Summarize task
413 Returns
414 -------
415 Dict
416 Summary of the videos.
417 """
419 results = []
420 results = [self.summarize_video(video_id, summarization_type, prompt) for video_id in video_ids]
422 logger.info(f"Summarized videos {video_ids} successfully.")
423 return results
425 def summarize_video(self, video_id: str, summarization_type: str, prompt: str) -> Dict:
426 """
427 Summarize a video.
429 Parameters
430 ----------
431 video_id : str
432 ID of the video.
434 summarization_type : str
435 Type of the summary to be generated. Supported types are 'summary', 'chapter' and 'highlight'.
437 prompt: str
438 Prompt to be used for the Summarize task
440 Returns
441 -------
442 Dict
443 Summary of the video.
444 """
445 body = {
446 "video_id": video_id,
447 "type": summarization_type,
448 "prompt": prompt
449 }
451 result = self._submit_request(
452 method="POST",
453 endpoint="/summarize",
454 data=body,
455 )
457 logger.info(f"Video {video_id} summarized successfully.")
458 return result
460 def _submit_request(self, endpoint: str, headers: Dict = None, data: Dict = None, method: str = "GET") -> Dict:
461 """
462 Submit a request to the Twelve Labs API.
464 Parameters
465 ----------
466 endpoint : str
467 API endpoint.
469 headers : Dict, Optional
470 Headers to be used in the request.
472 data : Dict, Optional
473 Data to be used in the request.
475 method : str, Optional
476 HTTP method to be used in the request. Defaults to GET.
478 Returns
479 -------
480 Dict
481 Response from the API.
482 """
484 headers = headers if headers else self.headers
486 if method == "GET":
487 response = requests.get(
488 url=self.base_url + endpoint,
489 headers=headers,
490 params=data if data else {},
491 )
493 elif method == "POST":
494 response = requests.post(
495 url=self.base_url + endpoint,
496 headers=headers,
497 json=data if data else {},
498 )
500 elif method == "PUT":
501 response = requests.put(
502 url=self.base_url + endpoint,
503 headers=headers,
504 json=data if data else {},
505 )
507 else:
508 raise Exception(f"Method {method} not supported yet.")
510 return self._handle_response(response)
512 def _submit_multi_part_request(self, endpoint: str, headers: Dict = None, data: Dict = None, method: str = "POST") -> Dict:
513 """
514 Submit a multi-part request to the Twelve Labs API.
516 Parameters
517 ----------
518 endpoint : str
519 API endpoint.
521 headers : Dict, Optional
522 Headers to be used in the request.
524 data : Dict, Optional
525 Data to be used in the request.
527 method : str, Optional
528 HTTP method to be used in the request. Defaults to GET.
530 Returns
531 -------
532 Dict
533 Response from the API.
534 """
536 headers = headers = headers if headers else self.headers
538 multipart_data = MultipartEncoder(fields=data)
539 headers['Content-Type'] = multipart_data.content_type
540 if method == "POST":
541 response = requests.post(
542 url=self.base_url + endpoint,
543 headers=headers,
544 data=multipart_data if multipart_data else {}
545 )
547 else:
548 raise Exception(f"Method {method} not supported yet.")
549 return self._handle_response(response)
551 def _handle_response(self, response):
552 if response.status_code in (200, 201):
553 if response.content:
554 result = response.json()
555 logger.info("API request was successful.")
556 return result
557 else:
558 logger.info("API request was successful. No content returned.")
559 return {}
560 else:
561 if response.content:
562 logger.error(f"API request has failed: {response.content}")
563 # TODO: update Exception to be more specific
564 raise Exception(f"API request has failed: {response.content}")
565 else:
566 logger.error("API request has failed. No content returned.")
567 raise Exception("API request has failed. No content returned.")