Coverage for mindsdb / integrations / handlers / twilio_handler / twilio_handler.py: 0%
196 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
3import re
4from twilio.rest import Client
5import pandas as pd
6from mindsdb.integrations.libs.api_handler import APIHandler, APITable
7from mindsdb.integrations.libs.response import (
8 HandlerStatusResponse as StatusResponse,
9 HandlerResponse as Response,
10 RESPONSE_TYPE
11)
12from mindsdb.utilities.config import Config
13from mindsdb.utilities import log
14from mindsdb.integrations.utilities.date_utils import parse_local_date
15from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions, project_dataframe, filter_dataframe
17from mindsdb_sql_parser import ast
# Module-level logger obtained from MindsDB's logging utility, named after this module.
logger = log.getLogger(__name__)
class PhoneNumbersTable(APITable):
    """Read-only table exposing the phone numbers listed by the Twilio handler."""

    # Lower bound on rows requested when WHERE conditions still have to be
    # applied client-side; mirrors the default used by list_phone_numbers.
    _MIN_FETCH_WHEN_FILTERING = 100

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run a SELECT against the phone numbers list.

        All WHERE conditions are evaluated client-side on the fetched
        DataFrame, so the LIMIT is applied *after* filtering; pushing the
        limit to the API first could drop rows that would have matched.

        Args:
            query: parsed SELECT statement.

        Returns:
            DataFrame restricted to the requested rows and target columns.

        Raises:
            NotImplementedError: if the WHERE clause contains an OR.
        """
        conditions = extract_comparison_conditions(query.where)

        filters = []
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')
            filters.append([op, arg1, arg2])

        limit = None
        if query.limit is not None:
            limit = int(query.limit.value)

        params = {}
        if limit is not None:
            if filters:
                # Rows will be discarded by the filters below, so never ask
                # the API for fewer rows than it would return by default.
                params['limit'] = max(limit, self._MIN_FETCH_WHEN_FILTERING)
            else:
                params['limit'] = limit

        result = self.handler.list_phone_numbers(params, df=True)

        # filter rows
        result = filter_dataframe(result, filters)

        # apply LIMIT only once the filters have run
        if limit is not None:
            result = result[:limit]

        # project targets
        result = project_dataframe(result, query.targets, self.get_columns())

        return result

    def get_columns(self) -> list:
        """Return the column names exposed by this table."""
        return [
            'sid',
            'date_created',
            'date_updated',
            'phone_number',
            'friendly_name',
            'account_sid',
            'capabilities',
            'number_status',
            'api_version',
            'voice_url',
            'sms_url',
            'uri'
        ]
class MessagesTable(APITable):
    """Table over Twilio messages: SELECT fetches messages, INSERT sends SMS."""

    # Columns that are pushed down to the API call, and only with the '=' operator.
    _EQUALITY_ONLY_COLUMNS = ('sid', 'from_number', 'to_number')

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run a SELECT against the messages list.

        Conditions on ``sent_at`` (``>`` / ``<``) and equality conditions on
        ``sid``, ``from_number`` and ``to_number`` are passed to the handler's
        ``fetch_messages``; every other condition is applied client-side on
        the returned DataFrame. LIMIT is applied after filtering.

        Args:
            query: parsed SELECT statement.

        Returns:
            DataFrame restricted to the requested rows and target columns.

        Raises:
            NotImplementedError: for OR conditions, for ``sent_at`` operators
                other than ``>`` / ``<``, and for non-equality operators on
                ``sid``, ``from_number`` or ``to_number``.
        """
        conditions = extract_comparison_conditions(query.where)

        params = {}
        filters = []
        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')

            if arg1 == 'sent_at' and arg2 is not None:
                date = parse_local_date(arg2)

                if op == '>':
                    params['date_sent_after'] = date
                elif op == '<':
                    params['date_sent_before'] = date
                else:
                    raise NotImplementedError('Only "sent_at >" and "sent_at <" are implemented')

                # also add to post query filter because date_sent_after=date1 will include date1
                filters.append([op, arg1, arg2])

            elif arg1 in self._EQUALITY_ONLY_COLUMNS:
                # TODO: implement IN
                if op != '=':
                    # Previously the exception was built but never raised, so
                    # the condition was silently ignored and unfiltered rows
                    # were returned.
                    raise NotImplementedError(f'Only "{arg1}=" is implemented')
                params[arg1] = arg2

            else:
                filters.append([op, arg1, arg2])

        result = self.handler.fetch_messages(params, df=True)

        # filter rows
        result = filter_dataframe(result, filters)

        if query.limit is not None:
            result = result[:int(query.limit.value)]

        # project targets
        result = project_dataframe(result, query.targets, self.get_columns())

        return result

    def get_columns(self) -> list:
        """Return the column names exposed by this table."""
        return [
            'sid',
            'from_number',
            'to_number',
            'body',
            'direction',
            'msg_status',
            'sent_at',
            'account_sid',
            'price',
            'price_unit',
            'api_version',
            'uri'
        ]

    def insert(self, query: ast.Insert) -> pd.DataFrame:
        """Send one SMS (or several, for long bodies) per inserted row.

        A body that does not fit the per-message budget is split on spaces
        into several parts. Every part except the last gets a trailing
        ``...``; parts after the first get an ``(i/n)`` counter. ``media_url``
        is only sent with the first part.

        Args:
            query: parsed INSERT statement; recognised columns are
                ``to_number``, ``from_number``, ``body`` and ``media_url``.

        Returns:
            DataFrame with one row per message sent, holding the values
            returned by the handler's ``send_sms`` plus the ``body`` sent.
        """
        columns = [col.name for col in query.columns]

        ret = []

        insert_params = ["to_number", "from_number", "body", 'media_url']
        max_text_len = 1500
        # 3 is for ..., 7 is for (10/11)
        chunk_budget = max_text_len - 3 - 7
        url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'

        for row in query.values:
            params = dict(zip(columns, row))

            # split long text over 1500 symbols; the capturing group keeps the
            # separating spaces so the parts can be re-joined verbatim
            words = re.split('( )', params['body'])
            messages = []

            chunk = ''
            for word in words:
                # Links are counted as 23 characters when measuring length.
                # NOTE(review): this looks inherited from a URL-shortening
                # platform; confirm it is appropriate for SMS.
                measured = re.sub(url_pattern, '-' * 23, word)
                if len(chunk) + len(measured) > chunk_budget:
                    # Skip empty chunks (e.g. the very first word is already
                    # over budget) so no content-free message is sent.
                    if chunk.strip():
                        messages.append(chunk.strip())
                    chunk = ''
                chunk += word

            # the last message
            if chunk.strip() != '':
                messages.append(chunk.strip())

            len_messages = len(messages)

            for i, text in enumerate(messages):
                if i < len_messages - 1:
                    text += '...'
                else:
                    text += ' '

                if i >= 1:
                    text += f'({i + 1}/{len_messages})'
                    # only send media with the first part
                    if 'media_url' in params:
                        del params['media_url']

                params['body'] = text
                params_to_send = {key: params[key] for key in insert_params if key in params}
                ret_row = self.handler.send_sms(params_to_send, ret_as_dict=True)
                ret_row['body'] = text
                ret.append(ret_row)

        return pd.DataFrame(ret)
