Coverage for mindsdb / integrations / handlers / reddit_handler / reddit_handler.py: 0%
53 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import praw
2import os
3from mindsdb.integrations.libs.api_handler import APIHandler
4from mindsdb.integrations.libs.response import (
5 HandlerStatusResponse as StatusResponse,
6 HandlerResponse as Response,
7 RESPONSE_TYPE
8)
9from mindsdb.utilities.config import Config
10from mindsdb.utilities import log
12from .reddit_tables import CommentTable, SubmissionTable
14logger = log.getLogger(__name__)
17class RedditHandler(APIHandler):
19 def __init__(self, name=None, **kwargs):
20 super().__init__(name)
22 args = kwargs.get('connection_data', {})
24 self.connection_args = {}
25 handler_config = Config().get('reddit_handler', {})
26 for k in ['client_id', 'client_secret', 'user_agent']:
27 if k in args:
28 self.connection_args[k] = args[k]
29 elif f'REDDIT_{k.upper()}' in os.environ:
30 self.connection_args[k] = os.environ[f'REDDIT_{k.upper()}']
31 elif k in handler_config:
32 self.connection_args[k] = handler_config[k]
34 self.reddit = None
35 self.is_connected = False
37 comment = CommentTable(self)
38 self._register_table('comment', comment)
40 submission = SubmissionTable(self)
41 self._register_table('submission', submission)
43 def connect(self):
44 """Authenticate with the Reddit API using the client ID, client secret
45 and user agent provided in the constructor.
46 """
47 if self.is_connected is True:
48 return self.reddit
50 self.reddit = praw.Reddit(
51 client_id=self.connection_args['client_id'],
52 client_secret=self.connection_args['client_secret'],
53 user_agent=self.connection_args['user_agent'],
54 )
56 self.is_connected = True
57 return self.reddit
59 def check_connection(self) -> StatusResponse:
60 '''It evaluates if the connection with Reddit API is alive and healthy.
61 Returns:
62 HandlerStatusResponse
63 '''
65 response = StatusResponse(False)
67 try:
68 reddit = self.connect()
69 reddit.user.me()
70 response.success = True
72 except Exception as e:
73 response.error_message = f'Error connecting to Reddit api: {e}. '
74 logger.error(response.error_message)
76 if response.success is False and self.is_connected is True:
77 self.is_connected = False
79 return response
81 def native_query(self, query_string: str = None):
82 '''It parses any native statement string and acts upon it (for example, raw syntax commands).
83 Args:
84 query (Any): query in native format (str for sql databases,
85 dict for mongo, api's json etc)
86 Returns:
87 HandlerResponse
88 '''
90 method_name, params = self.parse_native_query(query_string)
91 if method_name == 'get_submission':
92 df = self.get_submission(params)
93 elif method_name == 'get_subreddit':
94 df = self.get_subreddit(params)
95 else:
96 raise ValueError(f"Method '{method_name}' not supported by RedditHandler")
98 return Response(
99 RESPONSE_TYPE.TABLE,
100 data_frame=df
101 )