Coverage for mindsdb / integrations / handlers / stripe_handler / stripe_tables.py: 0%
158 statements
coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2import stripe
3from typing import Text, List, Dict, Any
5from mindsdb_sql_parser import ast
6from mindsdb.integrations.libs.api_handler import APITable
7from mindsdb.integrations.utilities.handlers.query_utilities import INSERTQueryParser, DELETEQueryParser, UPDATEQueryParser, DELETEQueryExecutor, UPDATEQueryExecutor
8from mindsdb.integrations.utilities.handlers.query_utilities.select_query_utilities import SELECTQueryParser, SELECTQueryExecutor
9from mindsdb.utilities import log
11logger = log.getLogger(__name__)
class CustomersTable(APITable):
    """Read-only table exposing Stripe Customer objects."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Evaluate a SQL SELECT against the Stripe Customers listing.

        Parameters
        ----------
        query : ast.Select
            Parsed SELECT statement to evaluate

        Returns
        -------
        pd.DataFrame
            Customers remaining after the query's WHERE, ORDER BY and
            column selection have been applied

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        columns, conditions, ordering, limit = SELECTQueryParser(
            query,
            'customers',
            self.get_columns(),
        ).parse_query()

        # Only the LIMIT is forwarded to the list call; filtering, ordering
        # and projection run locally on the flattened records.
        records = self.get_customers(limit=limit)
        executor = SELECTQueryExecutor(
            pd.json_normalize(records),
            columns,
            conditions,
            ordering,
        )
        return executor.execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names of a flattened `limit=1` customer listing."""
        sample = self.get_customers(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_customers(self, **kwargs) -> List[Dict]:
        """
        List customers through the handler's connection.

        Keyword arguments are passed unchanged to `Customer.list`; every
        returned object is converted with `to_dict()`.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Customer.list(**kwargs)]
class ProductsTable(APITable):
    """The Stripe Products Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Pulls Stripe Product data.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Products matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'products',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Only the LIMIT is forwarded to Stripe; the remaining clauses are
        # evaluated locally on the flattened records.
        products_df = pd.json_normalize(self.get_products(limit=result_limit))
        select_statement_executor = SELECTQueryExecutor(
            products_df,
            selected_columns,
            where_conditions,
            order_by_conditions
        )
        return select_statement_executor.execute_query()

    def insert(self, query: ast.Insert) -> None:
        """
        Inserts data into Stripe "POST v1/products" API endpoint.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        insert_statement_parser = INSERTQueryParser(
            query,
            supported_columns=['id', 'name', 'active', 'description', 'metadata'],
            mandatory_columns=['name'],
            all_mandatory=False,
        )
        product_data = insert_statement_parser.parse_query()
        self.create_products(product_data)

    def update(self, query: ast.Update) -> None:
        """
        Updates data from Stripe "POST v1/products/:id" API endpoint.

        Every product returned by `get_products()` that satisfies the WHERE
        clause is modified. When no products are returned the call is a no-op.

        Parameters
        ----------
        query : ast.Update
            Given SQL UPDATE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        update_statement_parser = UPDATEQueryParser(query)
        values_to_update, where_conditions = update_statement_parser.parse_query()

        products_df = pd.json_normalize(self.get_products())
        # json_normalize([]) yields a frame without any columns, so both the
        # WHERE filtering and the 'id' lookup below would raise KeyError.
        if products_df.empty:
            return

        update_query_executor = UPDATEQueryExecutor(
            products_df,
            where_conditions
        )

        products_df = update_query_executor.execute_query()
        product_ids = products_df['id'].tolist()
        self.update_products(product_ids, values_to_update)

    def delete(self, query: ast.Delete) -> None:
        """
        Deletes data from Stripe "DELETE v1/products/:id" API endpoint.

        Every product returned by `get_products()` that satisfies the WHERE
        clause is deleted. When no products are returned the call is a no-op.

        Parameters
        ----------
        query : ast.Delete
            Given SQL DELETE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        delete_statement_parser = DELETEQueryParser(query)
        where_conditions = delete_statement_parser.parse_query()

        products_df = pd.json_normalize(self.get_products())
        # Same empty-frame guard as in update(): nothing listed means
        # nothing to delete, and the frame would not have an 'id' column.
        if products_df.empty:
            return

        delete_query_executor = DELETEQueryExecutor(
            products_df,
            where_conditions
        )

        products_df = delete_query_executor.execute_query()
        product_ids = products_df['id'].tolist()
        self.delete_products(product_ids)

    def get_columns(self) -> List[Text]:
        """Return the column names of a flattened `limit=1` product listing."""
        return pd.json_normalize(self.get_products(limit=1)).columns.tolist()

    def get_products(self, **kwargs) -> List[Dict]:
        """
        List products through the handler's connection.

        Keyword arguments are passed unchanged to `Product.list`; every
        returned object is converted with `to_dict()`.
        """
        stripe = self.handler.connect()
        products = stripe.Product.list(**kwargs)
        return [product.to_dict() for product in products]

    def create_products(self, product_data: List[Dict[Text, Any]]) -> None:
        """
        Create one Stripe product per entry of `product_data`.

        Raises
        ------
        Exception
            If a created product comes back without an 'id'
        """
        stripe = self.handler.connect()
        for product in product_data:
            created_product = stripe.Product.create(**product).to_dict()
            if 'id' not in created_product:
                raise Exception('Product creation failed')
            logger.info(f'Product {created_product["id"]} created')

    def update_products(self, product_ids: List[Text], values_to_update: Dict[Text, Any]) -> None:
        """
        Apply `values_to_update` to each product in `product_ids`.

        Raises
        ------
        Exception
            If a modified product comes back without an 'id'
        """
        stripe = self.handler.connect()
        for product_id in product_ids:
            updated_product = stripe.Product.modify(product_id, **values_to_update).to_dict()
            if 'id' not in updated_product:
                raise Exception('Product update failed')
            logger.info(f'Product {updated_product["id"]} updated')

    def delete_products(self, product_ids: List[Text]) -> None:
        """
        Delete each product in `product_ids`.

        Raises
        ------
        Exception
            If a deletion response comes back without an 'id'
        """
        stripe = self.handler.connect()
        for product_id in product_ids:
            deleted_product = stripe.Product.delete(product_id).to_dict()
            if 'id' not in deleted_product:
                raise Exception('Product deletion failed')
            logger.info(f'Product {deleted_product["id"]} deleted')
class PaymentIntentsTable(APITable):
    """The Stripe Payment Intents Table implementation"""

    # Statuses for which cancel_payment_intents() attempts a cancellation;
    # intents in any other status are skipped with a warning.
    _CANCELLABLE_STATUSES = (
        'requires_payment_method',
        'requires_capture',
        'requires_confirmation',
        'requires_action',
        'processing',
    )

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Pulls Stripe Payment Intents data.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Stripe Payment Intents matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'payment_intents',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Only the LIMIT is forwarded to Stripe; the remaining clauses are
        # evaluated locally on the flattened records.
        payment_intents_df = pd.json_normalize(self.get_payment_intents(limit=result_limit))
        select_statement_executor = SELECTQueryExecutor(
            payment_intents_df,
            selected_columns,
            where_conditions,
            order_by_conditions
        )
        return select_statement_executor.execute_query()

    def delete(self, query: ast.Delete) -> None:
        """
        Cancels Stripe Payment Intents and updates the local data.

        The listing is fetched once and kept on the instance as
        `payment_intents_df`; matched intents are dropped from that frame
        after the cancellation attempt. When the frame is empty the call is
        a no-op.

        Parameters
        ----------
        query : ast.Delete
            Given SQL DELETE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        delete_statement_parser = DELETEQueryParser(query)
        where_conditions = delete_statement_parser.parse_query()

        # NOTE(review): this frame is never refreshed once set, so intents
        # created after the first DELETE on this instance are not seen here.
        # Confirm whether that caching is intended.
        if 'payment_intents_df' not in self.__dict__:
            self.payment_intents_df = pd.json_normalize(self.get_payment_intents())

        # json_normalize([]) yields a frame without any columns, so both the
        # WHERE filtering and the 'id' lookups below would raise KeyError.
        if self.payment_intents_df.empty:
            return

        delete_query_executor = DELETEQueryExecutor(
            self.payment_intents_df,
            where_conditions
        )

        canceled_payment_intents_df = delete_query_executor.execute_query()

        payment_intent_ids = canceled_payment_intents_df['id'].tolist()
        self.cancel_payment_intents(payment_intent_ids)

        # Matched rows are removed locally even when the cancellation was
        # skipped or failed (cancel_payment_intents only logs those cases).
        self.payment_intents_df = self.payment_intents_df[~self.payment_intents_df['id'].isin(payment_intent_ids)]

    def cancel_payment_intents(self, payment_intent_ids: List[str]) -> None:
        """
        Cancel each payment intent whose status allows it.

        Intents in a non-cancellable status are logged as warnings; Stripe
        errors are logged and do not stop the remaining ids from being
        processed.
        """
        stripe = self.handler.connect()
        for payment_intent_id in payment_intent_ids:
            try:
                payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id)
                if payment_intent.status in self._CANCELLABLE_STATUSES:
                    stripe.PaymentIntent.cancel(payment_intent_id)
                else:
                    logger.warning(f"Payment intent {payment_intent_id} is in status {payment_intent.status} and cannot be canceled.")
            except stripe.error.StripeError as e:
                logger.error(f"Error cancelling payment intent {payment_intent_id}: {str(e)}")

    def update(self, query: ast.Update) -> None:
        """
        Updates data in Stripe "POST /v1/payment_intents/:id" API endpoint.

        Every payment intent returned by `get_payment_intents()` that
        satisfies the WHERE clause is modified. When none are returned the
        call is a no-op.

        Parameters
        ----------
        query : ast.Update
            Given SQL UPDATE query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        update_statement_parser = UPDATEQueryParser(query)
        values_to_update, where_conditions = update_statement_parser.parse_query()

        payment_intents_df = pd.json_normalize(self.get_payment_intents())
        # Same empty-frame guard as in delete(): no rows means no 'id' column.
        if payment_intents_df.empty:
            return

        update_query_executor = UPDATEQueryExecutor(
            payment_intents_df,
            where_conditions
        )

        payment_intents_df = update_query_executor.execute_query()
        payment_intent_ids = payment_intents_df['id'].tolist()
        self.update_payment_intents(payment_intent_ids, values_to_update)

    def update_payment_intents(self, payment_intent_ids: List[Text], values_to_update: Dict[Text, Any]) -> None:
        """Apply `values_to_update` to each payment intent in `payment_intent_ids`."""
        # Use the handler's connection like the other methods of this class,
        # so the call does not rely on some earlier method having connected.
        stripe = self.handler.connect()
        for payment_intent_id in payment_intent_ids:
            stripe.PaymentIntent.modify(payment_intent_id, **values_to_update)

    def insert(self, query: ast.Insert) -> None:
        """
        Inserts data into Stripe "POST /v1/payment_intents" API endpoint.

        Parameters
        ----------
        query : ast.Insert
            Given SQL INSERT query

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        insert_statement_parser = INSERTQueryParser(
            query,
            supported_columns=['amount', 'currency', 'description', 'payment_method_types'],
            mandatory_columns=['amount', 'currency'],
            all_mandatory=True
        )
        payment_intent_data = insert_statement_parser.parse_query()
        self.create_payment_intent(payment_intent_data)

    def create_payment_intent(self, payment_intent_data: List[Dict[Text, Any]]) -> None:
        """Create one Stripe payment intent per entry of `payment_intent_data`."""
        # insert() makes no other Stripe call first, so connect here instead
        # of using the bare module import.
        stripe = self.handler.connect()
        for data in payment_intent_data:
            stripe.PaymentIntent.create(**data)

    def get_columns(self) -> List[Text]:
        """Return the column names of a flattened `limit=1` payment intent listing."""
        return pd.json_normalize(self.get_payment_intents(limit=1)).columns.tolist()

    def get_payment_intents(self, **kwargs) -> List[Dict]:
        """
        List payment intents through the handler's connection.

        Keyword arguments are passed unchanged to `PaymentIntent.list`;
        every returned object is converted with `to_dict()`.
        """
        stripe = self.handler.connect()
        payment_intents = stripe.PaymentIntent.list(**kwargs)
        return [payment_intent.to_dict() for payment_intent in payment_intents]
