Coverage for mindsdb / integrations / handlers / shopify_handler / utils.py: 0%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import inspect 

3from enum import Enum 

4from dataclasses import dataclass 

5 

6import shopify 

7 

8from mindsdb.utilities import log 

9 

10from .models.utils import Nodes, Extract 

11from .models.common import AliasesEnum 

12 

# Module-level logger named after this module.
13logger = log.getLogger(__name__) 

14 

# Page size sent as the `first:` argument when a query does not carry its own
# limit; also used for every nested connection selection.
15MAX_PAGE_LIMIT = 250 

# Selection appended to every connection so the caller can read `hasNextPage`
# and `endCursor` and continue paginating.
16PAGE_INFO = "pageInfo { hasNextPage endCursor }" 

17 

18 

19def _format_error(errors_list: list[dict]) -> str: 

20 """Format shopify's GraphQL error list into a single string. 

21 

22 Args: 

23 errors_list: The list of errors. 

24 

25 Returns: 

26 str: The formatted error string. 

27 """ 

28 errors_text = [record.get("message", "undescribed") for record in errors_list] 

29 if len(errors_list) == 0: 

30 errors_text = errors_text[0] 

31 return f"An error occurred when executing the query: {errors_text}" 

32 errors_text = "\n".join(errors_text) 

33 return f"Error occurred when executing the query:\n{errors_text}" 

34 

35 

def get_graphql_columns(root: AliasesEnum, targets: list[str] | None = None) -> str:
    """Build the GraphQL field selection for an aliases enum.

    Args:
        root: Object whose ``aliases()`` yields ``(name, value)`` pairs.
        targets: Optional column names to keep, matched case-insensitively.
            When empty or ``None`` every alias is selected. The filter is not
            passed down to nested selections.

    Returns:
        str: The selected fields joined by single spaces.
    """
    wanted = {column.lower() for column in targets} if targets else None
    fragments = []
    for name, value in root.aliases():
        if wanted is not None and name.lower() not in wanted:
            continue
        if isinstance(value, Nodes):
            # Nested connection: select its nodes plus paging info.
            nested = get_graphql_columns(value.enum)
            fragment = f"{name}(first: {MAX_PAGE_LIMIT}) {{ nodes {{{nested}}} {PAGE_INFO} }}"
        elif isinstance(value, Extract):
            # Aliased sub-object from which a single key is selected.
            fragment = f"{name}:{value.obj} {{ {value.key} }}"
        elif inspect.isclass(value) and issubclass(value, Enum):
            # Nested object described by another enum class.
            nested = get_graphql_columns(value)
            fragment = f"{name} {{{nested}}}"
        else:
            # Anything else is appended as-is.
            fragment = value
        fragments.append(fragment)
    return " ".join(fragments)

63 

64 

@dataclass(slots=True, kw_only=True)
class ShopifyQuery:
    """A single paginated Shopify GraphQL connection query.

    Args:
        operation_name: The name of the operation to execute.
        columns: The field selection placed inside ``nodes { ... }``.
        limit: Page size sent as ``first``; a falsy value falls back to
            ``MAX_PAGE_LIMIT``.
        cursor: Cursor sent as ``after`` when set.
        sort_key: Value sent as ``sortKey`` when set.
        reverse: Sort direction flag; only emitted together with ``sort_key``.
        query: Search string sent as ``query`` when set.
    """

    operation_name: str
    columns: str
    limit: int | None = None
    cursor: str | None = None
    sort_key: str | None = None
    reverse: bool = False
    query: str | None = None

    def to_string(self) -> str:
        """Render the query as GraphQL text.

        Returns:
            str: The GraphQL document for this query.
        """
        page_size = self.limit or MAX_PAGE_LIMIT
        arguments = [f"first: {page_size}"]
        if self.cursor:
            arguments.append(f'after: "{self.cursor}"')
        if self.sort_key:
            direction = "true" if self.reverse else "false"
            arguments.append(f"sortKey: {self.sort_key}, reverse: {direction}")
        if self.query:
            arguments.append(f'query: "{self.query}"')
        argument_list = ", ".join(arguments)
        selection = f"nodes {{ {self.columns} }} {PAGE_INFO}"
        return f"{{ {self.operation_name} ({argument_list}) {{ {selection} }} }}"

    def execute(self) -> list[dict]:
        """Send the rendered query through ``shopify.GraphQL`` and decode the reply.

        Returns:
            list[dict]: The JSON-decoded response.
        """
        graphql_text = self.to_string()
        raw_response = shopify.GraphQL().execute(graphql_text)
        return json.loads(raw_response)

