Coverage for mindsdb / integrations / handlers / shopify_handler / utils.py: 0%
87 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import inspect
3from enum import Enum
4from dataclasses import dataclass
6import shopify
8from mindsdb.utilities import log
10from .models.utils import Nodes, Extract
11from .models.common import AliasesEnum
# Module-level logger named after this module.
13logger = log.getLogger(__name__)
# Page size used for the GraphQL "first:" argument: applied to nested
# connections and used as the fallback when a query has no explicit limit.
15MAX_PAGE_LIMIT = 250
# Selection appended to every connection so the caller can read
# hasNextPage / endCursor and keep paginating.
16PAGE_INFO = "pageInfo { hasNextPage endCursor }"
19def _format_error(errors_list: list[dict]) -> str:
20 """Format shopify's GraphQL error list into a single string.
22 Args:
23 errors_list: The list of errors.
25 Returns:
26 str: The formatted error string.
27 """
28 errors_text = [record.get("message", "undescribed") for record in errors_list]
29 if len(errors_list) == 0:
30 errors_text = errors_text[0]
31 return f"An error occurred when executing the query: {errors_text}"
32 errors_text = "\n".join(errors_text)
33 return f"Error occurred when executing the query:\n{errors_text}"
def get_graphql_columns(root: AliasesEnum, targets: list[str] | None = None) -> str:
    """Build the GraphQL selection string for the fields declared by ``root``.

    Args:
        root: The object whose ``aliases()`` yields ``(name, value)`` pairs
            describing the available fields.
        targets: Optional field names to keep (compared case-insensitively).
            When empty or None, every field is included. The filter applies
            to this level only; nested objects are always expanded in full.

    Returns:
        str: The selected fields, separated by single spaces.
    """
    # Normalise the filter once; None means "keep everything".
    wanted = {target.lower() for target in targets} if targets else None

    def render(alias, spec):
        # Paginated connection: expand the nested enum and request page info.
        if isinstance(spec, Nodes):
            nested = get_graphql_columns(spec.enum)
            return f"{alias}(first: {MAX_PAGE_LIMIT}) {{ nodes {{{nested}}} {PAGE_INFO} }}"
        # Single key pulled out of a nested object, exposed under the alias.
        if isinstance(spec, Extract):
            return f"{alias}:{spec.obj} {{ {spec.key} }}"
        # Nested object described by another enum class.
        if inspect.isclass(spec) and issubclass(spec, Enum):
            nested = get_graphql_columns(spec)
            return f"{alias} {{{nested}}}"
        # Anything else is used as the selection text as-is.
        return spec

    fragments = [
        render(alias, spec)
        for alias, spec in root.aliases()
        if wanted is None or alias.lower() in wanted
    ]
    return " ".join(fragments)
