Coverage for mindsdb / integrations / handlers / youtube_handler / youtube_tables.py: 0%
203 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import List
3from mindsdb.integrations.libs.api_handler import APITable
4from mindsdb.utilities import log
6from mindsdb_sql_parser import ast
7from mindsdb.integrations.utilities.handlers.query_utilities import (
8 SELECTQueryParser,
9 SELECTQueryExecutor,
10 INSERTQueryParser,
11)
13import pandas as pd
14import re
15from youtube_transcript_api import YouTubeTranscriptApi
16from youtube_transcript_api.formatters import JSONFormatter
18logger = log.getLogger(__name__)
class YoutubeCommentsTable(APITable):
    """Youtube List Comments by video id Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the youtube "commentThreads()" API endpoint

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            youtube "commentThreads()" matching the query

        Raises
        ------
        ValueError
            If neither video_id nor channel_id is present in the WHERE clause
        NotImplementedError
            If a non-'=' operator is used on video_id or channel_id
        """
        select_statement_parser = SELECTQueryParser(query, "comments", self.get_columns())

        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        channel_id, video_id = None, None
        for op, column, value in where_conditions:
            if column == "video_id":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for video_id column.")
                video_id = value
            elif column == "channel_id":
                if op != "=":
                    raise NotImplementedError("Only '=' operator is supported for channel_id column.")
                channel_id = value

        if not video_id and not channel_id:
            raise ValueError("Either video_id or channel_id has to be present in where clause.")

        comments_df = self.get_comments(video_id=video_id, channel_id=channel_id)

        # video_id/channel_id are consumed by the API request itself, so they
        # are excluded from the in-memory filtering below.
        select_statement_executor = SELECTQueryExecutor(
            comments_df,
            selected_columns,
            [
                where_condition
                for where_condition in where_conditions
                if where_condition[1] not in ["video_id", "channel_id"]
            ],
            order_by_conditions,
            result_limit if query.limit else None,
        )

        comments_df = select_statement_executor.execute_query()

        return comments_df

    def insert(self, query: ast.Insert) -> None:
        """Inserts data into the YouTube POST /commentThreads API endpoint.

        A row with a comment_id is treated as a reply to that comment; a row
        without one is treated as a new top-level comment on a video.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If a required column (comment / reply / video_id) is missing
        """
        insert_query_parser = INSERTQueryParser(query, self.get_columns())

        values_to_insert = insert_query_parser.parse_query()

        for value in values_to_insert:
            if value.get("comment_id"):
                # comment_id present -> this row is a reply to an existing comment.
                if not value.get("reply"):
                    raise ValueError("reply is mandatory for inserting a reply.")
                self.insert_comment(comment_id=value["comment_id"], text=value["reply"])
            else:
                # No comment_id -> this row is a new top-level comment.
                if not value.get("comment"):
                    raise ValueError("comment is mandatory for inserting a top-level comment.")
                # Previously a missing video_id surfaced as an opaque KeyError;
                # raise a descriptive error consistent with the other checks.
                if not value.get("video_id"):
                    raise ValueError("video_id is mandatory for inserting a top-level comment.")
                self.insert_comment(video_id=value["video_id"], text=value["comment"])

    def insert_comment(self, text, video_id: str = None, comment_id: str = None):
        """Post `text` either as a reply (comment_id given) or as a top-level
        comment on a video (video_id given).

        Raises
        ------
        ValueError
            If neither video_id nor comment_id is provided.
        """
        # if comment_id is provided, define the request body for a reply and insert it
        if comment_id:
            request_body = {"snippet": {"parentId": comment_id, "textOriginal": text}}

            self.handler.connect().comments().insert(part="snippet", body=request_body).execute()

        # else if video_id is provided, define the request body for a top-level comment and insert it
        elif video_id:
            request_body = {"snippet": {"topLevelComment": {"snippet": {"videoId": video_id, "textOriginal": text}}}}

            self.handler.connect().commentThreads().insert(part="snippet", body=request_body).execute()

        else:
            # Previously this fell through silently, doing nothing; fail loudly instead.
            raise ValueError("Either video_id or comment_id must be provided.")

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            "comment_id",
            "channel_id",
            "video_id",
            "user_id",
            "display_name",
            "comment",
            "published_at",
            "updated_at",
            "reply_user_id",
            "reply_author",
            "reply",
        ]

    def get_comments(self, video_id: str, channel_id: str):
        """Pulls all the records from the given youtube api end point and returns it select()

        Pages through commentThreads() via nextPageToken; each top-level
        comment is exploded into one row per reply (or a single row with
        NULL reply columns when it has no replies).

        Returns
        -------
        pd.DataFrame of all the records of the "commentThreads()" API end point
        """

        # The API rejects requests carrying both filters; video_id wins.
        if video_id and channel_id:
            channel_id = None

        resource = (
            self.handler.connect()
            .commentThreads()
            .list(
                part="snippet, replies",
                videoId=video_id,
                allThreadsRelatedToChannelId=channel_id,
                textFormat="plainText",
            )
        )

        data = []
        while resource:
            comments = resource.execute()

            for comment in comments["items"]:
                replies = []
                if "replies" in comment:
                    for reply in comment["replies"]["comments"]:
                        replies.append(
                            {
                                "reply_author": reply["snippet"]["authorDisplayName"],
                                "user_id": reply["snippet"]["authorChannelId"]["value"],
                                "reply": reply["snippet"]["textOriginal"],
                            }
                        )
                else:
                    # Placeholder so json_normalize still emits one row for the comment.
                    replies.append(
                        {
                            "reply_author": None,
                            "user_id": None,
                            "reply": None,
                        }
                    )

                data.append(
                    {
                        "channel_id": comment["snippet"]["channelId"],
                        "video_id": comment["snippet"]["videoId"],
                        "user_id": comment["snippet"]["topLevelComment"]["snippet"]["authorChannelId"]["value"],
                        "comment_id": comment["snippet"]["topLevelComment"]["id"],
                        "display_name": comment["snippet"]["topLevelComment"]["snippet"]["authorDisplayName"],
                        "comment": comment["snippet"]["topLevelComment"]["snippet"]["textDisplay"],
                        "published_at": comment["snippet"]["topLevelComment"]["snippet"]["publishedAt"],
                        "updated_at": comment["snippet"]["topLevelComment"]["snippet"]["updatedAt"],
                        "replies": replies,
                    }
                )

            if "nextPageToken" in comments:
                resource = (
                    self.handler.connect()
                    .commentThreads()
                    .list(
                        part="snippet, replies",
                        videoId=video_id,
                        allThreadsRelatedToChannelId=channel_id,
                        textFormat="plainText",
                        pageToken=comments["nextPageToken"],
                    )
                )
            else:
                break

        # Flatten one row per (comment, reply) pair; reply columns are prefixed
        # then renamed to the public column names.
        youtube_comments_df = pd.json_normalize(
            data,
            "replies",
            [
                "comment_id",
                "channel_id",
                "video_id",
                "user_id",
                "display_name",
                "comment",
                "published_at",
                "updated_at",
            ],
            record_prefix="replies.",
        )
        youtube_comments_df = youtube_comments_df.rename(
            columns={
                "replies.user_id": "reply_user_id",
                "replies.reply_author": "reply_author",
                "replies.reply": "reply",
            }
        )

        # check if DataFrame is empty
        if youtube_comments_df.empty:
            return youtube_comments_df
        else:
            return youtube_comments_df[
                [
                    "comment_id",
                    "channel_id",
                    "video_id",
                    "user_id",
                    "display_name",
                    "comment",
                    "published_at",
                    "updated_at",
                    "reply_user_id",
                    "reply_author",
                    "reply",
                ]
            ]