110 

111 

def query_graphql_nodes(
    root_name: str,
    root_class: type,
    columns: str,
    cursor: str | None = None,
    limit: int | None = None,
    sort_key: str | None = None,
    sort_reverse: bool = False,
    query: str | None = None,
    depth: int = 1,
):
    """Query the GraphQL API for nodes, following pagination.

    Args:
        root_name: The name of the root operation; also the key read from the
            response's "data" object.
        root_class: The aliases enum describing the root object; used to find
            nested connections and extracted fields in the fetched rows.
        columns: The field selection to request for each node.
        cursor: The cursor to start from.
        limit: Maximum number of rows to return. ``None`` (or a non-positive
            value) pages through everything.
        sort_key: The key to use for sorting.
        sort_reverse: Whether to reverse the sort.
        query: The search query to send.
        depth: How many levels of nested connections to page through.
            Default is 1: fetch the remaining pages of first-level nested nodes.

    Returns:
        list[dict]: The list of nodes, with nested connections flattened to
            plain lists and extracted fields replaced by their value.

    Raises:
        Exception: If the response contains an "errors" entry.
    """
    result_data = []
    # Non-positive limits keep their historical meaning of "no limit" for paging.
    limited = limit is not None and limit > 0
    has_next_page = True
    while has_next_page:
        if limited:
            remaining = limit - len(result_data)
            if remaining <= 0:
                # Enough rows collected. Previously the page size became 0 here,
                # which ShopifyQuery turns into a full page, so every remaining
                # page was fetched only to be discarded by the final slice.
                break
            # Never ask for more than one page's worth, even for large limits.
            page_size = min(MAX_PAGE_LIMIT, remaining)
        else:
            page_size = MAX_PAGE_LIMIT

        result = ShopifyQuery(
            operation_name=root_name,
            columns=columns,
            limit=page_size,
            cursor=cursor,
            sort_key=sort_key,
            reverse=sort_reverse,
            query=query,
        ).execute()
        if "errors" in result:
            raise Exception(_format_error(result["errors"]))
        connection = result["data"][root_name]
        has_next_page = connection["pageInfo"]["hasNextPage"]
        cursor = connection["pageInfo"]["endCursor"]
        result_data += connection["nodes"]

    # Only post-process fields that were actually requested; the first row is
    # taken as representative of all rows.
    fetched_fields = set()
    if result_data:
        fetched_fields = {name.lower() for name in result_data[0]}

    nodes_names = []
    extracts_names = []
    for name, value in root_class.aliases():
        if name.lower() not in fetched_fields:
            continue
        if isinstance(value, Nodes):
            nodes_names.append(name)
        elif isinstance(value, Extract):
            extracts_names.append(name)

    for row in result_data:
        for name in nodes_names:
            value = root_class[name].value
            nested = row[name]
            node_data = nested["nodes"]
            if depth > 0 and nested["pageInfo"]["hasNextPage"]:
                # The embedded connection was cut at one page: fetch the rest,
                # starting from where the embedded page ended.
                node_data += query_graphql_nodes(
                    root_name=name,
                    cursor=nested["pageInfo"]["endCursor"],
                    root_class=value.enum,
                    columns=get_graphql_columns(value.enum),
                    depth=depth - 1,
                )
            # Replace the connection wrapper with the plain list of nodes.
            row[name] = node_data
        for name in extracts_names:
            value = root_class[name].value
            # The sub-object may be null; then the extracted value is None.
            row[name] = (row[name] or {}).get(value.key)

    if limit:
        result_data = result_data[:limit]

    return result_data

190 

191 return result_data