Coverage for mindsdb / integrations / handlers / whatsapp_handler / whatsapp_handler.py: 0%
187 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2from twilio.rest import Client
3import re
4from datetime import datetime as datetime
5from typing import List
6import pandas as pd
8from mindsdb.utilities import log
9from mindsdb.utilities.config import Config
11from mindsdb_sql_parser import ast
12from mindsdb.integrations.utilities.date_utils import parse_local_date
14from mindsdb.integrations.libs.api_handler import APIHandler, APITable
16from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions, project_dataframe, filter_dataframe
18from mindsdb.integrations.libs.response import (
19 HandlerStatusResponse as StatusResponse,
20 HandlerResponse as Response,
21 RESPONSE_TYPE
22)
24logger = log.getLogger(__name__)
27class WhatsAppMessagesTable(APITable):
28 def select(self, query: ast.Select) -> Response:
29 """
30 Retrieves messages sent/received from the database using Twilio Whatsapp API
31 Returns
32 Response: conversation_history
33 """
35 # Extract comparison conditions from the query
36 conditions = extract_comparison_conditions(query.where)
37 params = {}
38 filters = []
40 # Build the filters and parameters for the query
41 for op, arg1, arg2 in conditions:
42 if op == 'or':
43 raise NotImplementedError('OR is not supported')
45 if arg1 == 'sent_at' and arg2 is not None:
46 date = parse_local_date(arg2)
47 if op == '>':
48 params['date_sent_after'] = date
49 elif op == '<':
50 params['date_sent_before'] = date
51 else:
52 raise NotImplementedError
54 # also add to post query filter because date_sent_after=date1 will include date1
55 filters.append([op, arg1, arg2])
57 elif arg1 == 'sid':
58 if op == '=':
59 params['sid'] = arg2
60 else:
61 NotImplementedError('Only "from_number=" is implemented')
63 elif arg1 == 'from_number':
64 if op == '=':
65 params['from_number'] = arg2
66 else:
67 NotImplementedError('Only "from_number=" is implemented')
69 elif arg1 == 'to_number':
70 if op == '=':
71 params['to_number'] = arg2
72 else:
73 NotImplementedError('Only "to_number=" is implemented')
75 else:
76 filters.append([op, arg1, arg2])
78 # Fetch messages based on the filters
79 result = self.handler.fetch_messages(params, df=True)
81 # filter targets
82 result = filter_dataframe(result, filters)
84 # If limit is specified
85 if query.limit is not None:
86 result = result[:int(query.limit.value)]
88 # project targets
89 result = project_dataframe(result, query.targets, self.get_columns())
91 return result
93 def get_columns(self):
94 return [
95 'sid',
96 'from_number',
97 'to_number',
98 'body',
99 'direction',
100 'msg_status',
101 'sent_at', # datetime.strptime(str(msg.date_sent), '%Y-%m-%d %H:%M:%S%z'),
102 'account_sid',
103 'price',
104 'price_unit',
105 'api_version',
106 'uri'
107 ]
109 def insert(self, query: ast.Insert):
110 """
111 Sends a whatsapp message
113 Args:
114 body: message body
115 from_number: number from which to send the message
116 to_number: number to which message will be sent
117 """
119 # get column names and values from the query
120 columns = [col.name for col in query.columns]
122 ret = []
124 insert_params = ["body", "from_number", "to_number"]
125 for row in query.values:
126 params = dict(zip(columns, row))
128 # Check text length
129 max_text_len = 1500
130 text = params["body"]
131 words = re.split('( )', text)
132 messages = []
134 """
135 Regex for matching if any URls are present, if yes then replace with string of hyphens(-)
137 Example:
138 words = ['Check', ' ', 'out', ' ', 'this', ' ', 'cool', ' ', 'website:', ' ', 'https://example.com.', "It's", ' ', 'awesome!']
140 After parsing through regex ('https://example.com') URL is matched
142 Final output:
143 messages = ['Check - out - this - cool - website: ----------------------- "It\'s - awesome!']
144 """
146 text2 = ''
147 pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
148 for word in words:
149 # replace the links in word to string with the length as twitter short url (23)
150 word2 = re.sub(pattern, '-' * 23, word)
151 if len(text2) + len(word2) > max_text_len - 3 - 7: # 3 is for ..., 7 is for (10/11)
152 messages.append(text2.strip())
154 text2 = ''
155 text2 += word
157 # Parse last message
158 if text2.strip() != '':
159 messages.append(text2.strip())
161 len_messages = len(messages)
163 # Modify message based on the length
164 for i, text in enumerate(messages):
165 if i < len_messages - 1:
166 text += '...'
167 else:
168 text += ' '
170 if i >= 1:
171 text += f'({i + 1}/{len_messages})'
173 # Pass parameters and call 'send_message'
174 params['body'] = text
175 params_to_send = {key: params[key] for key in insert_params if (key in params)}
176 ret_row = self.handler.send_message(params_to_send, ret_as_dict=True)
178 # Save the results
179 ret_row['body'] = text
180 ret.append(ret_row)
182 return pd.DataFrame(ret)
185class WhatsAppHandler(APIHandler):
186 """
187 A class for handling connections and interactions with Twilio WhatsApp API.
188 Args:
189 account_sid(str): Accound ID of the twilio account.
190 auth_token(str): Authentication Token obtained from the twilio account.
191 """
193 def __init__(self, name=None, **kwargs):
194 """
195 Initializes the connection by checking all the params are provided by the user.
196 """
197 super().__init__(name)
199 args = kwargs.get('connection_data', {})
200 self.connection_args = {}
201 handler_config = Config().get('whatsapp_handler', {})
202 for k in ['account_sid', 'auth_token']:
203 if k in args:
204 self.connection_args[k] = args[k]
205 elif f'TWILIO_{k.upper()}' in os.environ:
206 self.connection_args[k] = os.environ[f'TWILIO_{k.upper()}']
207 elif k in handler_config:
208 self.connection_args[k] = handler_config[k]
209 self.client = None
210 self.is_connected = False
212 messages = WhatsAppMessagesTable(self)
213 self._register_table('messages', messages)
215 def connect(self):
216 """
217 Authenticate with the Twilio API using the provided `account_SID` and `auth_token`.
218 """
219 if self.is_connected is True:
220 return self.client
222 self.client = Client(
223 self.connection_args['account_sid'],
224 self.connection_args['auth_token']
225 )
227 self.is_connected = True
228 return self.client
230 def check_connection(self) -> StatusResponse:
231 """
232 Checks the connection by performing a basic operation with the Twilio API.
233 """
234 response = StatusResponse(False)
236 try:
237 self.connect()
238 response.success = True
240 except Exception as e:
241 response.error_message = f'Error connecting to Twilio API: {str(e)}. Check credentials.'
242 logger.error(response.error_message)
244 if response.success is False and self.is_connected is True:
245 self.is_connected = False
247 return response
249 def parse_native_query(self, query_string: str):
250 """Parses the native query string of format method(arg1=val1, arg2=val2, ...) and returns the method name and arguments."""
252 # Adjust regex to account for the possibility of no arguments inside the parenthesis
253 match = re.match(r'(\w+)\(([^)]*)\)', query_string)
254 if not match:
255 raise ValueError(f"Invalid query format: {query_string}")
257 method_name = match.group(1)
258 arg_string = match.group(2)
260 # Extract individual arguments
261 args = {}
262 if arg_string: # Check if there are any arguments
263 for arg in arg_string.split(','):
264 arg = arg.strip()
265 key, value = arg.split('=')
266 args[key.strip()] = value.strip()
268 return method_name, args
270 def native_query(self, query_string: str = None):
271 """
272 Retreievs the native query from the `parse_native_query` and calls appropriate function and returns the result of the query as a Response object.
273 """
274 method_name, params = self.parse_native_query(query_string)
275 if method_name == 'send_message':
276 response = self.send_message(params)
277 else:
278 raise ValueError(f"Method '{method_name}' not supported by TwilioHandler")
280 return response
282 def fetch_messages(self, params, df=False):
283 """
284 Gets conversation history
286 Returns:
287 Response: conversation history
288 """
289 limit = int(params.get('limit', 1000))
290 sid = params.get('sid', None)
291 # Convert date strings to datetime objects if provided
292 date_sent_after = params.get('date_sent_after', None)
293 date_sent_before = params.get('date_sent_before', None)
294 # Extract 'from_' and 'body' search criteria from params
295 from_number = params.get('from_number', None)
296 to_number = params.get('to_number', None)
297 args = {
298 'limit': limit,
299 'date_sent_after': date_sent_after,
300 'date_sent_before': date_sent_before,
301 'from_': from_number,
302 'to': to_number
303 }
305 args = {arg: val for arg, val in args.items() if val is not None}
306 if sid:
307 messages = [self.client.messages(sid).fetch()]
308 else:
309 messages = self.client.messages.list(**args)
311 # Extract all possible properties for each message
312 data = []
313 for msg in messages:
314 msg_data = {
315 'sid': msg.sid,
316 'to_number': msg.to,
317 'from_number': msg.from_,
318 'body': msg.body,
319 'direction': msg.direction,
320 'msg_status': msg.status,
321 'sent_at': msg.date_created.replace(tzinfo=None),
322 'account_sid': msg.account_sid,
323 'price': msg.price,
324 'price_unit': msg.price_unit,
325 'api_version': msg.api_version,
326 'uri': msg.uri,
327 # 'media_url': [media.uri for media in msg.media.list()]
328 # ... Add other properties as needed
329 }
330 data.append(msg_data)
332 # Create a DataFrame
333 result_df = pd.DataFrame(data)
335 # Filter rows where 'from_number' or 'to_number' begins with 'whatsapp:'
336 result_df = result_df[result_df['from_number'].str.startswith('whatsapp:') | result_df['to_number'].str.startswith('whatsapp:')]
338 if df is True:
339 return pd.DataFrame(result_df)
340 return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(result_df))
342 def send_message(self, params, ret_as_dict=False) -> Response:
343 """
344 Sends a message to the given Whatsapp number.
346 Args:
347 body: message body
348 from_number: number from which to send the message
349 to_number: number to which message will be sent
350 """
351 try:
352 message = self.client.messages.create(
353 body=params.get('body'),
354 to=params.get('to_number'),
355 from_=params.get('from_number')
356 )
358 if ret_as_dict is True:
359 return {"sid": message.sid, "from": message.from_, "to": message.to, "message": message.body, "status": message.status}
361 return Response(
362 RESPONSE_TYPE.MESSAGE,
363 sid=message.sid,
364 from_=message.from_,
365 to=message.to,
366 body=message.body,
367 status=message.status
368 )
370 except Exception as e:
371 # Log the exception for debugging purposes
372 logger.error(f"Error sending message: {str(e)}")
373 logger.exception(f"Error sending message: {str(e)}")
374 raise Exception("Error sending message")
376 def call_whatsapp_api(self, method_name: str = None, params: dict = None):
377 """
378 Calls specific method specified.
380 Args:
381 method_name: to call specific method
382 params: parameters to call the method
384 Returns:
385 List of dictionaries as a result of the method call
386 """
387 api = self.connect()
388 method = getattr(api, method_name)
390 try:
391 result = method(**params)
392 except Exception as e:
393 error = f"Error calling method '{method_name}' with params '{params}': {e}"
394 logger.error(error)
395 raise e
397 if 'messages' in result:
398 result['messages'] = self.convert_channel_data(result['messages'])
400 return [result]
402 def convert_channel_data(self, messages: List[dict]):
403 """
404 Convert the list of channel dictionaries to a format that can be easily used in the data pipeline.
406 Args:
407 channels: A list of channel dictionaries.
409 Returns:
410 A list of channel dictionaries with modified keys and values.
411 """
412 new_messages = []
413 for message in messages:
414 new_message = {
415 'id': message['id'],
416 'name': message['name'],
417 'created': datetime.fromtimestamp(float(message['created']))
418 }
419 new_messages.append(new_message)
420 return new_messages