Coverage for mindsdb / integrations / handlers / strapi_handler / strapi_handler.py: 0%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.integrations.handlers.strapi_handler.strapi_tables import StrapiTable 

2from mindsdb.integrations.libs.api_handler import APIHandler 

3from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

4from mindsdb_sql_parser import parse_sql 

5from mindsdb.utilities import log 

6import requests 

7from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE 

8from collections import OrderedDict 

9import pandas as pd 

10 

11logger = log.getLogger(__name__) 

12 

13 

14class StrapiHandler(APIHandler): 

15 def __init__(self, name: str, **kwargs) -> None: 

16 """initializer method 

17 

18 Args: 

19 name (str): handler name 

20 """ 

21 super().__init__(name) 

22 

23 self.connection = None 

24 self.is_connected = False 

25 args = kwargs.get('connection_data', {}) 

26 if 'host' in args and 'port' in args: 

27 self._base_url = f"http://{args['host']}:{args['port']}" 

28 if 'api_token' in args: 

29 self._api_token = args['api_token'] 

30 if 'plural_api_ids' in args: 

31 self._plural_api_ids = args['plural_api_ids'] 

32 # Registers tables for each collections in strapi 

33 for pluralApiId in self._plural_api_ids: 

34 self._register_table(table_name=pluralApiId, table_class=StrapiTable(handler=self, name=pluralApiId)) 

35 

36 def check_connection(self) -> StatusResponse: 

37 """checking the connection 

38 

39 Returns: 

40 StatusResponse: whether the connection is still up 

41 """ 

42 response = StatusResponse(False) 

43 try: 

44 self.connect() 

45 response.success = True 

46 except Exception as e: 

47 logger.error(f'Error connecting to Strapi API: {e}!') 

48 response.error_message = e 

49 

50 self.is_connected = response.success 

51 return response 

52 

53 def connect(self) -> StatusResponse: 

54 """making the connectino object 

55 """ 

56 if self.is_connected and self.connection: 

57 return self.connection 

58 

59 try: 

60 headers = {"Authorization": f"Bearer {self._api_token}"} 

61 response = requests.get(f"{self._base_url}", headers=headers) 

62 if response.status_code == 200: 

63 self.connection = response 

64 self.is_connected = True 

65 return StatusResponse(True) 

66 else: 

67 raise Exception(f"Error connecting to Strapi API: {response.status_code} - {response.text}") 

68 except Exception as e: 

69 logger.error(f'Error connecting to Strapi API: {e}!') 

70 return StatusResponse(False, error_message=e) 

71 

72 def native_query(self, query: str) -> StatusResponse: 

73 """Receive and process a raw query. 

74 

75 Parameters 

76 ---------- 

77 query : str 

78 query in a native format 

79 

80 Returns 

81 ------- 

82 StatusResponse 

83 Request status 

84 """ 

85 ast = parse_sql(query) 

86 return self.query(ast) 

87 

88 def call_strapi_api(self, method: str, endpoint: str, params: dict = {}, json_data: dict = {}) -> pd.DataFrame: 

89 headers = {"Authorization": f"Bearer {self._api_token}"} 

90 url = f"{self._base_url}{endpoint}" 

91 

92 if method.upper() in ('GET', 'POST', 'PUT', 'DELETE'): 

93 headers['Content-Type'] = 'application/json' 

94 

95 if method.upper() in ('POST', 'PUT', 'DELETE'): 

96 response = requests.request(method, url, headers=headers, params=params, data=json_data) 

97 else: 

98 response = requests.get(url, headers=headers, params=params) 

99 

100 if response.status_code == 200: 

101 data = response.json() 

102 # Create an empty DataFrame 

103 df = pd.DataFrame() 

104 if isinstance(data.get('data', None), list): 

105 for item in data['data']: 

106 # Add 'id' and 'attributes' to the DataFrame 

107 row_data = {'id': item['id'], **item['attributes']} 

108 df = df._append(row_data, ignore_index=True) 

109 return df 

110 elif isinstance(data.get('data', None), dict): 

111 # Add 'id' and 'attributes' to the DataFrame 

112 row_data = {'id': data['data']['id'], **data['data']['attributes']} 

113 df = df._append(row_data, ignore_index=True) 

114 return df 

115 else: 

116 raise Exception(f"Error connecting to Strapi API: {response.status_code} - {response.text}") 

117 

118 return pd.DataFrame() 

119 

120 

121connection_args = OrderedDict( 

122 api_token={ 

123 "type": ARG_TYPE.PWD, 

124 "description": "Strapi API key to use for authentication.", 

125 "required": True, 

126 "label": "Api token", 

127 }, 

128 host={ 

129 "type": ARG_TYPE.URL, 

130 "description": "Strapi API host to connect to.", 

131 "required": True, 

132 "label": "Host", 

133 }, 

134 port={ 

135 "type": ARG_TYPE.INT, 

136 "description": "Strapi API port to connect to.", 

137 "required": True, 

138 "label": "Port", 

139 }, 

140 plural_api_ids={ 

141 "type": list, 

142 "description": "Plural API id to use for querying.", 

143 "required": True, 

144 "label": "Plural API id", 

145 }, 

146) 

147 

148connection_args_example = OrderedDict( 

149 host="localhost", 

150 port=1337, 

151 api_token="c56c000d867e95848c", 

152 plural_api_ids=["posts", "portfolios"], 

153)