Coverage for mindsdb / integrations / handlers / twilio_handler / twilio_handler.py: 0%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2 

3import re 

4from twilio.rest import Client 

5import pandas as pd 

6from mindsdb.integrations.libs.api_handler import APIHandler, APITable 

7from mindsdb.integrations.libs.response import ( 

8 HandlerStatusResponse as StatusResponse, 

9 HandlerResponse as Response, 

10 RESPONSE_TYPE 

11) 

12from mindsdb.utilities.config import Config 

13from mindsdb.utilities import log 

14from mindsdb.integrations.utilities.date_utils import parse_local_date 

15from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions, project_dataframe, filter_dataframe 

16 

17from mindsdb_sql_parser import ast 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class PhoneNumbersTable(APITable): 

23 

24 def select(self, query: ast.Select) -> Response: 

25 

26 conditions = extract_comparison_conditions(query.where) 

27 

28 params = {} 

29 filters = [] 

30 for op, arg1, arg2 in conditions: 

31 

32 if op == 'or': 

33 raise NotImplementedError('OR is not supported') 

34 else: 

35 filters.append([op, arg1, arg2]) 

36 

37 if query.limit is not None: 

38 params['limit'] = query.limit.value 

39 

40 result = self.handler.list_phone_numbers(params, df=True) 

41 

42 # filter targets 

43 result = filter_dataframe(result, filters) 

44 

45 # project targets 

46 result = project_dataframe(result, query.targets, self.get_columns()) 

47 

48 return result 

49 

50 def get_columns(self): 

51 return [ 

52 'sid', 

53 'date_created', 

54 'date_updated', 

55 'phone_number', 

56 'friendly_name', 

57 'account_sid', 

58 'capabilities', 

59 'number_status', 

60 'api_version', 

61 'voice_url', 

62 'sms_url', 

63 'uri' 

64 ] 

65 

66 

67class MessagesTable(APITable): 

68 

69 def select(self, query: ast.Select) -> Response: 

70 

71 conditions = extract_comparison_conditions(query.where) 

72 

73 params = {} 

74 filters = [] 

75 for op, arg1, arg2 in conditions: 

76 

77 if op == 'or': 

78 raise NotImplementedError('OR is not supported') 

79 if arg1 == 'sent_at' and arg2 is not None: 

80 

81 date = parse_local_date(arg2) 

82 

83 if op == '>': 

84 params['date_sent_after'] = date 

85 elif op == '<': 

86 params['date_sent_before'] = date 

87 else: 

88 raise NotImplementedError 

89 

90 # also add to post query filter because date_sent_after=date1 will include date1 

91 filters.append([op, arg1, arg2]) 

92 

93 elif arg1 == 'sid': 

94 if op == '=': 

95 params['sid'] = arg2 

96 # TODO: implement IN 

97 else: 

98 NotImplementedError('Only "from_number=" is implemented') 

99 elif arg1 == 'from_number': 

100 if op == '=': 

101 params['from_number'] = arg2 

102 # TODO: implement IN 

103 else: 

104 NotImplementedError('Only "from_number=" is implemented') 

105 

106 elif arg1 == 'to_number': 

107 if op == '=': 

108 params['to_number'] = arg2 

109 # TODO: implement IN 

110 else: 

111 NotImplementedError('Only "to_number=" is implemented') 

112 

113 else: 

114 filters.append([op, arg1, arg2]) 

115 

116 result = self.handler.fetch_messages(params, df=True) 

117 

118 # filter targets 

119 result = filter_dataframe(result, filters) 

120 

121 if query.limit is not None: 

122 result = result[:int(query.limit.value)] 

123 

124 # project targets 

125 result = project_dataframe(result, query.targets, self.get_columns()) 

126 

127 return result 

128 

129 def get_columns(self): 

130 return [ 

131 'sid', 

132 'from_number', 

133 'to_number', 

134 'body', 

135 'direction', 

136 'msg_status', 

137 'sent_at', # datetime.strptime(str(msg.date_sent), '%Y-%m-%d %H:%M:%S%z'), 

138 'account_sid', 

139 'price', 

140 'price_unit', 

141 'api_version', 

142 'uri' 

143 ] 

144 

145 def insert(self, query: ast.Insert): 

