Coverage for mindsdb / integrations / handlers / shopify_handler / shopify_handler.py: 0%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import requests 

2import shopify 

3 

4from mindsdb.integrations.handlers.shopify_handler.shopify_tables import ( 

5 ProductsTable, 

6 ProductVariantsTable, 

7 CustomersTable, 

8 OrdersTable, 

9 MarketingEventsTable, 

10 InventoryItemsTable, 

11 StaffMembersTable, 

12 GiftCardsTable, 

13) 

14from mindsdb.integrations.libs.api_handler import MetaAPIHandler 

15from mindsdb.integrations.libs.response import ( 

16 HandlerStatusResponse as StatusResponse, 

17) 

18 

19from mindsdb.utilities import log 

20from mindsdb_sql_parser import parse_sql 

21from mindsdb.integrations.libs.api_handler_exceptions import ( 

22 InvalidNativeQuery, 

23 ConnectionFailed, 

24 MissingConnectionParams, 

25) 

26 

27from .connection_args import connection_args 

28 

29logger = log.getLogger(__name__) 

30 

31 

class ShopifyHandler(MetaAPIHandler):
    """
    The Shopify handler implementation.

    Validates the connection parameters, registers the Shopify tables and
    creates a ``shopify.Session`` from an access token requested with the
    ``client_credentials`` grant.
    """

    name = "shopify"

    # Shopify Admin API version passed to every session created in connect().
    _API_VERSION = "2025-10"

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.

        Args:
            name (str): name of particular handler instance
            **kwargs: arbitrary keyword arguments; must contain
                ``connection_data`` with every argument that
                ``connection_args`` marks as required.

        Raises:
            MissingConnectionParams: if ``connection_data`` is absent/None or
                lacks one of the required arguments.
        """
        super().__init__(name)

        connection_data = kwargs.get("connection_data")
        if connection_data is None:
            raise MissingConnectionParams("Incomplete parameters passed to Shopify Handler")

        required_args = {
            arg_name for arg_name, arg_meta in connection_args.items() if arg_meta.get("required") is True
        }
        missed_args = required_args - set(connection_data)
        if missed_args:
            # Sorted so the error message is stable between runs (set order is not).
            raise MissingConnectionParams(
                f"Required parameters are not found in the connection data: {', '.join(sorted(missed_args))}"
            )

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

        self._register_table("products", ProductsTable(self))
        self._register_table("customers", CustomersTable(self))
        self._register_table("orders", OrdersTable(self))
        self._register_table("product_variants", ProductVariantsTable(self))
        self._register_table("marketing_events", MarketingEventsTable(self))
        self._register_table("inventory_items", InventoryItemsTable(self))
        self._register_table("staff_members", StaffMembersTable(self))
        self._register_table("gift_cards", GiftCardsTable(self))

    def connect(self):
        """
        Set up the connection required by the handler.

        Posts the client credentials to the shop's
        ``/admin/oauth/access_token`` endpoint and wraps the returned token in
        a ``shopify.Session``. The session is stored on the handler and
        returned directly on later calls while ``is_connected`` is True.

        Returns:
            The ``shopify.Session`` object (not a StatusResponse).

        Raises:
            requests.HTTPError: if the token endpoint answers with an error
                status (raised by ``raise_for_status``).
            ConnectionFailed: if the response contains no access token.
        """
        if self.is_connected is True:
            return self.connection

        shop_url = self.connection_data["shop_url"]
        client_id = self.connection_data["client_id"]
        client_secret = self.connection_data["client_secret"]

        response = requests.post(
            f"https://{shop_url}/admin/oauth/access_token",
            data={"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret},
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=10,
        )
        response.raise_for_status()
        result = response.json()
        access_token = result.get("access_token")
        if not access_token:
            raise ConnectionFailed("Unable to get an access token")

        api_session = shopify.Session(shop_url, self._API_VERSION, access_token)

        self.connection = api_session
        self.is_connected = True

        return self.connection

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Connects, activates the session and requests the current shop. Any
        failure is reported through the returned response instead of being
        raised.

        Returns:
            HandlerStatusResponse: ``success`` is True when the shop request
            succeeded; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)

        try:
            api_session = self.connect()
            shopify.ShopifyResource.activate_session(api_session)
            shopify.Shop.current()
            response.success = True
        except Exception as e:
            # Deliberately broad: every failure becomes a failed status.
            # The cause is included so the log shows why the check failed.
            logger.error(f"Error connecting to Shopify: {e}")
            response.error_message = str(e)

        # A failed check clears the flag, so the next connect() requests a new token.
        self.is_connected = response.success

        return response

    def native_query(self, query: str) -> StatusResponse:
        """
        Receive and process a raw query.

        Args:
            query (str): query in a native format; it is parsed with
                ``parse_sql`` and the resulting AST is passed to ``self.query``.

        Returns:
            The result of ``self.query(ast)``.
            NOTE(review): the annotation says StatusResponse, but the value
            comes straight from ``self.query`` - confirm the actual type.

        Raises:
            InvalidNativeQuery: if the query cannot be parsed.
        """
        try:
            ast = parse_sql(query)
        except Exception as e:
            # Chain explicitly so the parser error is kept as the cause.
            raise InvalidNativeQuery(f"The query {query} is invalid.") from e
        return self.query(ast)