Coverage for mindsdb / integrations / handlers / stripe_handler / stripe_tables.py: 0%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2import stripe 

3from typing import Text, List, Dict, Any 

4 

5from mindsdb_sql_parser import ast 

6from mindsdb.integrations.libs.api_handler import APITable 

7from mindsdb.integrations.utilities.handlers.query_utilities import INSERTQueryParser, DELETEQueryParser, UPDATEQueryParser, DELETEQueryExecutor, UPDATEQueryExecutor 

8from mindsdb.integrations.utilities.handlers.query_utilities.select_query_utilities import SELECTQueryParser, SELECTQueryExecutor 

9from mindsdb.utilities import log 

10 

11logger = log.getLogger(__name__) 

12 

13 

class CustomersTable(APITable):
    """Read-only table exposing Stripe Customer objects."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Run a SELECT query against Stripe Customers.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Customers matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = SELECTQueryParser(query, 'customers', self.get_columns())
        columns, conditions, ordering, limit = parser.parse_query()

        # The LIMIT is forwarded to the Stripe list call; column selection,
        # WHERE and ORDER BY are handed to the executor afterwards.
        raw_customers = self.get_customers(limit=limit)
        executor = SELECTQueryExecutor(
            pd.json_normalize(raw_customers),
            columns,
            conditions,
            ordering,
        )
        return executor.execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names obtained by flattening one fetched Customer."""
        sample = self.get_customers(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_customers(self, **kwargs) -> List[Dict]:
        """List Customers through the handler's connection.

        Keyword arguments are passed unchanged to ``Customer.list``; each
        returned object is converted with ``to_dict()``.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Customer.list(**kwargs)]

62 

63 

class ProductsTable(APITable):
    """Table exposing Stripe Product objects with SELECT/INSERT/UPDATE/DELETE support."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Run a SELECT query against Stripe Products.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Products matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = SELECTQueryParser(query, 'products', self.get_columns())
        columns, conditions, ordering, limit = parser.parse_query()

        # The LIMIT is forwarded to the Stripe list call; the remaining
        # clauses are handed to the executor afterwards.
        raw_products = self.get_products(limit=limit)
        executor = SELECTQueryExecutor(
            pd.json_normalize(raw_products),
            columns,
            conditions,
            ordering,
        )
        return executor.execute_query()

    def insert(self, query: ast.Insert) -> None:
        """
        Create Stripe Products from an INSERT query ("POST v1/products").

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = INSERTQueryParser(
            query,
            supported_columns=['id', 'name', 'active', 'description', 'metadata'],
            mandatory_columns=['name'],
            all_mandatory=False,
        )
        self.create_products(parser.parse_query())

    def update(self, query: ast.Update) -> None:
        """
        Modify Stripe Products from an UPDATE query ("POST v1/products/:id").

        Parameters
        ----------
        query : ast.Update
            Given SQL UPDATE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        new_values, conditions = UPDATEQueryParser(query).parse_query()

        # Resolve the WHERE clause against the currently listed products to
        # find which ids have to be modified.
        listed = pd.json_normalize(self.get_products())
        matched = UPDATEQueryExecutor(listed, conditions).execute_query()
        self.update_products(matched['id'].tolist(), new_values)

    def delete(self, query: ast.Delete) -> None:
        """
        Delete Stripe Products from a DELETE query ("DELETE v1/products/:id").

        Parameters
        ----------
        query : ast.Delete
            Given SQL DELETE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        conditions = DELETEQueryParser(query).parse_query()

        # Resolve the WHERE clause against the currently listed products to
        # find which ids have to be removed.
        listed = pd.json_normalize(self.get_products())
        matched = DELETEQueryExecutor(listed, conditions).execute_query()
        self.delete_products(matched['id'].tolist())

    def get_columns(self) -> List[Text]:
        """Return the column names obtained by flattening one fetched Product."""
        sample = self.get_products(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_products(self, **kwargs) -> List[Dict]:
        """List Products through the handler's connection.

        Keyword arguments are passed unchanged to ``Product.list``; each
        returned object is converted with ``to_dict()``.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Product.list(**kwargs)]

    def create_products(self, product_data: List[Dict[Text, Any]]) -> None:
        """Create one Product per dict in ``product_data``.

        Raises ``Exception`` as soon as a created object has no ``id``.
        """
        client = self.handler.connect()
        for fields in product_data:
            created = client.Product.create(**fields).to_dict()
            if 'id' not in created:
                raise Exception('Product creation failed')
            logger.info(f'Product {created["id"]} created')

    def update_products(self, product_ids: List[Text], values_to_update: Dict[Text, Any]) -> None:
        """Apply ``values_to_update`` to every Product in ``product_ids``.

        Raises ``Exception`` as soon as a modified object has no ``id``.
        """
        client = self.handler.connect()
        for target_id in product_ids:
            modified = client.Product.modify(target_id, **values_to_update).to_dict()
            if 'id' not in modified:
                raise Exception('Product update failed')
            logger.info(f'Product {modified["id"]} updated')

    def delete_products(self, product_ids: List[Text]) -> None:
        """Delete every Product in ``product_ids``.

        Raises ``Exception`` as soon as a deletion response has no ``id``.
        """
        client = self.handler.connect()
        for target_id in product_ids:
            removed = client.Product.delete(target_id).to_dict()
            if 'id' not in removed:
                raise Exception('Product deletion failed')
            logger.info(f'Product {removed["id"]} deleted')

