Coverage for mindsdb / integrations / utilities / handlers / query_utilities / select_query_utilities.py: 0%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Text, List, Dict, Tuple 

2 

3import pandas as pd 

4from mindsdb_sql_parser import ast 

5 

6from mindsdb.integrations.utilities.sql_utils import sort_dataframe 

7from mindsdb.integrations.utilities.handlers.query_utilities.base_query_utilities import BaseQueryParser 

8from mindsdb.integrations.utilities.handlers.query_utilities.base_query_utilities import BaseQueryExecutor 

9 

10 

class SELECTQueryParser(BaseQueryParser):
    """
    Parses a SELECT query into its component parts.

    Parameters
    ----------
    query : ast.Select
        Given SQL SELECT query.
    table : Text
        Name of the table to query.
    columns : List[Text]
        List of columns in the table.
    """

    # Limit reported by `parse_limit_clause` when the query has no LIMIT clause.
    DEFAULT_RESULT_LIMIT = 20

    def __init__(self, query: ast.Select, table: Text, columns: List[Text]):
        super().__init__(query)
        self.table = table
        self.columns = columns

    def parse_query(self) -> Tuple[List[Text], List[List[Text]], List, int]:
        """
        Parses a SQL SELECT statement into its components: SELECT, WHERE, ORDER BY, LIMIT.

        Returns
        -------
        Tuple[List[Text], List[List[Text]], List, int]
            Selected column names, WHERE conditions, ORDER BY entries and the
            result limit, in that order.
        """
        selected_columns = self.parse_select_clause()
        # parse_where_clause is not defined in this class; it is expected to
        # come from BaseQueryParser.
        where_conditions = self.parse_where_clause()
        order_by_conditions = self.parse_order_by_clause()
        result_limit = self.parse_limit_clause()

        return selected_columns, where_conditions, order_by_conditions, result_limit

    def parse_select_clause(self) -> List[Text]:
        """
        Parses the SELECT (column selection) clause of the query.

        Returns
        -------
        List[Text]
            Every table column if the query contains `*`, otherwise the last
            name part of each selected identifier, in query order.

        Raises
        ------
        ValueError
            If a target is neither `*` nor a plain identifier.
        """
        selected_columns = []
        for target in self.query.targets:
            if isinstance(target, ast.Star):
                # Return a copy so callers that modify the result cannot
                # alter the column list stored on the parser.
                return list(self.columns)
            if isinstance(target, ast.Identifier):
                # Only the final part is kept, dropping any qualifier prefix.
                selected_columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")

        return selected_columns

    def parse_order_by_clause(self) -> List:
        """
        Parses the ORDER BY clause of the query.

        Returns
        -------
        List
            The query's ORDER BY entries as stored on the query object, or an
            empty list when there are none.
        """
        order_by = self.query.order_by
        if order_by and len(order_by) > 0:
            return order_by
        return []

    def parse_limit_clause(self) -> int:
        """
        Parses the LIMIT clause of the query.

        Returns
        -------
        int
            The value of the LIMIT clause, or `DEFAULT_RESULT_LIMIT` when the
            query has no LIMIT clause.
        """
        if self.query.limit:
            return self.query.limit.value
        return self.DEFAULT_RESULT_LIMIT

75 

76 

class SELECTQueryExecutor(BaseQueryExecutor):
    """
    Executes a SELECT query.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe to query.
    selected_columns : List[Text]
        List of columns to select.
    where_conditions : List[List[Text]]
        List of where conditions.
    order_by_conditions : List
        List of order by conditions.
    result_limit : int
        Number of results to return. None (the default) means no limit.
    """

    def __init__(self, df: pd.DataFrame, selected_columns: List[Text], where_conditions: List[List[Text]], order_by_conditions: List, result_limit: int = None):
        super().__init__(df, where_conditions)
        self.selected_columns = selected_columns
        self.order_by_conditions = order_by_conditions
        self.result_limit = result_limit

    def execute_query(self):
        """
        Execute the query.

        The clauses are applied as WHERE, SELECT, ORDER BY and finally LIMIT.
        The limit has to come last: truncating the dataframe before filtering
        and sorting would drop rows that belong in the result.

        Returns
        -------
        pd.DataFrame
            The resulting dataframe.
        """
        # execute_where_clause is not defined in this class; it is expected to
        # come from BaseQueryExecutor.
        self.execute_where_clause()

        self.execute_select_clause()

        self.execute_order_by_clause()

        self.execute_limit_clause()

        return self.df

    def execute_select_clause(self):
        """
        Execute the select clause of the query.

        When the dataframe has no rows it is replaced by an empty dataframe
        that has exactly the selected columns; otherwise the dataframe is
        narrowed to the selected columns.
        """
        if len(self.df) == 0:
            self.df = pd.DataFrame([], columns=self.selected_columns)
        else:
            self.df = self.df[self.selected_columns]

    def execute_order_by_clause(self):
        """
        Execute the order by clause of the query.

        Sorting is delegated to `sort_dataframe`.
        """
        self.df = sort_dataframe(self.df, self.order_by_conditions)

    def execute_limit_clause(self):
        """
        Execute the limit clause of the query.

        Keeps the first `result_limit` rows. Nothing happens when
        `result_limit` is None.
        """
        # Compare against None rather than relying on truthiness so that a
        # limit of 0 yields an empty result instead of being ignored.
        if self.result_limit is not None:
            self.df = self.df.head(self.result_limit)
134 self.df = self.df.head(self.result_limit)