class RefundsTable(APITable):
    """Read-only table exposing Stripe Refund objects."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Evaluate a SQL SELECT against the Stripe Refunds listing.

        Parameters
        ----------
        query : ast.Select
            Parsed SELECT statement to evaluate

        Returns
        -------
        pd.DataFrame
            Refunds remaining after the query's WHERE, ORDER BY and
            column selection have been applied

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = SELECTQueryParser(query, 'refunds', self.get_columns())
        columns, conditions, ordering, limit = parser.parse_query()

        # Only the LIMIT reaches the list call; the other clauses are
        # applied locally by the executor.
        flattened = pd.json_normalize(self.get_refunds(limit=limit))
        return SELECTQueryExecutor(
            flattened,
            columns,
            conditions,
            ordering,
        ).execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names of a flattened `limit=1` refund listing."""
        sample = self.get_refunds(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_refunds(self, **kwargs) -> List[Dict]:
        """
        List refunds through the handler's connection.

        Keyword arguments are passed unchanged to `Refund.list`; every
        returned object is converted with `to_dict()`.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Refund.list(**kwargs)]
class PayoutsTable(APITable):
    """Read-only table exposing Stripe Payout objects."""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Evaluate a SQL SELECT against the Stripe Payouts listing.

        Parameters
        ----------
        query : ast.Select
            Parsed SELECT statement to evaluate

        Returns
        -------
        pd.DataFrame
            Payouts remaining after the query's WHERE, ORDER BY and
            column selection have been applied

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        parser = SELECTQueryParser(query, 'payouts', self.get_columns())
        columns, conditions, ordering, limit = parser.parse_query()

        # Only the LIMIT reaches the list call; the other clauses are
        # applied locally by the executor.
        flattened = pd.json_normalize(self.get_payouts(limit=limit))
        return SELECTQueryExecutor(
            flattened,
            columns,
            conditions,
            ordering,
        ).execute_query()

    def get_columns(self) -> List[Text]:
        """Return the column names of a flattened `limit=1` payout listing."""
        sample = self.get_payouts(limit=1)
        return pd.json_normalize(sample).columns.tolist()

    def get_payouts(self, **kwargs) -> List[Dict]:
        """
        List payouts through the handler's connection.

        Keyword arguments are passed unchanged to `Payout.list`; every
        returned object is converted with `to_dict()`.
        """
        client = self.handler.connect()
        return [entry.to_dict() for entry in client.Payout.list(**kwargs)]