Coverage for mindsdb / integrations / handlers / strapi_handler / strapi_tables.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import List 

2import pandas as pd 

3from mindsdb.integrations.libs.api_handler import APIHandler, APITable 

4from mindsdb_sql_parser import ast 

5from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

6from mindsdb_sql_parser.ast.select.constant import Constant 

7import json 

8 

9 

class StrapiTable(APITable):
    """Expose one Strapi collection as a table backed by the handler's REST calls.

    Every operation is delegated to ``self.handler.call_strapi_api``; the
    collection name is used to build the ``/api/<name>`` endpoint.
    """

    def __init__(self, handler: APIHandler, name: str):
        """Bind the table to a handler and discover the collection's columns.

        Args:
            handler (APIHandler): handler that provides ``call_strapi_api``
            name (str): collection name, used as the endpoint path segment
        """
        super().__init__(handler)
        self.name = name
        # get all the fields of a collection as columns
        # (one GET is issued at construction time; its result's ``.columns`` is kept)
        self.columns = self.handler.call_strapi_api(method='GET', endpoint=f'/api/{name}').columns

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Triggered at the SELECT query

        Only ``WHERE id = <value>`` is supported as a filter. Without an id
        the collection is read page by page (100 rows per request); when the
        query has no LIMIT only the first page is fetched.

        Args:
            query (ast.Select): User's entered query

        Returns:
            pd.DataFrame: The queried information, restricted to the selected
                columns. Always a DataFrame, possibly empty.

        Raises:
            ValueError: on any WHERE condition other than ``id = ...`` or on a
                target that is neither ``*`` nor a plain identifier.
        """
        # Get id from where clause, if available
        _id = None
        for op, arg1, arg2 in extract_comparison_conditions(query.where):
            if arg1 == 'id' and op == '=':
                _id = arg2
            else:
                raise ValueError("Unsupported condition in WHERE clause")

        # Get selected columns from query
        selected_columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                selected_columns = self.get_columns()
                break
            elif isinstance(target, ast.Identifier):
                selected_columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")

        if _id is not None:
            # Fetch data using the provided endpoint for the specific id
            df = self.handler.call_strapi_api(method='GET', endpoint=f'/api/{self.name}/{_id}')
            if len(df) > 0:
                return df[selected_columns]
            # Nothing found: still honour the declared DataFrame return type.
            return pd.DataFrame(columns=selected_columns)

        # Fetch data without specifying an id
        page_size = 100  # rows requested per API call
        limit = query.limit.value if query.limit else None

        # ``is None`` rather than truthiness so that LIMIT 0 yields no rows
        # instead of being treated as "no limit".
        if limit is None:
            page_count = 1
        else:
            page_count = (limit + page_size - 1) // page_size

        frames = []
        for page in range(1, page_count + 1):
            # The page size must stay constant across requests: with
            # page-based pagination the offset is derived from page * pageSize,
            # so shrinking pageSize on the last page would re-read earlier rows.
            df = self.handler.call_strapi_api(
                method='GET',
                endpoint=f'/api/{self.name}',
                params={'pagination[page]': page, 'pagination[pageSize]': page_size},
            )

            if len(df) == 0:
                break

            frames.append(df[selected_columns])

        if not frames:
            return pd.DataFrame(columns=selected_columns)

        # Concatenate once instead of growing a frame inside the loop.
        result_df = pd.concat(frames, ignore_index=True)
        if limit is not None:
            # The last page may overshoot the requested limit; trim it here.
            result_df = result_df.head(limit)
        return result_df

    def insert(self, query: ast.Insert) -> None:
        """triggered at the INSERT query

        One POST request is sent per row in ``query.values``.

        Args:
            query (ast.Insert): user's entered query
        """
        for row in query.values:
            data = {'data': {}}
            for column, value in zip(query.columns, row):
                # Constants are unwrapped; anything else is passed through as is.
                if isinstance(value, Constant):
                    data['data'][column.name] = value.value
                else:
                    data['data'][column.name] = value
            self.handler.call_strapi_api(method='POST', endpoint=f'/api/{self.name}', json_data=json.dumps(data))

    def update(self, query: ast.Update) -> None:
        """triggered at the UPDATE query

        Only single-entry updates addressed by ``WHERE id = <value>`` are
        supported. SET values that are not constants are skipped.

        Args:
            query (ast.Update): user's entered query

        Raises:
            NotImplementedError: on any WHERE condition other than ``id = ...``
            ValueError: when the WHERE clause does not provide an id
        """
        # Get id from query
        _id = None
        for op, arg1, arg2 in extract_comparison_conditions(query.where):
            if arg1 == 'id' and op == '=':
                _id = arg2
            else:
                raise NotImplementedError

        if _id is None:
            # Without this guard the endpoint below would be built from an
            # unbound variable.
            raise ValueError("UPDATE requires a WHERE clause of the form id = <value>")

        data = {
            'data': {
                key: value.value
                for key, value in query.update_columns.items()
                if isinstance(value, Constant)
            }
        }
        self.handler.call_strapi_api(method='PUT', endpoint=f'/api/{self.name}/{_id}', json_data=json.dumps(data))

    def get_columns(self, ignore: List[str] = []) -> List[str]:
        """columns

        Args:
            ignore (List[str], optional): exclusion items. Defaults to [].
                The default list is only read, never mutated.

        Returns:
            List[str]: available columns with `ignore` items removed from the list.
        """
        return [item for item in self.columns if item not in ignore]