Coverage for mindsdb / integrations / handlers / youtube_handler / youtube_tables.py: 0%

203 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import List 

2 

3from mindsdb.integrations.libs.api_handler import APITable 

4from mindsdb.utilities import log 

5 

6from mindsdb_sql_parser import ast 

7from mindsdb.integrations.utilities.handlers.query_utilities import ( 

8 SELECTQueryParser, 

9 SELECTQueryExecutor, 

10 INSERTQueryParser, 

11) 

12 

13import pandas as pd 

14import re 

15from youtube_transcript_api import YouTubeTranscriptApi 

16from youtube_transcript_api.formatters import JSONFormatter 

17 

18logger = log.getLogger(__name__) 

19 

20 

class YoutubeCommentsTable(APITable):
    """Youtube List Comments by video id Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the youtube "commentThreads()" API endpoint

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            youtube "commentThreads()" matching the query

        Raises
        ------
        ValueError
            If neither video_id nor channel_id is present in the WHERE clause
        NotImplementedError
            If an operator other than '=' is used on video_id or channel_id
        """
        select_statement_parser = SELECTQueryParser(query, "comments", self.get_columns())

        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        channel_id, video_id = None, None
        for op, column, value in where_conditions:
            if column == "video_id":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for video_id column.")
                video_id = value
            elif column == "channel_id":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for channel_id column.")
                channel_id = value

        if not video_id and not channel_id:
            raise ValueError("Either video_id or channel_id has to be present in where clause.")

        comments_df = self.get_comments(video_id=video_id, channel_id=channel_id)

        # video_id/channel_id are already applied server-side by the API request,
        # so only the remaining conditions are evaluated locally.
        select_statement_executor = SELECTQueryExecutor(
            comments_df,
            selected_columns,
            [
                where_condition
                for where_condition in where_conditions
                if where_condition[1] not in ["video_id", "channel_id"]
            ],
            order_by_conditions,
            result_limit if query.limit else None,
        )

        return select_statement_executor.execute_query()

    def insert(self, query: ast.Insert) -> None:
        """Inserts data into the YouTube POST /commentThreads API endpoint.

        Rows that carry a comment_id are posted as replies to that comment;
        rows without one are posted as top-level comments on a video.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If a mandatory column (comment, reply or video_id) is missing
        """
        insert_query_parser = INSERTQueryParser(query, self.get_columns())

        values_to_insert = insert_query_parser.parse_query()

        for value in values_to_insert:
            if value.get("comment_id"):
                # Reply to an existing comment thread.
                if not value.get("reply"):
                    raise ValueError("reply is mandatory for inserting a reply.")
                self.insert_comment(comment_id=value["comment_id"], text=value["reply"])
            else:
                # Top-level comment on a video.
                if not value.get("comment"):
                    raise ValueError("comment is mandatory for inserting a top-level comment.")
                # Fix: a missing video_id previously surfaced as a bare KeyError.
                if not value.get("video_id"):
                    raise ValueError("video_id is mandatory for inserting a top-level comment.")
                self.insert_comment(video_id=value["video_id"], text=value["comment"])

    def insert_comment(self, text, video_id: str = None, comment_id: str = None):
        """Posts `text` to YouTube: as a reply when comment_id is given,
        otherwise as a top-level comment when video_id is given.

        Raises
        ------
        ValueError
            If neither video_id nor comment_id is provided
        """
        # if comment_id is provided, define the request body for a reply and insert it
        if comment_id:
            request_body = {"snippet": {"parentId": comment_id, "textOriginal": text}}

            self.handler.connect().comments().insert(part="snippet", body=request_body).execute()

        # else if video_id is provided, define the request body for a top-level comment and insert it
        elif video_id:
            request_body = {"snippet": {"topLevelComment": {"snippet": {"videoId": video_id, "textOriginal": text}}}}

            self.handler.connect().commentThreads().insert(part="snippet", body=request_body).execute()

        else:
            # Fix: previously this silently did nothing when both ids were absent.
            raise ValueError("Either video_id or comment_id must be provided.")

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            "comment_id",
            "channel_id",
            "video_id",
            "user_id",
            "display_name",
            "comment",
            "published_at",
            "updated_at",
            "reply_user_id",
            "reply_author",
            "reply",
        ]

    def _list_comment_threads(self, video_id, channel_id, page_token=None):
        """Builds a commentThreads().list request, optionally for a specific result page."""
        params = {
            "part": "snippet, replies",
            "videoId": video_id,
            "allThreadsRelatedToChannelId": channel_id,
            "textFormat": "plainText",
        }
        if page_token is not None:
            params["pageToken"] = page_token
        return self.handler.connect().commentThreads().list(**params)

    def get_comments(self, video_id: str, channel_id: str):
        """Pulls all the records from the given youtube api end point and returns it select()

        Returns
        -------
        pd.DataFrame of all the records of the "commentThreads()" API end point
        """

        # The API does not accept both filters at once; video_id takes precedence.
        if video_id and channel_id:
            channel_id = None

        resource = self._list_comment_threads(video_id, channel_id)

        data = []
        while resource:
            comments = resource.execute()

            for comment in comments["items"]:
                replies = []
                if "replies" in comment:
                    for reply in comment["replies"]["comments"]:
                        replies.append(
                            {
                                "reply_author": reply["snippet"]["authorDisplayName"],
                                "user_id": reply["snippet"]["authorChannelId"]["value"],
                                "reply": reply["snippet"]["textOriginal"],
                            }
                        )
                else:
                    # Placeholder row so json_normalize still emits the thread
                    # even when it has no replies.
                    replies.append(
                        {
                            "reply_author": None,
                            "user_id": None,
                            "reply": None,
                        }
                    )

                top_snippet = comment["snippet"]["topLevelComment"]["snippet"]
                data.append(
                    {
                        "channel_id": comment["snippet"]["channelId"],
                        "video_id": comment["snippet"]["videoId"],
                        "user_id": top_snippet["authorChannelId"]["value"],
                        "comment_id": comment["snippet"]["topLevelComment"]["id"],
                        "display_name": top_snippet["authorDisplayName"],
                        "comment": top_snippet["textDisplay"],
                        "published_at": top_snippet["publishedAt"],
                        "updated_at": top_snippet["updatedAt"],
                        "replies": replies,
                    }
                )

            if "nextPageToken" in comments:
                resource = self._list_comment_threads(video_id, channel_id, comments["nextPageToken"])
            else:
                break

        # Flatten one row per reply; thread-level fields are repeated as meta
        # columns, reply-level fields get the "replies." prefix.
        youtube_comments_df = pd.json_normalize(
            data,
            "replies",
            [
                "comment_id",
                "channel_id",
                "video_id",
                "user_id",
                "display_name",
                "comment",
                "published_at",
                "updated_at",
            ],
            record_prefix="replies.",
        )
        youtube_comments_df = youtube_comments_df.rename(
            columns={
                "replies.user_id": "reply_user_id",
                "replies.reply_author": "reply_author",
                "replies.reply": "reply",
            }
        )

        # An empty DataFrame has none of the expected columns to reorder.
        if youtube_comments_df.empty:
            return youtube_comments_df
        return youtube_comments_df[
            [
                "comment_id",
                "channel_id",
                "video_id",
                "user_id",
                "display_name",
                "comment",
                "published_at",
                "updated_at",
                "reply_user_id",
                "reply_author",
                "reply",
            ]
        ]

