Coverage for mindsdb / integrations / handlers / strapi_handler / strapi_handler.py: 0%
75 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.integrations.handlers.strapi_handler.strapi_tables import StrapiTable
2from mindsdb.integrations.libs.api_handler import APIHandler
3from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse
4from mindsdb_sql_parser import parse_sql
5from mindsdb.utilities import log
6import requests
7from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE
8from collections import OrderedDict
9import pandas as pd
11logger = log.getLogger(__name__)  # module-level logger; the handler below logs Strapi connection errors through it
class StrapiHandler(APIHandler):
    """API handler that exposes Strapi collections as tables.

    One ``StrapiTable`` is registered for every entry of the
    ``plural_api_ids`` connection argument. Requests are sent to
    ``http://<host>:<port>`` and carry the API token as a bearer credential.
    """

    def __init__(self, name: str, **kwargs) -> None:
        """Store the connection settings and register one table per collection.

        Args:
            name (str): handler name
            **kwargs: may carry ``connection_data``, a mapping with the keys
                ``host``, ``port``, ``api_token`` and ``plural_api_ids``.
        """
        super().__init__(name)

        self.connection = None
        self.is_connected = False

        # Always define the settings so that a partial (or missing)
        # ``connection_data`` cannot surface later as an AttributeError.
        self._base_url = None
        self._api_token = None
        self._plural_api_ids = []

        args = kwargs.get('connection_data') or {}
        if 'host' in args and 'port' in args:
            self._base_url = f"http://{args['host']}:{args['port']}"
        if 'api_token' in args:
            self._api_token = args['api_token']
        if 'plural_api_ids' in args:
            self._plural_api_ids = args['plural_api_ids'] or []

        # Registers a table for each collection in Strapi
        for plural_api_id in self._plural_api_ids:
            self._register_table(
                table_name=plural_api_id,
                table_class=StrapiTable(handler=self, name=plural_api_id),
            )

    def _auth_headers(self) -> dict:
        """Return a fresh header dict carrying the bearer token, if one is set."""
        if self._api_token is None:
            return {}
        return {"Authorization": f"Bearer {self._api_token}"}

    @staticmethod
    def _entry_to_row(entry: dict) -> dict:
        """Flatten one entry of a response's ``data`` field into a flat row.

        Entries shaped ``{'id': ..., 'attributes': {...}}`` are merged into a
        single dict (attribute keys win on collision, as ``**`` unpacking
        does). Entries without an ``attributes`` mapping are used as they are.
        """
        attributes = entry.get('attributes')
        if isinstance(attributes, dict):
            return {'id': entry['id'], **attributes}
        return dict(entry)

    def check_connection(self) -> StatusResponse:
        """Check the connection and report the outcome.

        Returns:
            StatusResponse: ``success`` reflects what ``connect()`` actually
            reported; ``error_message`` is filled in on failure.
        """
        try:
            result = self.connect()
        except Exception as e:
            logger.error(f'Error connecting to Strapi API: {e}!')
            response = StatusResponse(False, error_message=str(e))
        else:
            if isinstance(result, StatusResponse):
                # connect() reports failures through its return value rather
                # than by raising, so the result has to be inspected.
                response = result
            else:
                # connect() handed back the cached HTTP response of an
                # earlier successful attempt.
                response = StatusResponse(True)

        self.is_connected = response.success
        return response

    def connect(self) -> StatusResponse:
        """Probe the Strapi base URL and remember a successful response.

        Returns:
            StatusResponse: success or failure of the probe. When the handler
            is already connected, the cached ``requests`` response stored in
            ``self.connection`` is returned instead (kept for backward
            compatibility).
        """
        if self.is_connected and self.connection:
            return self.connection

        if self._base_url is None:
            message = "Strapi connection requires both 'host' and 'port'"
            logger.error(f'Error connecting to Strapi API: {message}!')
            return StatusResponse(False, error_message=message)

        try:
            response = requests.get(self._base_url, headers=self._auth_headers())
        except Exception as e:
            logger.error(f'Error connecting to Strapi API: {e}!')
            return StatusResponse(False, error_message=str(e))

        if response.status_code != 200:
            message = f"Error connecting to Strapi API: {response.status_code} - {response.text}"
            logger.error(f'{message}!')
            return StatusResponse(False, error_message=message)

        self.connection = response
        self.is_connected = True
        return StatusResponse(True)

    def native_query(self, query: str) -> StatusResponse:
        """Receive and process a raw query.

        Parameters
        ----------
        query : str
            query in a native format

        Returns
        -------
        StatusResponse
            Whatever ``self.query`` returns for the parsed statement
            (``query`` is not defined in this class body; presumably
            inherited from ``APIHandler``).
        """
        ast = parse_sql(query)
        return self.query(ast)

    def call_strapi_api(self, method: str, endpoint: str, params: dict = None, json_data: dict = None) -> pd.DataFrame:
        """Call a Strapi endpoint and return its ``data`` field as a DataFrame.

        Args:
            method (str): HTTP verb. ``POST``, ``PUT`` and ``DELETE`` are sent
                with that verb; anything else is sent as ``GET``.
            endpoint (str): path appended to the base URL.
            params (dict, optional): query-string parameters.
            json_data (dict | str, optional): request body for ``POST``,
                ``PUT`` and ``DELETE``. A non-empty dict is JSON-encoded; any
                other value (for instance a pre-serialized JSON string) is
                sent unchanged.

        Returns:
            pd.DataFrame: one row per entry of the response's ``data`` field,
            or an empty frame when ``data`` is neither a list nor a dict.

        Raises:
            Exception: when the base URL is not configured or the response
                status is not 200.
        """
        if self._base_url is None:
            raise Exception("Strapi connection requires both 'host' and 'port'")

        # None defaults replace the shared mutable ``{}`` defaults.
        params = {} if params is None else params
        json_data = {} if json_data is None else json_data

        headers = self._auth_headers()
        url = f"{self._base_url}{endpoint}"
        verb = method.upper()

        if verb in ('GET', 'POST', 'PUT', 'DELETE'):
            headers['Content-Type'] = 'application/json'

        if verb in ('POST', 'PUT', 'DELETE'):
            if isinstance(json_data, dict) and json_data:
                # ``data=`` would form-encode a dict, contradicting the JSON
                # content type declared above; ``json=`` serializes it.
                body = {'json': json_data}
            else:
                body = {'data': json_data}
            response = requests.request(method, url, headers=headers, params=params, **body)
        else:
            response = requests.get(url, headers=headers, params=params)

        if response.status_code != 200:
            raise Exception(f"Error connecting to Strapi API: {response.status_code} - {response.text}")

        decoded = response.json()
        entries = decoded.get('data') if isinstance(decoded, dict) else None

        # Build the frame in one go instead of appending row by row through
        # the private ``DataFrame._append``.
        if isinstance(entries, list):
            return pd.DataFrame([self._entry_to_row(entry) for entry in entries])
        if isinstance(entries, dict):
            return pd.DataFrame([self._entry_to_row(entries)])
        return pd.DataFrame()
# Describes every argument accepted in ``connection_data``; entries are kept
# in declaration order: api_token, host, port, plural_api_ids.
connection_args = OrderedDict(
    [
        (
            "api_token",
            {
                "type": ARG_TYPE.PWD,
                "description": "Strapi API key to use for authentication.",
                "required": True,
                "label": "Api token",
            },
        ),
        (
            "host",
            {
                "type": ARG_TYPE.URL,
                "description": "Strapi API host to connect to.",
                "required": True,
                "label": "Host",
            },
        ),
        (
            "port",
            {
                "type": ARG_TYPE.INT,
                "description": "Strapi API port to connect to.",
                "required": True,
                "label": "Port",
            },
        ),
        (
            "plural_api_ids",
            {
                "type": list,
                "description": "Plural API id to use for querying.",
                "required": True,
                "label": "Plural API id",
            },
        ),
    ]
)
# Sample value for each connection argument, in the order
# host, port, api_token, plural_api_ids.
connection_args_example = OrderedDict(
    [
        ("host", "localhost"),
        ("port", 1337),
        ("api_token", "c56c000d867e95848c"),
        ("plural_api_ids", ["posts", "portfolios"]),
    ]
)