Coverage for mindsdb / api / executor / planner / steps.py: 89%
174 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.api.executor.planner.exceptions import PlanningException
2from mindsdb.api.executor.planner.step_result import Result
class PlanStep:
    """Base class for a single step of a query plan.

    Every step carries an optional ``step_num``. Subclasses add their own
    attributes in ``__init__``; equality and ``repr`` are derived from the
    instance ``__dict__``.

    Because ``__eq__`` is defined without ``__hash__``, instances are
    unhashable (standard Python data-model behaviour).
    """

    def __init__(self, step_num=None):
        """Store the position of the step inside the plan (may be ``None``)."""
        self.step_num = step_num

    @property
    def result(self):
        """Return a ``Result`` built from this step's number.

        Raises:
            PlanningException: if ``step_num`` has not been assigned.
        """
        if self.step_num is None:
            raise PlanningException(
                f"Can't reference a step with no assigned step number. Tried to reference: {type(self)}"
            )
        return Result(self.step_num)

    def __eq__(self, other):
        """Compare two steps attribute by attribute.

        Steps are equal when they are of exactly the same type and all of
        their instance attributes, except ``result_data``, compare equal.
        """
        if type(self) is not type(other):
            return False

        # ``result_data`` holds the execution result and is deliberately
        # excluded from the comparison.
        mine = {name: value for name, value in vars(self).items() if name != "result_data"}
        theirs = {name: value for name, value in vars(other).items() if name != "result_data"}

        # Differing attribute sets mean the steps differ. Checking this first
        # keeps the comparison symmetric and avoids an AttributeError when
        # ``other`` lacks one of our attributes.
        if mine.keys() != theirs.keys():
            return False

        return not any(value != theirs[name] for name, value in mine.items())

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` in attribute insertion order."""
        attrs_str = ", ".join(f"{name}={value!s}" for name, value in vars(self).items())
        return f"{self.__class__.__name__}({attrs_str})"

    def set_result(self, result):
        """Attach ``result`` to the step as ``result_data``."""
        self.result_data = result
class ProjectStep(PlanStep):
    """Plan step that selects columns from a dataframe.

    Attributes stored on the instance:
        columns: the columns to select.
        dataframe: the input the columns are taken from.
        ignore_doubles: flag, ``False`` by default; its handling is up to
            the code that executes the step.
    """

    def __init__(self, columns, dataframe, ignore_doubles=False, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.columns = columns
        self.dataframe = dataframe
        self.ignore_doubles = ignore_doubles
50# TODO remove
class FilterStep(PlanStep):
    """Plan step that filters a dataframe according to a query.

    Attributes stored on the instance:
        dataframe: the input to be filtered.
        query: the filter to apply.
    """

    def __init__(self, dataframe, query, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.dataframe = dataframe
        self.query = query
60# TODO remove
class GroupByStep(PlanStep):
    """Plan step that groups output by columns and computes aggregations.

    Attributes stored on the instance:
        dataframe: the input to be grouped.
        columns: the columns to group by.
        targets: the aggregation targets.
    """

    def __init__(self, dataframe, columns, targets, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.dataframe = dataframe
        self.columns = columns
        self.targets = targets
class JoinStep(PlanStep):
    """Plan step that joins two dataframes into a new dataframe.

    Attributes stored on the instance:
        left: the left-hand input.
        right: the right-hand input.
        query: the join description.
    """

    def __init__(self, left, right, query, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.left = left
        self.right = right
        self.query = query
class UnionStep(PlanStep):
    """Plan step that combines two dataframes into a new dataframe.

    Attributes stored on the instance:
        left: the left-hand input.
        right: the right-hand input.
        unique: passed through unchanged; presumably distinguishes
            UNION from UNION ALL -- confirm against the executor.
        operation: name of the set operation, ``"union"`` by default.
    """

    def __init__(self, left, right, unique, operation="union", *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.left = left
        self.right = right
        self.unique = unique
        self.operation = operation
92# TODO remove
class OrderByStep(PlanStep):
    """Plan step that sorts a dataframe.

    Attributes stored on the instance:
        dataframe: the input to be sorted.
        order_by: the ordering specification.
    """

    def __init__(self, dataframe, order_by, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.dataframe = dataframe
        self.order_by = order_by
class LimitOffsetStep(PlanStep):
    """Plan step that applies a limit and an offset to a dataframe.

    Attributes stored on the instance:
        dataframe: the input to be sliced.
        limit: optional limit, ``None`` by default.
        offset: optional offset, ``None`` by default.
    """

    def __init__(self, dataframe, limit=None, offset=None, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.dataframe = dataframe
        self.limit = limit
        self.offset = offset
class FetchDataframeStep(PlanStep):
    """Plan step that fetches a dataframe from an external integration.

    Attributes stored on the instance:
        integration: the integration to fetch from.
        query: optional query, ``None`` by default.
        raw_query: optional raw query, ``None`` by default.
        params: optional parameters, ``None`` by default.
    """

    def __init__(self, integration, query=None, raw_query=None, params=None, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.integration = integration
        self.query = query
        self.raw_query = raw_query
        self.params = params
class FetchDataframeStepPartition(FetchDataframeStep):
    """Plan step that fetches a dataframe from an external integration in partitions.

    On top of the ``FetchDataframeStep`` attributes it stores:
        steps: list of steps; a fresh empty list when ``None`` is passed.
        condition: optional condition, ``None`` by default.
    """

    def __init__(self, steps=None, condition=None, *args, **kwargs):
        # Everything else (integration, query, ...) is handled by the parent.
        super().__init__(*args, **kwargs)
        # Build the default list per instance so it is never shared.
        self.steps = [] if steps is None else steps
        self.condition = condition
class ApplyPredictorStep(PlanStep):
    """Plan step that applies a mindsdb predictor to a dataframe.

    The step yields a new dataframe with predictions.

    Attributes stored on the instance:
        namespace: namespace of the predictor.
        predictor: the predictor to apply.
        dataframe: the input data.
        params: optional parameters, ``None`` by default.
        row_dict: columns to add to the input data,
            structured as ``{column name: value}``.
        columns_map: renames columns in the input data, structured as
            ``{a str: b Identifier}``; renames ``b`` to ``a``.
    """

    def __init__(
        self,
        namespace,
        predictor,
        dataframe,
        params: dict = None,
        row_dict: dict = None,
        columns_map: dict = None,
        *args,
        **kwargs,
    ):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.namespace = namespace
        self.predictor = predictor
        self.dataframe = dataframe
        self.params = params
        # Extra columns for the input data: {column name: value}.
        self.row_dict = row_dict
        # Column renames for the input data: {a str: b Identifier}, b becomes a.
        self.columns_map = columns_map
class ApplyTimeseriesPredictorStep(ApplyPredictorStep):
    """Plan step that applies a mindsdb predictor to a dataframe (timeseries variant).

    Takes everything ``ApplyPredictorStep`` takes, plus the keyword-only
    ``output_time_filter``, which specifies the dates for which predictions
    should be returned.
    """

    def __init__(self, *args, output_time_filter=None, **kwargs):
        # All other arguments are handled by ApplyPredictorStep.
        super().__init__(*args, **kwargs)
        self.output_time_filter = output_time_filter
class ApplyPredictorRowStep(PlanStep):
    """Plan step that applies a mindsdb predictor to a single row of values.

    The step yields a one-row dataframe.

    Attributes stored on the instance:
        namespace: namespace of the predictor.
        predictor: the predictor to apply.
        row_dict: the row of input values.
        params: optional parameters, ``None`` by default.
    """

    def __init__(self, namespace, predictor, row_dict, params=None, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.namespace = namespace
        self.predictor = predictor
        self.row_dict = row_dict
        self.params = params
class GetPredictorColumns(PlanStep):
    """Plan step yielding an empty dataframe shaped like a predictor's results.

    Attributes stored on the instance:
        namespace: namespace of the predictor.
        predictor: the predictor whose result columns are wanted.
    """

    def __init__(self, namespace, predictor, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.namespace = namespace
        self.predictor = predictor
class GetTableColumns(PlanStep):
    """Plan step yielding an empty dataframe shaped like a select from a table.

    Attributes stored on the instance:
        namespace: namespace of the table.
        table: the table whose columns are wanted.
    """

    def __init__(self, namespace, table, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.namespace = namespace
        self.table = table
class MapReduceStep(PlanStep):
    """Plan step that applies a step to each value of a list and reduces the results.

    The per-value results are reduced to a single dataframe.
    """

    def __init__(self, values, step, reduce="union", partition=None, *args, **kwargs):
        """
        :param values: input step data
        :param step: step to be applied
        :param reduce: type of reduce to be applied, "union" by default
        :param partition: type of partition to be applied
            - <number> - split the data into chunks of equal size
            - None - every record is a set of variables to fill
        """
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.values = values
        self.step = step
        self.reduce = reduce
        self.partition = partition
class MultipleSteps(PlanStep):
    """Plan step that runs multiple steps and reduces their results to one dataframe.

    Attributes stored on the instance:
        steps: the steps to run.
        reduce: optional reduce type, ``None`` by default.
    """

    def __init__(self, steps, reduce=None, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.steps = steps
        self.reduce = reduce
class SaveToTable(PlanStep):
    """Plan step that creates a table if it does not exist and fills it from a dataframe.

    Attributes stored on the instance:
        table: the target table.
        dataframe: the content to write.
        is_replace: when set, the table is to be dropped beforehand.
        params: extra parameters; a fresh empty dict when ``None`` is passed.
    """

    def __init__(self, table, dataframe, is_replace=False, params=None, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.table = table
        self.dataframe = dataframe
        self.is_replace = is_replace
        # Build the default dict per instance so it is never shared.
        self.params = {} if params is None else params
class InsertToTable(PlanStep):
    """Plan step that fills a table with the content of a dataframe.

    Attributes stored on the instance:
        table: the target table.
        dataframe: optional content to insert, ``None`` by default.
        query: optional query, ``None`` by default.
        params: extra parameters; a fresh empty dict when ``None`` is passed.
    """

    def __init__(self, table, dataframe=None, query=None, params=None, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.table = table
        self.dataframe = dataframe
        self.query = query
        # Build the default dict per instance so it is never shared.
        self.params = {} if params is None else params
class CreateTableStep(PlanStep):
    """Plan step describing the creation of a table.

    The step only records its arguments; how they are executed is decided
    elsewhere (not visible in this module).

    Attributes stored on the instance:
        table: the table to create.
        columns: optional column definitions, ``None`` by default.
        is_replace: flag, ``False`` by default. Presumably requests
            replacing an existing table, as ``is_replace`` does for
            ``SaveToTable`` -- confirm against the executor.
    """

    def __init__(self, table, columns=None, is_replace=False, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.table = table
        self.columns = columns
        self.is_replace = is_replace
class UpdateToTable(PlanStep):
    """Plan step describing an update of a table.

    The step only records its arguments; how they are executed is decided
    elsewhere (not visible in this module).

    Attributes stored on the instance:
        table: the table to update.
        dataframe: the data the update works with.
        update_command: the update to perform; presumably the parsed
            UPDATE statement -- confirm against the planner.
    """

    def __init__(self, table, dataframe, update_command, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.table = table
        self.dataframe = dataframe
        self.update_command = update_command
class DeleteStep(PlanStep):
    """Plan step describing a delete from a table.

    The step only records its arguments; how they are executed is decided
    elsewhere (not visible in this module).

    Attributes stored on the instance:
        table: the table to delete from.
        where: the condition of the delete; presumably selects the rows
            to remove -- confirm against the executor.
    """

    def __init__(self, table, where, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.table = table
        self.where = where
class SubSelectStep(PlanStep):
    """Plan step that performs a select from a dataframe.

    Attributes stored on the instance:
        query: the select to perform.
        dataframe: the input the select runs on.
        table_name: optional table name, ``None`` by default.
        add_absent_cols: flag, ``False`` by default; its handling is up to
            the code that executes the step.
    """

    def __init__(self, query, dataframe, table_name=None, add_absent_cols=False, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.query = query
        self.dataframe = dataframe
        self.table_name = table_name
        self.add_absent_cols = add_absent_cols
class QueryStep(PlanStep):
    """Plan step that performs a query using an injected dataframe.

    Attributes stored on the instance:
        query: the query to perform.
        from_table: optional source, ``None`` by default.
        strict_where: keyword-only flag, ``True`` by default; its handling
            is up to the code that executes the step.
    """

    def __init__(self, query, from_table=None, *args, strict_where=True, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.query = query
        self.from_table = from_table
        self.strict_where = strict_where
class DataStep(PlanStep):
    """Plan step that carries the ``data`` it is given.

    Attributes stored on the instance:
        data: the payload of the step, stored unchanged.
    """

    def __init__(self, data, *args, **kwargs):
        # Remaining positional/keyword arguments (e.g. step_num) go to PlanStep.
        super().__init__(*args, **kwargs)
        self.data = data