Coverage for mindsdb / integrations / handlers / twelve_labs_handler / twelve_labs_handler.py: 0%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from typing import Optional, Dict 

3 

4from mindsdb.utilities import log 

5from mindsdb.integrations.libs.base import BaseMLEngine 

6from mindsdb.integrations.utilities.handler_utils import get_api_key 

7from mindsdb.integrations.libs.api_handler_exceptions import MissingConnectionParams 

8 

9from mindsdb.integrations.handlers.twelve_labs_handler.settings import TwelveLabsHandlerModel 

10from mindsdb.integrations.handlers.twelve_labs_handler.twelve_labs_api_client import TwelveLabsAPIClient 

11 

12 

# Module-level logger, named after this module, obtained through the MindsDB log utility.
logger = log.getLogger(__name__)

14 

15 

class TwelveLabsHandler(BaseMLEngine):
    """
    Twelve Labs API handler implementation.

    Supports creating a model backed by a Twelve Labs index (``create``), running
    search or summarization against it (``predict``) and inspecting the stored
    arguments or the indexed videos (``describe``).
    """

    name = 'twelve_labs'

    def __init__(self, *args, **kwargs):
        """
        Initializes the handler and marks it as generative.

        Parameters
        ----------
        *args, **kwargs
            Forwarded unchanged to the BaseMLEngine constructor.
        """
        super().__init__(*args, **kwargs)
        self.generative = True

    @staticmethod
    def create_validation(target: str, args: Optional[Dict] = None, **kwargs: Dict) -> None:
        """
        Validates the create arguments. This method is called when creating a new model, prior to calling the create() method.

        Parameters
        ----------
        target : str
            Name of the target column.

        args : Dict, Optional
            Arguments of the CREATE statement; the USING clause is expected under the 'using' key.

        kwargs : Dict
            Additional arguments.

        Raises
        ------
        MissingConnectionParams
            If args is missing/empty or does not contain a USING clause.

        ValueError
            If the parameters in the USING clause are invalid (raised by TwelveLabsHandlerModel).
        """

        # `args` defaults to None, so guard against it before the membership test;
        # otherwise a missing USING clause would surface as a TypeError
        if not args or 'using' not in args:
            raise MissingConnectionParams("Twelve Labs engine requires a USING clause! Refer to its documentation for more details.")

        # pass USING args to TwelveLabsHandlerModel for validation
        TwelveLabsHandlerModel(**args['using'])

    def create(self, target: str, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> None:
        """
        Creates a model for interacting with the Twelve Labs API. This method is called when creating a new model.
        The following steps are performed:
            1. Create an index if it doesn't exist already.
            2. Create video indexing tasks for all video files or video urls.
            3. Poll for video indexing tasks to complete.

        The USING args (extended with the target, the API key and the index id) are stored
        in model_storage under the 'args' key once indexing has completed.

        Parameters
        ----------
        target : str
            Name of the target column.

        df : pd.DataFrame, Optional
            DataFrame containing the data to be used in creating the model. This can include the columns containing video urls or video files.

        args : Dict, Optional
            Arguments of the CREATE statement; the USING clause is read from the 'using' key.

        Raises
        ------
        RuntimeError
            If no video urls or video files could be resolved from the USING args / DataFrame.
        """

        # get USING args and add target
        args = args['using']
        args['target'] = target

        # get api client and api key
        twelve_labs_api_client, api_key = self._get_api_client(args)

        # update args with api key
        args['twelve_labs_api_key'] = api_key

        # get index if it exists
        index_name = args['index_name']
        index_id = twelve_labs_api_client.get_index_by_name(index_name=index_name)

        # create index if it doesn't exist
        if not index_id:
            logger.info(f"Index {index_name} does not exist. Creating index.")
            index_id = twelve_labs_api_client.create_index(
                index_name=index_name,
                engine_id=args.get('engine_id'),
                index_options=args['index_options'],
                addons=args.get('addons', []),
            )
        else:
            logger.info(f"Index {index_name} already exists. Using existing index.")

        # store index_id in args
        args['index_id'] = index_id

        # initialize video_urls and video_files
        video_urls, video_files = None, None

        # create video indexing tasks for all video files or video urls
        # video urls will be given precedence
        # check if video_urls_column has been set and use it to get the video urls
        if 'video_urls_column' in args:
            logger.info("video_urls_column has been set, therefore, it will be given precedence.")
            video_urls = df[args['video_urls_column']].tolist()

        # else, check if video_files_column has been set and use it to get the video files
        elif 'video_files_column' in args:
            logger.info("video_urls_column has not been set, therefore, video_files_column will be used.")
            video_files = df[args['video_files_column']].tolist()

        # else, check if video_urls or video_files have been set and use them
        else:
            logger.info("video_urls_column and video_files_column have not been set, therefore, video_urls and video_files will be used.")
            video_urls = args.get('video_urls')
            video_files = args.get('video_files')

        # if video_urls and video_files are not set, then raise an exception
        if not video_urls and not video_files:
            logger.error("Neither video_urls_column, video_files_column, video_urls nor video_files have been set.")
            raise RuntimeError("Neither video_urls_column, video_files_column, video_urls nor video_files have been set. Please set one of them.")

        task_ids = twelve_labs_api_client.create_video_indexing_tasks(
            index_id=index_id,
            video_urls=video_urls,
            video_files=video_files,
        )

        # poll for video indexing tasks to complete
        twelve_labs_api_client.poll_for_video_indexing_tasks(task_ids=task_ids)

        # store args in model_storage
        self.model_storage.json_set('args', args)

    def predict(self, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> pd.DataFrame:
        """
        Predicts the target column for the given data. This method is called when making predictions.

        The task to run ('search' or 'summarization') is read from the args stored in
        model_storage at creation time; the `args` parameter passed in is not used.

        Parameters
        ----------
        df : pd.DataFrame, Optional
            DataFrame containing the data to be used in making predictions. For the search task it holds the
            column with the query; for the summarization task it holds a 'video_id' column.

        args : Dict, Optional
            Additional arguments (ignored; the stored args are used instead).

        Returns
        -------
        pd.DataFrame
            Normalized API response, with every column prefixed by '<target>_'.

        Raises
        ------
        ValueError
            If the stored task is neither 'search' nor 'summarization'.
        """

        # get args from model_storage
        args = self.model_storage.json_get('args')

        # get api client
        twelve_labs_api_client, _ = self._get_api_client(args)

        task = args['task']
        column_prefix = args['target'] + '_'

        # check if task is search
        if task == 'search':
            # get search query
            # TODO: support multiple queries
            query = df[args['search_query_column']].tolist()[0]

            # search for query in index
            data = twelve_labs_api_client.search_index(
                index_id=args['index_id'],
                query=query,
                search_options=args['search_options'],
            )

            # TODO: pick only the necessary columns?
            # TODO: structure nested columns?
            return pd.json_normalize(data).add_prefix(column_prefix)

        # check if task is summarize
        if task == 'summarization':
            # summarize videos
            summarization_type = args['summarization_type']
            data = twelve_labs_api_client.summarize_videos(
                video_ids=df['video_id'].tolist(),
                summarization_type=summarization_type,
                # the stored args are the raw USING clause, so 'prompt' is absent
                # when the user did not supply one; fall back to None
                prompt=args.get('prompt'),
            )

            if summarization_type in ('chapter', 'highlight'):
                # results are nested under the pluralized type ('chapters' / 'highlights')
                return pd.json_normalize(data, record_path=f"{summarization_type}s", meta=['id']).add_prefix(column_prefix)

            return pd.json_normalize(data).add_prefix(column_prefix)

        # fail loudly instead of implicitly returning None for an unknown task
        raise ValueError(f"Unsupported task '{task}'. Supported tasks are 'search' and 'summarization'.")

    def describe(self, attribute: Optional[str] = None) -> pd.DataFrame:
        """
        Describes the model. This method is called when describing the model.

        Parameters
        ----------
        attribute : str, Optional
            The attribute to describe: 'args' or 'indexed_videos'. Any other value
            (including None) lists the available attributes.

        Returns
        -------
        pd.DataFrame
            DataFrame containing the description of the model.
        """

        if attribute == "args":
            args = self.model_storage.json_get("args")
            return pd.DataFrame(args.items(), columns=["key", "value"])

        if attribute == "indexed_videos":
            # read the stored args once and reuse them for both the client and the index name
            args = self.model_storage.json_get("args")

            # get api client
            twelve_labs_api_client, _ = self._get_api_client(args)

            # get videos indexed in the index
            indexed_videos = twelve_labs_api_client.list_videos_in_index(index_name=args.get("index_name"))

            # structure nested columns: lift the 'metadata' entries to the top level
            indexed_video_data = []
            for video in indexed_videos:
                video_data = video.copy()
                video_data.update(video_data.pop("metadata"))

                # convert engine_ids to string
                video_data['engine_ids'] = ", ".join(video_data['engine_ids'])

                indexed_video_data.append(video_data)

            df_videos = pd.DataFrame(indexed_video_data)

            # rename _id to video_id
            df_videos.rename(columns={"_id": "video_id"}, inplace=True)

            # MindsDB GUI fails to display NaN values, so we replace them with 0
            df_videos.fillna(0, inplace=True)
            return df_videos

        tables = ["args", "indexed_videos"]
        return pd.DataFrame(tables, columns=["tables"])

    def _get_api_client(self, args: Optional[Dict] = None) -> tuple:
        """
        Returns a TwelveLabsAPIClient instance together with the API key used to build it.

        Parameters
        ----------
        args : Dict, Optional
            Arguments from the USING clause. When omitted or empty, the args stored in model_storage are used.

        Returns
        -------
        tuple
            A ``(TwelveLabsAPIClient, api_key)`` pair, where api_key is the value resolved by get_api_key.
        """

        if not args:
            args = self.model_storage.json_get('args')

        # get api key
        api_key = get_api_key(
            api_name=self.name,
            create_args=args,
            engine_storage=self.engine_storage,
        )

        base_url = args.get('base_url')

        # initialize TwelveLabsAPIClient
        return TwelveLabsAPIClient(api_key=api_key, base_url=base_url), api_key