Coverage for mindsdb / integrations / handlers / whatsapp_handler / whatsapp_handler.py: 0%

187 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2from twilio.rest import Client 

3import re 

4from datetime import datetime as datetime 

5from typing import List 

6import pandas as pd 

7 

8from mindsdb.utilities import log 

9from mindsdb.utilities.config import Config 

10 

11from mindsdb_sql_parser import ast 

12from mindsdb.integrations.utilities.date_utils import parse_local_date 

13 

14from mindsdb.integrations.libs.api_handler import APIHandler, APITable 

15 

16from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions, project_dataframe, filter_dataframe 

17 

18from mindsdb.integrations.libs.response import ( 

19 HandlerStatusResponse as StatusResponse, 

20 HandlerResponse as Response, 

21 RESPONSE_TYPE 

22) 

23 

# Module-level logger for this handler module.
logger = log.getLogger(__name__)

25 

26 

class WhatsAppMessagesTable(APITable):
    """
    The ``messages`` table of the WhatsApp handler.

    ``SELECT`` reads messages through ``handler.fetch_messages`` and ``INSERT``
    sends messages through ``handler.send_message``.
    """

    # Maximum length of one outgoing chunk. Twilio rejects message bodies longer
    # than 1600 characters, so long texts are split well below that limit.
    max_text_len = 1500

    # Characters reserved in every chunk for the '...' continuation marker (3)
    # and the '(i/n)' counter appended to follow-up chunks (7).
    _suffix_reserve = 3 + 7

    def select(self, query: ast.Select) -> Response:
        """
        Retrieve WhatsApp messages sent/received through the Twilio API.

        Equality conditions on ``sid``, ``from_number`` and ``to_number`` and
        ``sent_at >`` / ``sent_at <`` conditions are passed to the API; every other
        condition is applied to the fetched rows with ``filter_dataframe``.

        Args:
            query: the parsed SELECT statement.

        Returns:
            pandas.DataFrame: the matching messages, limited and projected as the
            query requests.

        Raises:
            NotImplementedError: for OR conditions, or for a ``sent_at`` operator
                other than ``>`` / ``<``.
        """
        conditions = extract_comparison_conditions(query.where)
        params = {}
        filters = []

        for op, arg1, arg2 in conditions:
            if op == 'or':
                raise NotImplementedError('OR is not supported')

            if arg1 == 'sent_at' and arg2 is not None:
                date = parse_local_date(arg2)
                if op == '>':
                    params['date_sent_after'] = date
                elif op == '<':
                    params['date_sent_before'] = date
                else:
                    raise NotImplementedError('Only "sent_at>" and "sent_at<" are implemented')

                # Also add to the post-query filter because date_sent_after=date1 still includes date1.
                filters.append([op, arg1, arg2])

            elif arg1 in ('sid', 'from_number', 'to_number') and op == '=':
                # The API can filter these columns by exact value only.
                params[arg1] = arg2

            else:
                # Everything else -- including non-equality conditions on sid /
                # from_number / to_number, which the API cannot express -- is
                # applied to the fetched rows rather than being dropped.
                filters.append([op, arg1, arg2])

        result = self.handler.fetch_messages(params, df=True)
        result = filter_dataframe(result, filters)

        if query.limit is not None:
            result = result[:int(query.limit.value)]

        return project_dataframe(result, query.targets, self.get_columns())

    def get_columns(self):
        """Return the column names of the ``messages`` table, in output order."""
        return [
            'sid',
            'from_number',
            'to_number',
            'body',
            'direction',
            'msg_status',
            'sent_at',  # NOTE(review): the handler fills this from the message's date_created
            'account_sid',
            'price',
            'price_unit',
            'api_version',
            'uri',
        ]

    def insert(self, query: ast.Insert):
        """
        Send one WhatsApp message per inserted row.

        Bodies longer than the chunk budget are split at spaces into several
        messages: every chunk except the last ends with '...', and every chunk
        after the first is suffixed with an '(i/n)' counter.

        Columns of the INSERT:
            body: message body (required)
            from_number: number from which to send the message
            to_number: number to which the message will be sent

        Returns:
            pandas.DataFrame: one row per message sent, as returned by
            ``handler.send_message(..., ret_as_dict=True)``, plus the ``body`` that
            was actually sent.

        Raises:
            ValueError: if a row has no ``body`` value.
        """
        columns = [col.name for col in query.columns]
        insert_params = ("body", "from_number", "to_number")
        ret = []

        for row in query.values:
            params = dict(zip(columns, row))
            if params.get("body") is None:
                raise ValueError('Column "body" is required to send a WhatsApp message')

            chunks = self._split_text(str(params["body"]))
            n_chunks = len(chunks)

            for i, chunk in enumerate(chunks):
                if i < n_chunks - 1:
                    chunk += '...'
                elif i >= 1:
                    # Separate the counter from the text of the final chunk.
                    chunk += ' '
                if i >= 1:
                    chunk += f'({i + 1}/{n_chunks})'

                params_to_send = {key: params[key] for key in insert_params if key in params}
                params_to_send['body'] = chunk
                ret_row = self.handler.send_message(params_to_send, ret_as_dict=True)

                ret_row['body'] = chunk
                ret.append(ret_row)

        return pd.DataFrame(ret)

    @classmethod
    def _split_text(cls, text):
        """
        Split ``text`` at spaces into stripped, non-empty chunks that fit the
        budget ``max_text_len - _suffix_reserve``; a single word longer than the
        budget is cut into budget-sized pieces.

        Lengths are the real character counts: WhatsApp does not shorten links,
        so URLs count in full.
        """
        budget = cls.max_text_len - cls._suffix_reserve
        chunks = []
        current = ''
        # The capturing group keeps the spaces as tokens so they survive inside chunks.
        for token in re.split('( )', text):
            for start in range(0, len(token), budget):
                piece = token[start:start + budget]
                # Never emit an empty/whitespace-only chunk.
                if len(current) + len(piece) > budget and current.strip():
                    chunks.append(current.strip())
                    current = ''
                current += piece

        if current.strip():
            chunks.append(current.strip())
        return chunks