228 

229 

class PaymentIntentsTable(APITable):
    """The Stripe Payment Intents Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Pulls Stripe Payment Intents data.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Payment Intents matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'payment_intents',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # The LIMIT is forwarded to the Stripe list call; the remaining
        # clauses are handed to the executor afterwards.
        payment_intents_df = pd.json_normalize(self.get_payment_intents(limit=result_limit))
        select_statement_executor = SELECTQueryExecutor(
            payment_intents_df,
            selected_columns,
            where_conditions,
            order_by_conditions
        )
        return select_statement_executor.execute_query()

    def delete(self, query: ast.Delete) -> None:
        """
        Cancels Stripe Payment Intents and updates the local data.

        Parameters
        ----------
        query : ast.Delete
            Given SQL DELETE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        delete_statement_parser = DELETEQueryParser(query)
        where_conditions = delete_statement_parser.parse_query()

        # The listing is fetched once per table instance and kept on
        # ``self.payment_intents_df`` for later DELETE calls.
        if 'payment_intents_df' not in self.__dict__:
            self.payment_intents_df = pd.json_normalize(self.get_payment_intents())

        delete_query_executor = DELETEQueryExecutor(
            self.payment_intents_df,
            where_conditions
        )
        canceled_payment_intents_df = delete_query_executor.execute_query()

        payment_intent_ids = canceled_payment_intents_df['id'].tolist()
        self.cancel_payment_intents(payment_intent_ids)

        # Drop every matched id from the cached frame.
        # NOTE(review): ids are dropped even when the cancel was skipped or
        # failed (those cases are only logged) -- confirm this is intended.
        self.payment_intents_df = self.payment_intents_df[~self.payment_intents_df['id'].isin(payment_intent_ids)]

    def cancel_payment_intents(self, payment_intent_ids: List[str]) -> None:
        """
        Cancels each listed Payment Intent whose current status allows it.

        Intents in any other status are skipped with a warning; Stripe errors
        are logged and do not stop the remaining ids from being processed.

        Parameters
        ----------
        payment_intent_ids : List[str]
            Ids of the Payment Intents to cancel
        """
        stripe = self.handler.connect()
        cancellable_statuses = (
            'requires_payment_method',
            'requires_capture',
            'requires_confirmation',
            'requires_action',
            'processing',
        )
        for payment_intent_id in payment_intent_ids:
            try:
                payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id)
                if payment_intent.status in cancellable_statuses:
                    stripe.PaymentIntent.cancel(payment_intent_id)
                else:
                    logger.warning(f"Payment intent {payment_intent_id} is in status {payment_intent.status} and cannot be canceled.")
            except stripe.error.StripeError as e:
                logger.error(f"Error cancelling payment intent {payment_intent_id}: {str(e)}")

    def update(self, query: ast.Update) -> None:
        """
        Updates data in Stripe "POST /v1/payment_intents/:id" API endpoint.

        Parameters
        ----------
        query : ast.Update
            Given SQL UPDATE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        update_statement_parser = UPDATEQueryParser(query)
        values_to_update, where_conditions = update_statement_parser.parse_query()

        # Resolve the WHERE clause against the currently listed payment
        # intents to find which ids have to be modified.
        payment_intents_df = pd.json_normalize(self.get_payment_intents())
        update_query_executor = UPDATEQueryExecutor(
            payment_intents_df,
            where_conditions
        )

        payment_intents_df = update_query_executor.execute_query()
        payment_intent_ids = payment_intents_df['id'].tolist()
        self.update_payment_intents(payment_intent_ids, values_to_update)

    def update_payment_intents(self, payment_intent_ids: List[Text], values_to_update: Dict[Text, Any]) -> None:
        """
        Applies ``values_to_update`` to every listed Payment Intent.

        Parameters
        ----------
        payment_intent_ids : List[Text]
            Ids of the Payment Intents to modify
        values_to_update : Dict[Text, Any]
            Field/value pairs passed to ``PaymentIntent.modify``
        """
        # Go through the handler's connection, like every other method in
        # this module, instead of the bare module-level ``stripe`` import.
        stripe = self.handler.connect()
        for payment_intent_id in payment_intent_ids:
            stripe.PaymentIntent.modify(payment_intent_id, **values_to_update)

    def insert(self, query: ast.Insert) -> None:
        """
        Inserts data into Stripe "POST /v1/payment_intents" API endpoint.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        insert_statement_parser = INSERTQueryParser(
            query,
            supported_columns=['amount', 'currency', 'description', 'payment_method_types'],
            mandatory_columns=['amount', 'currency'],
            all_mandatory=True
        )
        payment_intent_data = insert_statement_parser.parse_query()
        self.create_payment_intent(payment_intent_data)

    def create_payment_intent(self, payment_intent_data: List[Dict[Text, Any]]) -> None:
        """
        Creates one Payment Intent per dict in ``payment_intent_data``.

        Parameters
        ----------
        payment_intent_data : List[Dict[Text, Any]]
            Keyword arguments for each ``PaymentIntent.create`` call
        """
        # ``insert`` reaches this method without any earlier call to
        # ``self.handler.connect()``, so obtain the connection here rather
        # than using the module-level ``stripe`` import.
        stripe = self.handler.connect()
        for data in payment_intent_data:
            stripe.PaymentIntent.create(**data)

    def get_columns(self) -> List[Text]:
        """Returns the column names obtained by flattening one fetched Payment Intent."""
        return pd.json_normalize(self.get_payment_intents(limit=1)).columns.tolist()

    def get_payment_intents(self, **kwargs) -> List[Dict]:
        """
        Lists Payment Intents through the handler's connection.

        Keyword arguments are passed unchanged to ``PaymentIntent.list``; each
        returned object is converted with ``to_dict()``.
        """
        stripe = self.handler.connect()
        payment_intents = stripe.PaymentIntent.list(**kwargs)
        return [payment_intent.to_dict() for payment_intent in payment_intents]

