Coverage for mindsdb / integrations / handlers / twelve_labs_handler / twelve_labs_handler.py: 0%
87 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2from typing import Optional, Dict
4from mindsdb.utilities import log
5from mindsdb.integrations.libs.base import BaseMLEngine
6from mindsdb.integrations.utilities.handler_utils import get_api_key
7from mindsdb.integrations.libs.api_handler_exceptions import MissingConnectionParams
9from mindsdb.integrations.handlers.twelve_labs_handler.settings import TwelveLabsHandlerModel
10from mindsdb.integrations.handlers.twelve_labs_handler.twelve_labs_api_client import TwelveLabsAPIClient
13logger = log.getLogger(__name__)
class TwelveLabsHandler(BaseMLEngine):
    """
    Twelve Labs API handler implementation.

    The handler creates (or reuses) a Twelve Labs index, submits videos for
    indexing when a model is created, and runs search or summarization
    requests against that index when predictions are made.
    """

    name = 'twelve_labs'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.generative = True

    @staticmethod
    def create_validation(target: str, args: Optional[Dict] = None, **kwargs: Dict) -> None:
        """
        Validates the create arguments. This method is called when creating a new model, prior to calling the create() method.

        Parameters
        ----------
        target : str
            Name of the target column.

        args : Dict
            Arguments of the CREATE statement; the USING clause is expected under the 'using' key.

        kwargs : Dict
            Additional arguments.

        Raises
        ------
        MissingConnectionParams
            If a USING clause is not provided (including when args is None or empty).

        ValueError
            If the parameters in the USING clause are invalid.
        """

        # `args` defaults to None, so guard against it before the membership test;
        # otherwise a missing USING clause would surface as a TypeError.
        if not args or 'using' not in args:
            raise MissingConnectionParams("Twelve Labs engine requires a USING clause! Refer to its documentation for more details.")

        # pass USING args to TwelveLabsHandlerModel for validation
        TwelveLabsHandlerModel(**args['using'])

    def create(self, target: str, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> None:
        """
        Creates a model for interacting with the Twelve Labs API. This method is called when creating a new model.
        The following steps are performed:
            1. Create an index if it doesn't exist already.
            2. Create video indexing tasks for all video files or video urls.
            3. Poll for video indexing tasks to complete.

        Parameters
        ----------
        target : str
            Name of the target column.

        df : pd.DataFrame, Optional
            DataFrame containing the data to be used in creating the model. This can include the columns containing video urls or video files.

        args : Dict, Optional
            Arguments of the CREATE statement; the USING clause is read from the 'using' key.

        Raises
        ------
        RuntimeError
            If no video source has been set, or if a video column has been set but no DataFrame was provided.
        """

        # get USING args and add target
        args = args['using']
        args['target'] = target

        # get api client and api key
        twelve_labs_api_client, api_key = self._get_api_client(args)

        # update args with api key
        args['twelve_labs_api_key'] = api_key

        # get index if it exists
        index_name = args['index_name']
        index_id = twelve_labs_api_client.get_index_by_name(index_name=index_name)

        # create index if it doesn't exist
        if not index_id:
            logger.info(f"Index {index_name} does not exist. Creating index.")
            index_id = twelve_labs_api_client.create_index(
                index_name=index_name,
                engine_id=args.get('engine_id'),
                index_options=args['index_options'],
                addons=args.get('addons', []),
            )
        else:
            logger.info(f"Index {index_name} already exists. Using existing index.")

        # store index_id in args
        args['index_id'] = index_id

        # initialize video_urls and video_files
        video_urls, video_files = None, None

        # create video indexing tasks for all video files or video urls
        # video urls will be given precedence
        # check if video_urls_column has been set and use it to get the video urls
        if 'video_urls_column' in args:
            logger.info("video_urls_column has been set, therefore, it will be given precedence.")
            video_urls = self._column_to_list(df, args['video_urls_column'])

        # else, check if video_files_column has been set and use it to get the video files
        elif 'video_files_column' in args:
            logger.info("video_urls_column has not been set, therefore, video_files_column will be used.")
            video_files = self._column_to_list(df, args['video_files_column'])

        # else, check if video_urls or video_files have been set and use them
        else:
            logger.info("video_urls_column and video_files_column have not been set, therefore, video_urls and video_files will be used.")
            video_urls = args.get('video_urls')
            video_files = args.get('video_files')

        # if video_urls and video_files are not set, then raise an exception
        if not video_urls and not video_files:
            logger.error("Neither video_urls_column, video_files_column, video_urls nor video_files have been set.")
            raise RuntimeError("Neither video_urls_column, video_files_column, video_urls nor video_files have been set. Please set one of them.")

        task_ids = twelve_labs_api_client.create_video_indexing_tasks(
            index_id=index_id,
            video_urls=video_urls,
            video_files=video_files,
        )

        # poll for video indexing tasks to complete
        twelve_labs_api_client.poll_for_video_indexing_tasks(task_ids=task_ids)

        # store args in model_storage so that predict() and describe() can read them back
        self.model_storage.json_set('args', args)

    def predict(self, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> pd.DataFrame:
        """
        Predicts the target column for the given data. This method is called when making predictions.

        Parameters
        ----------
        df : pd.DataFrame, Optional
            DataFrame containing the data to be used in making predictions. This can include the column containing the queries to be run against the index.

        args : Dict, Optional
            Additional arguments. They are not used: the arguments stored in model_storage at creation time take their place.

        Returns
        -------
        pd.DataFrame
            Flattened API response, with every column prefixed by '<target>_'.

        Raises
        ------
        ValueError
            If the stored task is neither 'search' nor 'summarization'.
        """

        # get args from model_storage
        args = self.model_storage.json_get('args')

        # get api client
        twelve_labs_api_client, _ = self._get_api_client(args)

        column_prefix = args['target'] + '_'
        task = args.get('task')

        # check if task is search
        if task == 'search':
            # get search query
            # TODO: support multiple queries
            query = df[args['search_query_column']].tolist()[0]

            # search for query in index
            data = twelve_labs_api_client.search_index(
                index_id=args['index_id'],
                query=query,
                search_options=args['search_options']
            )

            # TODO: pick only the necessary columns?
            # TODO: structure nested columns?
            return pd.json_normalize(data).add_prefix(column_prefix)

        # check if task is summarize
        if task == 'summarization':
            # summarize videos
            video_ids = df['video_id'].tolist()
            summarization_type = args['summarization_type']
            data = twelve_labs_api_client.summarize_videos(
                video_ids=video_ids,
                summarization_type=summarization_type,
                # read with .get() so that stored args without a 'prompt' key do not raise KeyError
                prompt=args.get('prompt')
            )

            # chapters and highlights come back as a nested list per video; expand one row per item
            if summarization_type in ('chapter', 'highlight'):
                return pd.json_normalize(data, record_path=f"{summarization_type}s", meta=['id']).add_prefix(column_prefix)

            return pd.json_normalize(data).add_prefix(column_prefix)

        # fail loudly rather than silently returning None for an unknown task
        raise ValueError(f"Unsupported task: {task!r}. Supported tasks are 'search' and 'summarization'.")

    def describe(self, attribute: Optional[str] = None) -> pd.DataFrame:
        """
        Describes the model. This method is called when describing the model.

        Parameters
        ----------
        attribute : str, Optional
            The attribute to describe: 'args' or 'indexed_videos'. Any other value lists the available attributes.

        Returns
        -------
        pd.DataFrame
            DataFrame containing the description of the model.
        """

        if attribute == "args":
            # NOTE(review): create() stores 'twelve_labs_api_key' inside these args,
            # so this table exposes the API key as-is - confirm that this is intended.
            args = self.model_storage.json_get("args")
            return pd.DataFrame(args.items(), columns=["key", "value"])

        if attribute == "indexed_videos":
            # read the stored args once and reuse them for both the client and the index name
            args = self.model_storage.json_get("args")

            # get api client
            twelve_labs_api_client, _ = self._get_api_client(args)

            # get videos indexed in the index
            indexed_videos = twelve_labs_api_client.list_videos_in_index(index_name=args.get("index_name"))

            # structure nested columns
            indexed_video_data = []
            for video in indexed_videos:
                video_data = video.copy()

                # lift the nested metadata fields to the top level
                video_data.update(video_data.pop("metadata", None) or {})

                # convert engine_ids to string
                if 'engine_ids' in video_data:
                    video_data['engine_ids'] = ", ".join(video_data['engine_ids'])

                indexed_video_data.append(video_data)

            # rename _id to video_id
            df_videos = pd.DataFrame(indexed_video_data).rename(columns={"_id": "video_id"})

            # MindsDB GUI fails to display NaN values, so we replace them with 0
            return df_videos.fillna(0)

        tables = ["args", "indexed_videos"]
        return pd.DataFrame(tables, columns=["tables"])

    @staticmethod
    def _column_to_list(df: Optional[pd.DataFrame], column: str) -> list:
        """
        Returns the values of a DataFrame column as a list.

        Parameters
        ----------
        df : pd.DataFrame, Optional
            DataFrame to read the column from.

        column : str
            Name of the column to read.

        Raises
        ------
        RuntimeError
            If no DataFrame was provided.
        """

        if df is None:
            raise RuntimeError(f"Column '{column}' has been set, but no data was provided to read it from.")

        return df[column].tolist()

    def _get_api_client(self, args: Optional[Dict] = None) -> tuple:
        """
        Returns a TwelveLabsAPIClient instance together with the API key used to create it.

        Parameters
        ----------
        args : Dict, Optional
            Arguments from the USING clause. If empty or not provided, the args stored in model_storage are used.

        Returns
        -------
        tuple
            A (TwelveLabsAPIClient, api_key) pair.
        """

        if not args:
            args = self.model_storage.json_get('args')

        # get api key
        api_key = get_api_key(
            api_name=self.name,
            create_args=args,
            engine_storage=self.engine_storage,
        )

        base_url = args.get('base_url')

        # initialize TwelveLabsAPIClient
        return TwelveLabsAPIClient(api_key=api_key, base_url=base_url), api_key