268 

269 

class YoutubeChannelsTable(APITable):
    """Youtube Channel Info by channel id Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls channel details for the channel_id given in the WHERE clause.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Channel snippet/statistics data matching the query

        Raises
        ------
        NotImplementedError
            If channel_id is missing or used with an operator other than '='
        """
        select_statement_parser = SELECTQueryParser(query, "channel", self.get_columns())

        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        channel_id = None
        for op, arg1, arg2 in where_conditions:
            if arg1 == "channel_id":
                if op == "=":
                    channel_id = arg2
                    break
                else:
                    raise NotImplementedError("Only '=' operator is supported for channel_id column.")

        if not channel_id:
            raise NotImplementedError("channel_id has to be present in where clause.")

        channel_df = self.get_channel_details(channel_id)

        select_statement_executor = SELECTQueryExecutor(
            channel_df,
            selected_columns,
            # BUG FIX: this previously kept ONLY the channel_id condition
            # (`== "channel_id"`), silently dropping every other WHERE filter.
            # channel_id is already applied by the API request itself, so
            # exclude it and apply the remaining conditions locally — the same
            # convention the comments/videos tables follow.
            [where_condition for where_condition in where_conditions if where_condition[1] != "channel_id"],
            order_by_conditions,
            result_limit if query.limit else None,
        )

        channel_df = select_statement_executor.execute_query()

        return channel_df

    def get_channel_details(self, channel_id):
        """Fetches snippet + statistics for a single channel id.

        Raises
        ------
        ValueError
            If the API returns no channel for the given id
        """
        details = (
            self.handler.connect().channels().list(part="statistics,snippet,contentDetails", id=channel_id).execute()
        )
        items = details.get("items")
        if not items:
            # Fix: an unknown channel_id previously raised a bare KeyError/IndexError.
            raise ValueError(f"No channel found for channel_id '{channel_id}'.")
        snippet = items[0]["snippet"]
        statistics = items[0]["statistics"]
        data = {
            # "country" is optional in the channel snippet and
            # "subscriberCount" is absent when the channel hides it — use
            # .get() so such channels don't break the whole query.
            "country": snippet.get("country"),
            "description": snippet["description"],
            "creation_date": snippet["publishedAt"],
            "title": snippet["title"],
            "subscriber_count": statistics.get("subscriberCount"),
            "video_count": statistics["videoCount"],
            "view_count": statistics["viewCount"],
            "channel_id": channel_id,
        }
        return pd.json_normalize(data)

    def get_columns(self) -> List[str]:
        """Columns exposed by this table."""
        return [
            "country",
            "description",
            "creation_date",
            "title",
            "subscriber_count",
            "video_count",
            "view_count",
            "channel_id",
        ]