65@dataclass(slots=True, kw_only=True)
66class ShopifyQuery:
67 """A class to represent a Shopify GraphQL query.
69 Args:
70 operation_name: The name of the operation to execute.
71 columns: The columns to include in the query.
72 limit: The limit of the query.
73 cursor: The cursor to use for pagination.
74 sort_key: The key to use for sorting.
75 reverse: Whether to reverse the sort.
76 query: The query to execute.
77 """
79 operation_name: str
80 columns: str
81 limit: int | None = None
82 cursor: str | None = None
83 sort_key: str | None = None
84 reverse: bool = False
85 query: str | None = None
87 def to_string(self) -> str:
88 """Convert the query to a string.
90 Returns:
91 str: The string representation of the query.
92 """
93 items = [f"first: {self.limit or MAX_PAGE_LIMIT}"]
94 if self.cursor:
95 items.append(f'after: "{self.cursor}"')
96 if self.sort_key:
97 items.append(f"sortKey: {self.sort_key}, reverse: {'true' if self.reverse else 'false'}")
98 if self.query:
99 items.append(f'query: "{self.query}"')
100 return f"{{ {self.operation_name} ({', '.join(items)}) {{ nodes {{ {self.columns} }} {PAGE_INFO} }} }}"
102 def execute(self) -> list[dict]:
103 """Execute the query.
105 Returns:
106 list[dict]: The result of the query.
107 """
108 result = shopify.GraphQL().execute(self.to_string())
109 return json.loads(result)
def query_graphql_nodes(
    root_name: str,
    root_class: type,
    columns: str,
    cursor: str | None = None,
    limit: int | None = None,
    sort_key: str | None = None,
    sort_reverse: bool = False,
    query: str | None = None,
    depth: int = 1,
) -> list[dict]:
    """Query the GraphQL API for nodes, following pagination.

    Args:
        root_name: The name of the root object (the GraphQL operation).
        root_class: The enum class describing the root object's fields; its
            ``aliases()`` is used to find nested-node and extract fields.
        columns: The columns to include in the query.
        cursor: The cursor to start pagination from.
        limit: The maximum number of rows to return. None or 0 means no limit.
        sort_key: The key to use for sorting.
        sort_reverse: Whether to reverse the sort.
        query: The query to execute.
        depth: The depth of the nodes to fetch. Default is 1: fetch the first level of nested nodes.

    Returns:
        list[dict]: The list of nodes, with nested connections flattened to
            plain lists and extract fields replaced by the extracted value.

    Raises:
        Exception: If the response contains an "errors" entry.
    """
    result_data = []
    has_next_page = True
    while has_next_page:
        page_size = MAX_PAGE_LIMIT
        if limit:
            remaining = limit - len(result_data)
            # Stop as soon as the requested number of rows is collected.
            # Asking for a page of 0 rows would be turned into a full page by
            # ShopifyQuery and every remaining page would be downloaded.
            if remaining <= 0:
                break
            # Never ask for more than one page's worth of rows per request.
            page_size = min(remaining, MAX_PAGE_LIMIT)
        result = ShopifyQuery(
            operation_name=root_name,
            columns=columns,
            limit=page_size,
            cursor=cursor,
            sort_key=sort_key,
            reverse=sort_reverse,
            query=query,
        ).execute()
        if "errors" in result:
            raise Exception(_format_error(result["errors"]))
        connection = result["data"][root_name]
        has_next_page = connection["pageInfo"]["hasNextPage"]
        cursor = connection["pageInfo"]["endCursor"]
        result_data += connection["nodes"]

    # Only fields actually present in the response need post-processing; the
    # first row is taken as representative of all rows.
    fetched_fields = []
    if result_data:
        fetched_fields = [name.lower() for name in result_data[0].keys()]

    nodes_names = [
        name for name, value in root_class.aliases() if isinstance(value, Nodes) and name.lower() in fetched_fields
    ]
    extracts_names = [
        name for name, value in root_class.aliases() if isinstance(value, Extract) and name.lower() in fetched_fields
    ]

    for row in result_data:
        for name in nodes_names:
            value = root_class[name].value
            node_data = row[name]["nodes"]
            nested_page_info = row[name]["pageInfo"]
            if depth > 0 and nested_page_info["hasNextPage"]:
                # The nested connection was cut at one page: fetch the rest by
                # querying `name` as a root operation from the nested cursor.
                node_data += query_graphql_nodes(
                    root_name=name,
                    cursor=nested_page_info["endCursor"],
                    root_class=value.enum,
                    columns=get_graphql_columns(value.enum),
                    depth=depth - 1,
                )
            # Replace the {nodes, pageInfo} wrapper with the plain node list.
            row[name] = node_data
        for name in extracts_names:
            value = root_class[name].value
            # The nested object may be null; fall back to an empty mapping.
            row[name] = (row[name] or {}).get(value.key)

    # Defensive: the page sizes above already keep the total within the limit.
    if limit:
        result_data = result_data[:limit]

    return result_data