146 # https://docs.tweepy.org/en/stable/client.html#tweepy.Client.create_tweet 

147 columns = [col.name for col in query.columns] 

148 

149 ret = [] 

150 

151 insert_params = ["to_number", "from_number", "body", 'media_url'] 

152 for row in query.values: 

153 params = dict(zip(columns, row)) 

154 

155 # split long text over 1500 symbols 

156 max_text_len = 1500 

157 text = params['body'] 

158 words = re.split('( )', text) 

159 messages = [] 

160 

161 text2 = '' 

162 pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+' 

163 for word in words: 

164 # replace the links in word to string with the length as twitter short url (23) 

165 word2 = re.sub(pattern, '-' * 23, word) 

166 if len(text2) + len(word2) > max_text_len - 3 - 7: # 3 is for ..., 7 is for (10/11) 

167 messages.append(text2.strip()) 

168 

169 text2 = '' 

170 text2 += word 

171 

172 # the last message 

173 if text2.strip() != '': 

174 messages.append(text2.strip()) 

175 

176 len_messages = len(messages) 

177 

178 for i, text in enumerate(messages): 

179 if i < len_messages - 1: 

180 text += '...' 

181 else: 

182 text += ' ' 

183 

184 if i >= 1: 

185 text += f'({i + 1}/{len_messages})' 

186 # only send image on first url 

187 if 'media_url' in params: 

188 del params['media_url'] 

189 

190 params['body'] = text 

191 params_to_send = {key: params[key] for key in insert_params if (key in params)} 

192 ret_row = self.handler.send_sms(params_to_send, ret_as_dict=True) 

193 ret_row['body'] = text 

194 ret.append(ret_row) 

195 

196 return pd.DataFrame(ret) 

197 

198 

199class TwilioHandler(APIHandler): 

200 

201 def __init__(self, name=None, **kwargs): 

202 super().__init__(name) 

203 

204 args = kwargs.get('connection_data', {}) 

205 

206 self.connection_args = {} 

207 handler_config = Config().get('twilio_handler', {}) 

208 for k in ['account_sid', 'auth_token']: 

209 if k in args: 

210 self.connection_args[k] = args[k] 

211 elif f'TWILIO_{k.upper()}' in os.environ: 

212 self.connection_args[k] = os.environ[f'TWILIO_{k.upper()}'] 

213 elif k in handler_config: 

214 self.connection_args[k] = handler_config[k] 

215 

216 self.client = None 

217 self.is_connected = False 

218 

219 messages = MessagesTable(self) 

220 phone_numbers = PhoneNumbersTable(self) 

221 self._register_table('messages', messages) 

222 self._register_table('phone_numbers', phone_numbers) 

223 

224 def connect(self): 

225 """Authenticate with the Twilio API using the account_sid and auth_token provided in the constructor.""" 

226 if self.is_connected is True: 

227 return self.client 

228 

229 self.client = Client( 

230 self.connection_args['account_sid'], 

231 self.connection_args['auth_token'] 

232 ) 

233 

234 self.is_connected = True 

235 return self.client 

236 

237 def check_connection(self) -> StatusResponse: 

238 '''It evaluates if the connection with Twilio API is alive and healthy.''' 

239 response = StatusResponse(False) 

240 

241 try: 

242 self.connect() 

243 # Maybe make a harmless API request to verify connection, but be mindful of rate limits and costs 

244 response.success = True 

245 

246 except Exception as e: 

247 response.error_message = f'Error connecting to Twilio api: {e}. ' 

248 logger.error(response.error_message) 

249 

250 if response.success is False and self.is_connected is True: 

251 self.is_connected = False 

252 

253 return response 

254 

255 def parse_native_query(self, query_string: str): 

256 """Parses the native query string of format method(arg1=val1, arg2=val2, ...) and returns the method name and arguments.""" 

257 

258 # Adjust regex to account for the possibility of no arguments inside the parenthesis 

259 match = re.match(r'(\w+)\(([^)]*)\)', query_string) 

260 if not match: 

261 raise ValueError(f"Invalid query format: {query_string}") 

262 

263 method_name = match.group(1) 

264 arg_string = match.group(2) 

265 

266 # Extract individual arguments 