338 

339 

class YoutubeVideosTable(APITable):
    """Youtube Video info by video id Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls video details filtered by video_id, channel_id and/or a
        free-text search query given in the WHERE clause.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Video details matching the query

        Raises
        ------
        ValueError
            If none of video_id, channel_id or query is present
        NotImplementedError
            If an operator other than '=' is used on those columns
        """
        select_statement_parser = SELECTQueryParser(query, "video", self.get_columns())

        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        video_id, channel_id, search_query = None, None, None
        for op, arg1, arg2 in where_conditions:
            if arg1 == "video_id":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for video_id column.")
                video_id = arg2
            elif arg1 == "channel_id":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for channel_id column.")
                channel_id = arg2
            elif arg1 == "query":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for query column.")
                search_query = arg2

        if not video_id and not channel_id and not search_query:
            raise ValueError("At least one of video_id, channel_id, or query must be present in the WHERE clause.")

        # video_id is the most specific filter and wins over the others.
        if video_id:
            video_df = self.get_videos_by_video_ids([video_id])
        elif channel_id and search_query:
            video_df = self.get_videos_by_search_query_in_channel(search_query, channel_id, result_limit)
        elif channel_id:
            video_df = self.get_videos_by_channel_id(channel_id, result_limit)
        else:
            video_df = self.get_videos_by_search_query(search_query, result_limit)

        # The API-level filters are already applied; evaluate the rest locally.
        select_statement_executor = SELECTQueryExecutor(
            video_df,
            selected_columns,
            [
                where_condition
                for where_condition in where_conditions
                if where_condition[1] not in ["video_id", "channel_id", "query"]
            ],
            order_by_conditions,
            result_limit if query.limit else None,
        )

        video_df = select_statement_executor.execute_query()

        return video_df

    def _collect_video_ids(self, limit, **search_params):
        """Pages through search().list, collecting up to `limit` video ids.

        `search_params` are forwarded to the API (e.g. q=..., channelId=...).
        Consolidates the pagination loop previously duplicated across the
        three public search helpers.
        """
        # Fix: SELECTQueryParser may yield no limit (None), which previously
        # crashed min(50, None); fall back to the historical default of 10.
        if limit is None:
            limit = 10

        video_ids = []
        resource = (
            self.handler.connect()
            .search()
            .list(part="snippet", type="video", maxResults=min(50, limit), **search_params)
        )
        total_fetched = 0

        while resource and total_fetched < limit:
            response = resource.execute()
            for item in response["items"]:
                video_ids.append(item["id"]["videoId"])
                total_fetched += 1
                if total_fetched >= limit:
                    break

            if "nextPageToken" in response and total_fetched < limit:
                resource = (
                    self.handler.connect()
                    .search()
                    .list(
                        part="snippet",
                        type="video",
                        maxResults=min(50, limit - total_fetched),
                        pageToken=response["nextPageToken"],
                        **search_params,
                    )
                )
            else:
                break

        return video_ids

    def get_videos_by_search_query(self, search_query, limit=10):
        """Searches all of YouTube for videos matching `search_query`."""
        return self.get_videos_by_video_ids(self._collect_video_ids(limit, q=search_query))

    def get_videos_by_search_query_in_channel(self, search_query, channel_id, limit=10):
        """Search for videos within a specific channel"""
        return self.get_videos_by_video_ids(self._collect_video_ids(limit, q=search_query, channelId=channel_id))

    def get_videos_by_channel_id(self, channel_id, limit=10):
        """Lists videos belonging to `channel_id`."""
        return self.get_videos_by_video_ids(self._collect_video_ids(limit, channelId=channel_id))

    def get_videos_by_video_ids(self, video_ids):
        """Fetches statistics/snippet/contentDetails for the given video ids
        and returns them as a flat DataFrame (one row per video)."""
        data = []

        if not isinstance(video_ids, list):
            logger.error(f"video_ids must be a list. Received {type(video_ids)} instead.")
            return pd.DataFrame()

        # loop over 50 video ids at a time
        # an invalid request error is caused otherwise
        for i in range(0, len(video_ids), 50):
            resource = (
                self.handler.connect()
                .videos()
                .list(part="statistics,snippet,contentDetails", id=",".join(video_ids[i : i + 50]))
                .execute()
            )

            for item in resource["items"]:
                statistics = item["statistics"]
                data.append(
                    {
                        "channel_id": item["snippet"]["channelId"],
                        "channel_title": item["snippet"]["channelTitle"],
                        # Fix: commentCount is absent when comments are disabled
                        # and likeCount when likes are hidden — use .get() so
                        # such videos don't break the whole result set.
                        "comment_count": statistics.get("commentCount"),
                        "description": item["snippet"]["description"],
                        "like_count": statistics.get("likeCount"),
                        "publish_time": item["snippet"]["publishedAt"],
                        "title": item["snippet"]["title"],
                        "transcript": self.get_captions_by_video_id(item["id"]),
                        "video_id": item["id"],
                        "view_count": statistics.get("viewCount"),
                        "duration_str": self.parse_duration(item["id"], item["contentDetails"]["duration"]),
                    }
                )

        return pd.json_normalize(data)

    def get_captions_by_video_id(self, video_id):
        """Returns the video's transcript as formatted JSON, or a fallback
        message when no transcript is available."""
        try:
            transcript_response = YouTubeTranscriptApi.get_transcript(video_id, preserve_formatting=True)
            return JSONFormatter().format_transcript(transcript_response, indent=2)
        except Exception as e:
            # Fix: the message used JS-style ${...} placeholders inside a
            # Python f-string (emitting literal '$'), and the call was wrapped
            # in a pointless one-element tuple.
            logger.error(f"Encountered an error while fetching transcripts for video {video_id}: {e}")
            return "Transcript not available for this video"

    def parse_duration(self, video_id, duration):
        """Converts an ISO 8601 duration such as "PT1H2M3S" into "1:2:3".

        Returns a fallback message when the duration cannot be parsed.
        """
        try:
            # BUG FIX: the seconds group was previously mandatory, so valid
            # durations without seconds (e.g. "PT4M", "PT1H2M") failed to
            # parse and fell into the error path.
            groups = re.search(r"PT(\d+H)?(\d+M)?(\d+S)?", duration).groups()
            # Drop the trailing unit letter from each matched component.
            return ":".join(g[:-1] for g in groups if g)
        except Exception as e:
            logger.error(f"Encountered an error while parsing duration for video {video_id}: {e}")
            return "Duration not available for this video"

    def get_columns(self) -> List[str]:
        """Columns exposed by this table."""
        return [
            "channel_id",
            "channel_title",
            "title",
            "description",
            "publish_time",
            "comment_count",
            "like_count",
            "view_count",
            "video_id",
            "duration_str",
            "transcript",
        ]