Coverage for mindsdb / integrations / handlers / sap_erp_handler / api.py: 0%
40 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import requests
2from urllib.parse import urljoin
4from mindsdb.utilities import log
6logger = log.getLogger(__name__)
9def move_under(d, key_contents_to_move, key_to_move_under=None):
10 """
11 Moves the keys in nested dict with key key_contents_to_move under the dict defined by key_to_move_under.
12 If no key_to_move_under is provided, the keys are moved under d.
13 eg.
14 Calling this on the following dict (d) with key_contents_to_move = "a":
15 {
16 "a": {
17 "b": 1,
18 "c": 2
19 }
20 }
21 results in:
22 {
23 "b": 1,
24 "c": 2
25 }
26 """
27 if key_contents_to_move not in d:
28 return
29 for k, v in d[key_contents_to_move].items():
30 if key_to_move_under:
31 d[key_to_move_under][k] = v
32 else:
33 d[k] = v
34 del d[key_contents_to_move]
37class SAPERP:
39 def __init__(self, url: str, api_key: str) -> None:
40 self.base_url = url
41 self.api_key = api_key
43 def _request(self, method: str, relative_endpoint: str, data=None):
44 kwargs = {
45 "method": method,
46 "url": urljoin(self.base_url, relative_endpoint),
47 "headers": {
48 "APIKey": self.api_key,
49 "Accept": "application/json",
50 "DataServiceVersion": "2.0"
51 }
52 }
53 if data is not None:
54 kwargs["data"] = data
55 return requests.request(**kwargs)
57 def is_connected(self) -> bool:
58 if self._request("get", "").ok:
59 return True
60 return False
62 def get(self, endpoint):
63 """ Common method for all get endpoints """
64 try:
65 resp = self._request("get", endpoint)
66 if resp.ok:
67 resp = resp.json()["d"]
68 if "results" in resp:
69 resp = resp["results"]
70 else:
71 resp = [resp]
72 else:
73 resp = []
74 for r in resp:
75 move_under(r, "__metadata")
76 return resp
77 except Exception as e:
78 logger.error(f"Error requesting endpoint {endpoint}: {e}")
79 return {}