class YoutubeChannelsTable(APITable):
    """Youtube Channel Info by channel id Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls channel details for the channel_id given in the WHERE clause.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Single-row frame of channel statistics and snippet fields

        Raises
        ------
        NotImplementedError
            If channel_id is missing or compared with an operator other than '='
        """
        select_statement_parser = SELECTQueryParser(query, "channel", self.get_columns())

        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        channel_id = None
        for op, arg1, arg2 in where_conditions:
            if arg1 == "channel_id":
                if op == "=":
                    channel_id = arg2
                    break
                else:
                    raise NotImplementedError("Only '=' operator is supported for channel_id column.")

        if not channel_id:
            raise NotImplementedError("channel_id has to be present in where clause.")

        channel_df = self.get_channel_details(channel_id)

        select_statement_executor = SELECTQueryExecutor(
            channel_df,
            selected_columns,
            [where_condition for where_condition in where_conditions if where_condition[1] == "channel_id"],
            order_by_conditions,
            result_limit if query.limit else None,
        )

        channel_df = select_statement_executor.execute_query()

        return channel_df

    def get_channel_details(self, channel_id):
        """Fetch snippet + statistics for one channel and normalize to a DataFrame.

        Raises
        ------
        ValueError
            If the API returns no channel for `channel_id`.
        """
        details = (
            self.handler.connect().channels().list(part="statistics,snippet,contentDetails", id=channel_id).execute()
        )
        items = details.get("items")
        if not items:
            # Previously an unknown id crashed with an opaque IndexError.
            raise ValueError(f"No YouTube channel found for channel_id '{channel_id}'.")
        snippet = items[0]["snippet"]
        statistics = items[0]["statistics"]
        data = {
            # 'country' is optional in the API response; 'subscriberCount' is
            # omitted when the channel hides its subscriber count.
            "country": snippet.get("country"),
            "description": snippet["description"],
            "creation_date": snippet["publishedAt"],
            "title": snippet["title"],
            "subscriber_count": statistics.get("subscriberCount"),
            "video_count": statistics["videoCount"],
            "view_count": statistics["viewCount"],
            "channel_id": channel_id,
        }
        return pd.json_normalize(data)

    def get_columns(self) -> List[str]:
        """Columns exposed by this table."""
        return [
            "country",
            "description",
            "creation_date",
            "title",
            "subscriber_count",
            "video_count",
            "view_count",
            "channel_id",
        ]