267 args = {} 

268 if arg_string: # Check if there are any arguments 

269 for arg in arg_string.split(','): 

270 arg = arg.strip() 

271 key, value = arg.split('=') 

272 args[key.strip()] = value.strip() 

273 

274 return method_name, args 

275 

276 def native_query(self, query_string: str = None): 

277 '''It parses any native statement string and acts upon it (for example, raw syntax commands).''' 

278 

279 method_name, params = self.parse_native_query(query_string) 

280 if method_name == 'send_sms': 

281 response = self.send_sms(params) 

282 elif method_name == 'fetch_messages': 

283 response = self.fetch_messages(params) 

284 elif method_name == 'list_phone_numbers': 

285 response = self.list_phone_numbers(params) 

286 else: 

287 raise ValueError(f"Method '{method_name}' not supported by TwilioHandler") 

288 

289 return response 

290 

291 def send_sms(self, params, ret_as_dict=False): 

292 message = self.client.messages.create( 

293 to=params.get("to_number"), 

294 from_=params.get('from_number'), 

295 body=params.get("body"), 

296 media_url=params.get("media_url") 

297 ) 

298 

299 if ret_as_dict is True: 

300 return {'sid': message.sid, 'status': message.status} 

301 return Response( 

302 RESPONSE_TYPE.MESSAGE, 

303 sid=message.sid, 

304 status=message.status 

305 ) 

306 

307 def fetch_messages(self, params, df=False): 

308 limit = int(params.get('limit', 1000)) 

309 sid = params.get('sid', None) 

310 # Convert date strings to datetime objects if provided 

311 date_sent_after = params.get('date_sent_after', None) 

312 date_sent_before = params.get('date_sent_before', None) 

313 # Extract 'from_' and 'body' search criteria from params 

314 from_number = params.get('from_number', None) 

315 to_number = params.get('to_number', None) 

316 args = { 

317 'limit': limit, 

318 'date_sent_after': date_sent_after, 

319 'date_sent_before': date_sent_before, 

320 'from_': from_number, 

321 'to': to_number 

322 } 

323 

324 args = {arg: val for arg, val in args.items() if val is not None} 

325 if sid: 

326 messages = [self.client.messages(sid).fetch()] 

327 else: 

328 messages = self.client.messages.list(**args) 

329 

330 # Extract all possible properties for each message 

331 data = [] 

332 for msg in messages: 

333 msg_data = { 

334 'sid': msg.sid, 

335 'to_number': msg.to, 

336 'from_number': msg.from_, 

337 'body': msg.body, 

338 'direction': msg.direction, 

339 'msg_status': msg.status, 

340 'sent_at': msg.date_created.replace(tzinfo=None), 

341 'account_sid': msg.account_sid, 

342 'price': msg.price, 

343 'price_unit': msg.price_unit, 

344 'api_version': msg.api_version, 

345 'uri': msg.uri, 

346 # 'media_url': [media.uri for media in msg.media.list()] 

347 # ... Add other properties as needed 

348 } 

349 data.append(msg_data) 

350 

351 if df is True: 

352 return pd.DataFrame(data) 

353 return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(data)) 

354 

355 def list_phone_numbers(self, params, df=False): 

356 limit = int(params.get('limit', 100)) 

357 args = { 

358 'limit': limit 

359 } 

360 args = {arg: val for arg, val in args.items() if val is not None} 

361 phone_numbers = self.client.incoming_phone_numbers.list(**args) 

362 

363 # Extract properties for each phone number 

364 data = [] 

365 for number in phone_numbers: 

366 num_data = { 

367 'sid': number.sid, 

368 'date_created': number.date_created, 

369 'date_updated': number.date_updated, 

370 'phone_number': number.phone_number, 

371 'friendly_name': number.friendly_name, 

372 'account_sid': number.account_sid, 

373 'capabilities': number.capabilities, 

374 'number_status': number.status, 

375 'api_version': number.api_version, 

376 'voice_url': number.voice_url, 

377 'sms_url': number.sms_url, 

378 'uri': number.uri, 

379 # ... Add other properties as needed 

380 } 

381 data.append(num_data) 

382 

383 if df is True: 

384 return pd.DataFrame(data) 

385 return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(data))