class TwilioHandler(APIHandler):
    """API handler for Twilio.

    Registers the ``messages`` and ``phone_numbers`` tables and offers a small
    native-query syntax of the form ``method(arg1=val1, arg2=val2)``.
    """

    def __init__(self, name=None, **kwargs):
        """Collect connection arguments and register the handler's tables.

        ``account_sid`` and ``auth_token`` are looked up, in order, in
        ``connection_data``, in the ``TWILIO_<NAME>`` environment variables
        and in the ``twilio_handler`` section of the MindsDB config.

        Args:
            name: handler instance name, forwarded to the base class.
            **kwargs: may contain ``connection_data`` (a dict of credentials).
        """
        super().__init__(name)

        # `or {}` also covers an explicit connection_data=None
        args = kwargs.get('connection_data') or {}

        self.connection_args = {}
        handler_config = Config().get('twilio_handler', {})
        for k in ['account_sid', 'auth_token']:
            env_name = f'TWILIO_{k.upper()}'
            if k in args:
                self.connection_args[k] = args[k]
            elif env_name in os.environ:
                self.connection_args[k] = os.environ[env_name]
            elif k in handler_config:
                self.connection_args[k] = handler_config[k]

        self.client = None
        self.is_connected = False

        messages = MessagesTable(self)
        phone_numbers = PhoneNumbersTable(self)
        self._register_table('messages', messages)
        self._register_table('phone_numbers', phone_numbers)

    def connect(self):
        """Authenticate with the Twilio API using the account_sid and auth_token provided in the constructor.

        Returns:
            The Twilio client; an existing one is reused when already connected.

        Raises:
            KeyError: if ``account_sid`` or ``auth_token`` was not configured.
        """
        if self.is_connected is True:
            return self.client

        self.client = Client(
            self.connection_args['account_sid'],
            self.connection_args['auth_token']
        )

        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Report whether a Twilio client can be created from the stored credentials.

        Returns:
            StatusResponse with ``success`` set, and ``error_message`` filled
            in when connecting raised.
        """
        response = StatusResponse(False)

        try:
            self.connect()
            # Maybe make a harmless API request to verify connection, but be mindful of rate limits and costs
            response.success = True

        except Exception as e:
            response.error_message = f'Error connecting to Twilio api: {e}. '
            logger.error(response.error_message)

        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def parse_native_query(self, query_string: str):
        """Parse a native query of the form ``method(arg1=val1, arg2=val2, ...)``.

        Arguments are separated by commas and each is split on its *first*
        ``=`` only, so values may themselves contain ``=``. Values containing
        ``,`` or ``)`` are not supported by this simple syntax.

        Args:
            query_string: the native query text.

        Returns:
            Tuple ``(method_name, args)`` where ``args`` maps argument names
            to their (string) values.

        Raises:
            ValueError: if the string does not match the expected format or
                an argument is not of the form ``key=value``.
        """
        # The argument list inside the parentheses may be empty
        match = re.match(r'(\w+)\(([^)]*)\)', query_string)
        if not match:
            raise ValueError(f"Invalid query format: {query_string}")

        method_name = match.group(1)
        arg_string = match.group(2)

        args = {}
        if arg_string.strip():  # nothing to parse for `method()` / `method( )`
            for arg in arg_string.split(','):
                key, sep, value = arg.strip().partition('=')
                if not sep:
                    raise ValueError(
                        f"Invalid argument '{arg.strip()}' in query '{query_string}': expected key=value"
                    )
                args[key.strip()] = value.strip()

        return method_name, args

    def native_query(self, query_string: str = None):
        """Parse a native statement and dispatch it to the matching handler method.

        Supported methods: ``send_sms``, ``fetch_messages``, ``list_phone_numbers``.

        Args:
            query_string: native query, see ``parse_native_query`` for the format.

        Returns:
            The Response produced by the called method.

        Raises:
            ValueError: if the query is malformed or the method is not supported.
        """
        method_name, params = self.parse_native_query(query_string)
        if method_name == 'send_sms':
            response = self.send_sms(params)
        elif method_name == 'fetch_messages':
            response = self.fetch_messages(params)
        elif method_name == 'list_phone_numbers':
            response = self.list_phone_numbers(params)
        else:
            raise ValueError(f"Method '{method_name}' not supported by TwilioHandler")

        return response

    def send_sms(self, params, ret_as_dict=False):
        """Send a message through Twilio.

        Args:
            params: dict with optional keys ``to_number``, ``from_number``,
                ``body`` and ``media_url``.
            ret_as_dict: when True return a plain dict instead of a Response.

        Returns:
            ``{'sid': ..., 'status': ...}`` when ``ret_as_dict`` is True,
            otherwise a MESSAGE-type Response.
        """
        # connect() is a no-op when already connected; calling it here avoids
        # using a client that was never initialised.
        client = self.connect()
        message = client.messages.create(
            to=params.get("to_number"),
            from_=params.get('from_number'),
            body=params.get("body"),
            media_url=params.get("media_url")
        )

        if ret_as_dict is True:
            return {'sid': message.sid, 'status': message.status}
        # NOTE(review): confirm that Response accepts `sid` / `status` keyword arguments.
        return Response(
            RESPONSE_TYPE.MESSAGE,
            sid=message.sid,
            status=message.status
        )

    def fetch_messages(self, params, df=False):
        """Fetch messages, either a single one by ``sid`` or a filtered list.

        Args:
            params: dict with optional keys ``limit`` (default 1000), ``sid``,
                ``date_sent_after``, ``date_sent_before``, ``from_number``
                and ``to_number``. When ``sid`` is given the other keys are
                ignored.
            df: when True return a DataFrame instead of a Response.

        Returns:
            DataFrame of messages, or a TABLE-type Response wrapping it.
        """
        client = self.connect()

        sid = params.get('sid', None)
        # Date bounds are forwarded as given; no conversion happens here
        list_args = {
            'limit': int(params.get('limit', 1000)),
            'date_sent_after': params.get('date_sent_after', None),
            'date_sent_before': params.get('date_sent_before', None),
            'from_': params.get('from_number', None),
            'to': params.get('to_number', None)
        }
        # Only pass the criteria that were actually supplied
        list_args = {arg: val for arg, val in list_args.items() if val is not None}

        if sid:
            messages = [client.messages(sid).fetch()]
        else:
            messages = client.messages.list(**list_args)

        data = [
            {
                'sid': msg.sid,
                'to_number': msg.to,
                'from_number': msg.from_,
                'body': msg.body,
                'direction': msg.direction,
                'msg_status': msg.status,
                # NOTE(review): sent_at is filled from date_created, not date_sent — confirm intended
                'sent_at': msg.date_created.replace(tzinfo=None),
                'account_sid': msg.account_sid,
                'price': msg.price,
                'price_unit': msg.price_unit,
                'api_version': msg.api_version,
                'uri': msg.uri,
            }
            for msg in messages
        ]

        if df is True:
            return pd.DataFrame(data)
        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(data))

    def list_phone_numbers(self, params, df=False):
        """List the account's incoming phone numbers.

        Args:
            params: dict with optional key ``limit`` (default 100).
            df: when True return a DataFrame instead of a Response.

        Returns:
            DataFrame of phone numbers, or a TABLE-type Response wrapping it.
        """
        client = self.connect()

        limit = int(params.get('limit', 100))
        phone_numbers = client.incoming_phone_numbers.list(limit=limit)

        data = [
            {
                'sid': number.sid,
                'date_created': number.date_created,
                'date_updated': number.date_updated,
                'phone_number': number.phone_number,
                'friendly_name': number.friendly_name,
                'account_sid': number.account_sid,
                'capabilities': number.capabilities,
                'number_status': number.status,
                'api_version': number.api_version,
                'voice_url': number.voice_url,
                'sms_url': number.sms_url,
                'uri': number.uri,
            }
            for number in phone_numbers
        ]

        if df is True:
            return pd.DataFrame(data)
        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(data))