393 

394 

class RefundsTable(APITable):
    """Read-only table exposing Stripe Refund objects."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Run a SELECT query against Stripe Refunds.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Refunds matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = SELECTQueryParser(query, 'refunds', self.get_columns())
        columns, conditions, ordering, limit = parser.parse_query()

        # The LIMIT is forwarded to the Stripe list call; the remaining
        # clauses are handed to the executor afterwards.
        raw_refunds = self.get_refunds(limit=limit)
        executor = SELECTQueryExecutor(
            pd.json_normalize(raw_refunds),
            columns,
            conditions,
            ordering,
        )
        return executor.execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names obtained by flattening one fetched Refund."""
        sample = self.get_refunds(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_refunds(self, **kwargs) -> List[Dict]:
        """List Refunds through the handler's connection.

        Keyword arguments are passed unchanged to ``Refund.list``; each
        returned object is converted with ``to_dict()``.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Refund.list(**kwargs)]

443 

444 

class PayoutsTable(APITable):
    """Read-only table exposing Stripe Payout objects."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Run a SELECT query against Stripe Payouts.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Payouts matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = SELECTQueryParser(query, 'payouts', self.get_columns())
        columns, conditions, ordering, limit = parser.parse_query()

        # The LIMIT is forwarded to the Stripe list call; the remaining
        # clauses are handed to the executor afterwards.
        raw_payouts = self.get_payouts(limit=limit)
        executor = SELECTQueryExecutor(
            pd.json_normalize(raw_payouts),
            columns,
            conditions,
            ordering,
        )
        return executor.execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names obtained by flattening one fetched Payout."""
        sample = self.get_payouts(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_payouts(self, **kwargs) -> List[Dict]:
        """List Payouts through the handler's connection.

        Keyword arguments are passed unchanged to ``Payout.list``; each
        returned object is converted with ``to_dict()``.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Payout.list(**kwargs)]