183 

184 

class WhatsAppHandler(APIHandler):
    """
    A class for handling connections and interactions with the Twilio WhatsApp API.

    Args:
        name (str): name of the handler instance.
        connection_data (dict, keyword argument): may provide ``account_sid``
            (SID of the Twilio account) and ``auth_token`` (authentication token of
            the Twilio account). A missing value is taken from the
            ``TWILIO_ACCOUNT_SID`` / ``TWILIO_AUTH_TOKEN`` environment variables,
            then from the ``whatsapp_handler`` section of the MindsDB config.
    """

    def __init__(self, name=None, **kwargs):
        """
        Collect the connection credentials and register the ``messages`` table.

        No connection is made here; see ``connect``.
        """
        super().__init__(name)

        # Tolerate an explicit connection_data=None.
        args = kwargs.get('connection_data') or {}
        handler_config = Config().get('whatsapp_handler', {})
        self.connection_args = {}
        for key in ('account_sid', 'auth_token'):
            env_var = f'TWILIO_{key.upper()}'
            if key in args:
                self.connection_args[key] = args[key]
            elif env_var in os.environ:
                self.connection_args[key] = os.environ[env_var]
            elif key in handler_config:
                self.connection_args[key] = handler_config[key]

        self.client = None
        self.is_connected = False

        self._register_table('messages', WhatsAppMessagesTable(self))

    def connect(self):
        """
        Create (once) and return the Twilio REST client built from
        ``account_sid`` and ``auth_token``.

        Returns:
            twilio.rest.Client: the cached client when already connected.

        Raises:
            KeyError: if ``account_sid`` or ``auth_token`` was not configured.
        """
        if self.is_connected is True:
            return self.client

        self.client = Client(
            self.connection_args['account_sid'],
            self.connection_args['auth_token']
        )

        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """
        Check the credentials by fetching the configured Twilio account.

        Returns:
            StatusResponse: ``success`` is True when the account could be fetched;
            otherwise ``error_message`` describes the failure and the handler is
            marked as disconnected.
        """
        response = StatusResponse(False)

        try:
            client = self.connect()
            # Constructing the twilio Client only stores the credentials; fetching
            # the account is what actually detects wrong credentials.
            client.api.v2010.accounts(self.connection_args['account_sid']).fetch()
            response.success = True

        except Exception as e:
            response.error_message = f'Error connecting to Twilio API: {str(e)}. Check credentials.'
            logger.error(response.error_message)

        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def parse_native_query(self, query_string: str):
        """
        Parse a native query of the form ``method(arg1=val1, arg2=val2, ...)``.

        Arguments are split only at commas that start a new ``key=`` pair, so
        values may contain commas, ``=`` and parentheses. (A value containing
        ", word=" would still be split there.) Values are stripped of
        surrounding whitespace but otherwise kept verbatim.

        Returns:
            tuple: (method name, dict of argument name -> string value).

        Raises:
            ValueError: if the string is not in the expected format.
        """
        # Greedy match up to the last ')' so values may contain parentheses.
        match = re.match(r'\s*(\w+)\((.*)\)', query_string or '', re.DOTALL)
        if not match:
            raise ValueError(f"Invalid query format: {query_string}")

        method_name = match.group(1)
        arg_string = match.group(2)

        args = {}
        if arg_string.strip():  # the parentheses may be empty
            for arg in re.split(r',(?=\s*\w+\s*=)', arg_string):
                key, sep, value = arg.partition('=')
                if not sep or not key.strip():
                    raise ValueError(f"Invalid argument '{arg.strip()}' in query: {query_string}")
                args[key.strip()] = value.strip()

        return method_name, args

    def native_query(self, query_string: str = None):
        """
        Execute a native query; only ``send_message(body=..., from_number=...,
        to_number=...)`` is supported.

        Returns:
            Response: the result of the called method.

        Raises:
            ValueError: for a malformed query or an unsupported method.
        """
        method_name, params = self.parse_native_query(query_string)
        if method_name == 'send_message':
            response = self.send_message(params)
        else:
            raise ValueError(f"Method '{method_name}' not supported by WhatsAppHandler")

        return response

    def fetch_messages(self, params, df=False):
        """
        Get the WhatsApp conversation history.

        Args:
            params (dict): optional keys ``limit`` (default 1000), ``sid``,
                ``date_sent_after``, ``date_sent_before``, ``from_number`` and
                ``to_number``. When ``sid`` is given only that message is fetched
                and the other keys are ignored.
            df (bool): return a DataFrame instead of a Response.

        Returns:
            The messages where ``from_number`` or ``to_number`` starts with
            'whatsapp:', as a DataFrame when ``df`` is True, otherwise as a TABLE
            Response. The result has all columns even when no message matches.
        """
        client = self.connect()

        sid = params.get('sid', None)
        args = {
            'limit': int(params.get('limit', 1000)),
            'date_sent_after': params.get('date_sent_after', None),
            'date_sent_before': params.get('date_sent_before', None),
            'from_': params.get('from_number', None),
            'to': params.get('to_number', None),
        }
        args = {arg: val for arg, val in args.items() if val is not None}

        if sid:
            messages = [client.messages(sid).fetch()]
        else:
            messages = client.messages.list(**args)

        columns = [
            'sid', 'to_number', 'from_number', 'body', 'direction', 'msg_status',
            'sent_at', 'account_sid', 'price', 'price_unit', 'api_version', 'uri',
        ]
        data = [
            {
                'sid': msg.sid,
                'to_number': msg.to,
                'from_number': msg.from_,
                'body': msg.body,
                'direction': msg.direction,
                'msg_status': msg.status,
                'sent_at': msg.date_created.replace(tzinfo=None) if msg.date_created else None,
                'account_sid': msg.account_sid,
                'price': msg.price,
                'price_unit': msg.price_unit,
                'api_version': msg.api_version,
                'uri': msg.uri,
            }
            for msg in messages
        ]
        # Explicit columns keep the filter below working when no message was returned.
        result_df = pd.DataFrame(data, columns=columns)

        # Keep only WhatsApp traffic; na=False treats missing numbers as non-matching.
        is_whatsapp = (
            result_df['from_number'].str.startswith('whatsapp:', na=False)
            | result_df['to_number'].str.startswith('whatsapp:', na=False)
        )
        result_df = result_df[is_whatsapp]

        if df is True:
            return pd.DataFrame(result_df)
        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(result_df))

    def send_message(self, params, ret_as_dict=False) -> Response:
        """
        Send a message to the given WhatsApp number.

        Args:
            params (dict): ``body`` (message body), ``from_number`` (number from
                which to send the message) and ``to_number`` (number to which the
                message will be sent).
            ret_as_dict (bool): return a plain dict instead of a Response.

        Returns:
            A dict with keys ``sid``, ``from``, ``to``, ``message`` and ``status``
            when ``ret_as_dict`` is True; otherwise a TABLE Response holding that
            dict as its single row.

        Raises:
            Exception: if the client cannot be created or the message cannot be
                sent; the underlying error is chained as ``__cause__``.
        """
        try:
            client = self.connect()
            message = client.messages.create(
                body=params.get('body'),
                to=params.get('to_number'),
                from_=params.get('from_number')
            )
        except Exception as e:
            logger.exception(f"Error sending message: {str(e)}")
            raise Exception(f"Error sending message: {e}") from e

        result = {
            "sid": message.sid,
            "from": message.from_,
            "to": message.to,
            "message": message.body,
            "status": message.status,
        }
        if ret_as_dict is True:
            return result

        # Same response shape as fetch_messages: a TABLE response with a DataFrame.
        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame([result]))

    def call_whatsapp_api(self, method_name: str = None, params: dict = None):
        """
        Call the attribute ``method_name`` of the Twilio client with ``params``.

        Args:
            method_name: name of the client attribute to call.
            params: keyword arguments for the call (none when omitted).

        Returns:
            A one-element list holding the call's result; when the result is a
            dict with a ``messages`` key, that value is converted with
            ``convert_channel_data``.

        Raises:
            Whatever the call raises, after logging it.
        """
        api = self.connect()
        method = getattr(api, method_name)
        params = params or {}

        try:
            result = method(**params)
        except Exception as e:
            logger.error(f"Error calling method '{method_name}' with params '{params}': {e}")
            raise

        # Only dict results can carry a 'messages' entry; other objects may not support `in`.
        if isinstance(result, dict) and 'messages' in result:
            result['messages'] = self.convert_channel_data(result['messages'])

        return [result]

    def convert_channel_data(self, messages: List[dict]):
        """
        Reduce each dictionary to its ``id``, ``name`` and ``created`` fields,
        converting ``created`` from a UNIX timestamp to a datetime.

        Args:
            messages: dictionaries with at least ``id``, ``name`` and ``created``.

        Returns:
            A list of new dictionaries with those three keys.
        """
        return [
            {
                'id': message['id'],
                'name': message['name'],
                'created': datetime.fromtimestamp(float(message['created'])),
            }
            for message in messages
        ]