class YoutubeVideosTable(APITable):
    """Youtube Video info by video id Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls video details selected by video_id, channel_id and/or query.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Video rows matching the query

        Raises
        ------
        ValueError
            If none of video_id, channel_id, query appear in the WHERE clause
        NotImplementedError
            If a non-'=' operator is used on those columns
        """
        select_statement_parser = SELECTQueryParser(query, "video", self.get_columns())

        (
            selected_columns,
            where_conditions,
            order_by_conditions,
            result_limit,
        ) = select_statement_parser.parse_query()

        video_id, channel_id, search_query = None, None, None
        for op, arg1, arg2 in where_conditions:
            if arg1 == "video_id":
                if op == "=":
                    video_id = arg2
                else:
                    raise NotImplementedError("Only '=' operator is supported for video_id column.")

            elif arg1 == "channel_id":
                if op == "=":
                    channel_id = arg2
                else:
                    raise NotImplementedError("Only '=' operator is supported for channel_id column.")

            elif arg1 == "query":
                if op == "=":
                    search_query = arg2
                else:
                    raise NotImplementedError("Only '=' operator is supported for query column.")

        if not video_id and not channel_id and not search_query:
            raise ValueError("At least one of video_id, channel_id, or query must be present in the WHERE clause.")

        # Most specific filter wins: exact video, then channel-scoped search,
        # then channel listing, then global search.
        if video_id:
            video_df = self.get_videos_by_video_ids([video_id])
        elif channel_id and search_query:
            video_df = self.get_videos_by_search_query_in_channel(search_query, channel_id, result_limit)
        elif channel_id:
            video_df = self.get_videos_by_channel_id(channel_id, result_limit)
        else:
            video_df = self.get_videos_by_search_query(search_query, result_limit)

        select_statement_executor = SELECTQueryExecutor(
            video_df,
            selected_columns,
            [
                where_condition
                for where_condition in where_conditions
                if where_condition[1] not in ["video_id", "channel_id", "query"]
            ],
            order_by_conditions,
            result_limit if query.limit else None,
        )

        video_df = select_statement_executor.execute_query()

        return video_df

    def _collect_video_ids(self, limit, **search_params):
        """Page through the search() endpoint, collecting up to `limit` video ids.

        `search_params` are extra keyword filters for search().list (q=,
        channelId=, ...). The API caps maxResults at 50 per page, hence the
        min(50, ...) and the nextPageToken loop.
        """
        video_ids = []
        resource = (
            self.handler.connect()
            .search()
            .list(part="snippet", type="video", maxResults=min(50, limit), **search_params)
        )
        total_fetched = 0

        while resource and total_fetched < limit:
            response = resource.execute()
            for item in response["items"]:
                video_ids.append(item["id"]["videoId"])
                total_fetched += 1
                if total_fetched >= limit:
                    break

            if "nextPageToken" in response and total_fetched < limit:
                resource = (
                    self.handler.connect()
                    .search()
                    .list(
                        part="snippet",
                        type="video",
                        maxResults=min(50, limit - total_fetched),
                        pageToken=response["nextPageToken"],
                        **search_params,
                    )
                )
            else:
                break

        return video_ids

    def get_videos_by_search_query(self, search_query, limit=10):
        """Videos matching a free-text search query, up to `limit` results."""
        return self.get_videos_by_video_ids(self._collect_video_ids(limit, q=search_query))

    def get_videos_by_search_query_in_channel(self, search_query, channel_id, limit=10):
        """Search for videos within a specific channel"""
        return self.get_videos_by_video_ids(self._collect_video_ids(limit, q=search_query, channelId=channel_id))

    def get_videos_by_channel_id(self, channel_id, limit=10):
        """Videos belonging to a channel, up to `limit` results."""
        return self.get_videos_by_video_ids(self._collect_video_ids(limit, channelId=channel_id))

    def get_videos_by_video_ids(self, video_ids):
        """Fetch full details (statistics, snippet, contentDetails) for the given ids.

        Returns an empty DataFrame when video_ids is not a list.
        """
        data = []

        if not isinstance(video_ids, list):
            logger.error(f"video_ids must be a list. Received {type(video_ids)} instead.")
            return pd.DataFrame()

        # loop over 50 video ids at a time
        # an invalid request error is caused otherwise
        for i in range(0, len(video_ids), 50):
            resource = (
                self.handler.connect()
                .videos()
                .list(part="statistics,snippet,contentDetails", id=",".join(video_ids[i : i + 50]))
                .execute()
            )

            for item in resource["items"]:
                statistics = item["statistics"]
                data.append(
                    {
                        "channel_id": item["snippet"]["channelId"],
                        "channel_title": item["snippet"]["channelTitle"],
                        # commentCount/likeCount are omitted by the API when the
                        # video has comments or ratings disabled; .get avoids a
                        # KeyError that would discard every other row too.
                        "comment_count": statistics.get("commentCount"),
                        "description": item["snippet"]["description"],
                        "like_count": statistics.get("likeCount"),
                        "publish_time": item["snippet"]["publishedAt"],
                        "title": item["snippet"]["title"],
                        "transcript": self.get_captions_by_video_id(item["id"]),
                        "video_id": item["id"],
                        "view_count": statistics.get("viewCount"),
                        "duration_str": self.parse_duration(item["id"], item["contentDetails"]["duration"]),
                    }
                )

        return pd.json_normalize(data)

    def get_captions_by_video_id(self, video_id):
        """Return the video transcript as indented JSON, or a fallback message."""
        try:
            transcript_response = YouTubeTranscriptApi.get_transcript(video_id, preserve_formatting=True)
            json_formatted_transcript = JSONFormatter().format_transcript(transcript_response, indent=2)
            return json_formatted_transcript

        except Exception as e:
            # Transcripts are frequently unavailable (disabled, no captions),
            # so this is logged rather than raised.
            logger.error(f"Encountered an error while fetching transcripts for video {video_id}: {e}")
            return "Transcript not available for this video"

    def parse_duration(self, video_id, duration):
        """Convert an ISO 8601 duration like 'PT1H2M3S' into 'H:M:S' parts.

        Each of hours/minutes/seconds is optional (e.g. 'PT1H', 'PT2M'); at
        least one component must be present.
        """
        try:
            # Seconds made optional: durations such as 'PT1H' are valid API output.
            match = re.search(r"PT(\d+H)?(\d+M)?(\d+S)?", duration)
            if not match or not any(match.groups()):
                raise ValueError(f"Unrecognized ISO 8601 duration: {duration}")
            duration_str = ""
            for d in match.groups():
                if d:
                    duration_str += f"{d[:-1]}:"

            return duration_str.strip(":")
        except Exception as e:
            logger.error(f"Encountered an error while parsing duration for video {video_id}: {e}")
            return "Duration not available for this video"

    def get_columns(self) -> List[str]:
        """Columns exposed by this table."""
        return [
            "channel_id",
            "channel_title",
            "title",
            "description",
            "publish_time",
            "comment_count",
            "like_count",
            "view_count",
            "video_id",
            "duration_str",
            "transcript",
        ]