Coverage for mindsdb / api / executor / planner / steps.py: 89%

174 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

from typing import Optional

from mindsdb.api.executor.planner.exceptions import PlanningException
from mindsdb.api.executor.planner.step_result import Result

3 

4 

class PlanStep:
    """Base class for a single step of a query execution plan.

    Subclasses keep their parameters as plain instance attributes; ``__eq__``
    and ``__repr__`` are derived from those attributes via ``vars(self)``.
    """

    def __init__(self, step_num=None):
        # Position of the step in the plan; must be set before ``result`` is used.
        self.step_num = step_num

    @property
    def result(self):
        """Return a ``Result`` reference pointing at this step's output.

        Raises:
            PlanningException: if the step has no step number assigned yet.
        """
        if self.step_num is None:
            raise PlanningException(
                f"Can't reference a step with no assigned step number. Tried to reference: {type(self)}"
            )
        return Result(self.step_num)

    def __eq__(self, other):
        """Compare two steps of exactly the same class, attribute by attribute.

        ``result_data`` (attached by ``set_result``) is ignored. Only the
        attributes present on ``self`` are checked; if ``other`` lacks one of
        them the steps are considered different (instead of raising
        ``AttributeError``).
        """
        if type(self) is not type(other):
            return False

        missing = object()
        for k in vars(self):
            # skip result comparison
            if k == "result_data":
                continue

            other_value = getattr(other, k, missing)
            if other_value is missing or getattr(self, k) != other_value:
                return False

        return True

    def __repr__(self):
        attrs_dict = vars(self)
        attrs_str = ", ".join([f"{k}={str(v)}" for k, v in attrs_dict.items()])
        return f"{self.__class__.__name__}({attrs_str})"

    def set_result(self, result):
        """Attach execution output to the step (stored as ``result_data``, ignored by ``__eq__``)."""
        self.result_data = result

38 

39 

class ProjectStep(PlanStep):
    """Plan step that selects a set of columns out of a dataframe.

    :param columns: columns to keep
    :param dataframe: input of the projection
    :param ignore_doubles: flag stored unchanged on the step (default False)
    """

    def __init__(self, columns, dataframe, ignore_doubles=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.columns, self.dataframe, self.ignore_doubles = (
            columns,
            dataframe,
            ignore_doubles,
        )

48 

49 

# TODO remove
class FilterStep(PlanStep):
    """Plan step that filters a dataframe according to a query.

    :param dataframe: input data to filter
    :param query: filtering query, stored as-is
    """

    def __init__(self, dataframe, query, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dataframe, self.query = dataframe, query

58 

59 

# TODO remove
class GroupByStep(PlanStep):
    """Plan step that groups a dataframe by columns and computes aggregate targets.

    :param dataframe: input data
    :param columns: grouping columns
    :param targets: output expressions (aggregations), stored as-is
    """

    def __init__(self, dataframe, columns, targets, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dataframe, self.columns, self.targets = dataframe, columns, targets

69 

70 

class JoinStep(PlanStep):
    """Plan step that joins two dataframes into a new one.

    :param left: left input of the join
    :param right: right input of the join
    :param query: join description, stored as-is
    """

    def __init__(self, left, right, query, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.left, self.right, self.query = left, right, query

79 

80 

class UnionStep(PlanStep):
    """Plan step that combines two dataframes with a set operation.

    :param left: first input
    :param right: second input
    :param unique: flag stored unchanged on the step
    :param operation: name of the set operation (default "union")
    """

    def __init__(self, left, right, unique, operation="union", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.left, self.right = left, right
        self.unique, self.operation = unique, operation

90 

91 

# TODO remove
class OrderByStep(PlanStep):
    """Plan step that sorts a dataframe.

    :param dataframe: input data
    :param order_by: sort specification, stored as-is
    """

    def __init__(self, dataframe, order_by, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dataframe, self.order_by = dataframe, order_by

100 

101 

class LimitOffsetStep(PlanStep):
    """Plan step applying LIMIT and/or OFFSET to a dataframe.

    :param dataframe: input data
    :param limit: maximum number of rows, or None
    :param offset: number of rows to skip, or None
    """

    def __init__(self, dataframe, limit=None, offset=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dataframe, self.limit, self.offset = dataframe, limit, offset

110 

111 

class FetchDataframeStep(PlanStep):
    """Plan step that fetches a dataframe from an external integration.

    :param integration: integration to fetch from
    :param query: query to send, or None
    :param raw_query: raw query text, or None
    :param params: extra parameters, or None
    """

    def __init__(self, integration, query=None, raw_query=None, params=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.integration, self.query = integration, query
        self.raw_query, self.params = raw_query, params

121 

122 

class FetchDataframeStepPartition(FetchDataframeStep):
    """FetchDataframeStep variant that fetches data from an integration in partitions.

    :param steps: list of steps stored on the instance; a fresh empty list when None
    :param condition: stored as-is
    Any remaining arguments are forwarded to FetchDataframeStep.
    """

    def __init__(self, steps=None, condition=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # a new list per instance avoids sharing a mutable default
        self.steps = [] if steps is None else steps
        self.condition = condition

132 

133 

class ApplyPredictorStep(PlanStep):
    """Applies a mindsdb predictor on some dataframe and returns a new dataframe with predictions.

    :param namespace: namespace (project) of the predictor
    :param predictor: predictor to apply
    :param dataframe: input data
    :param params: optional extra parameters
    :param row_dict: optional columns to add to the input data
    :param columns_map: optional renaming of input columns
    """

    def __init__(
        self,
        namespace,
        predictor,
        dataframe,
        params: Optional[dict] = None,
        row_dict: Optional[dict] = None,
        columns_map: Optional[dict] = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.namespace = namespace
        self.predictor = predictor
        self.dataframe = dataframe
        self.params = params

        # columns to add to input data, struct: {column name: value}
        self.row_dict = row_dict

        # rename columns in input data, struct: {a str: b Identifier}
        # renames b to a
        self.columns_map = columns_map

160 

161 

class ApplyTimeseriesPredictorStep(ApplyPredictorStep):
    """ApplyPredictorStep for time-series predictors.

    Adds one keyword-only argument, ``output_time_filter``, which specifies the
    dates for which predictions should be returned. All other arguments are
    forwarded unchanged to ApplyPredictorStep.
    """

    def __init__(self, *args, output_time_filter=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.output_time_filter = output_time_filter

170 

171 

class ApplyPredictorRowStep(PlanStep):
    """Plan step applying a mindsdb predictor to a single row of values.

    The result is a one-row dataframe produced by the predictor.

    :param namespace: namespace (project) of the predictor
    :param predictor: predictor to apply
    :param row_dict: the input row
    :param params: optional extra parameters
    """

    def __init__(self, namespace, predictor, row_dict, params=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.namespace, self.predictor = namespace, predictor
        self.row_dict, self.params = row_dict, params

181 

182 

class GetPredictorColumns(PlanStep):
    """Plan step yielding an empty dataframe shaped like the predictor's output.

    :param namespace: namespace (project) of the predictor
    :param predictor: predictor whose output columns are described
    """

    def __init__(self, namespace, predictor, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.namespace, self.predictor = namespace, predictor

190 

191 

class GetTableColumns(PlanStep):
    """Plan step yielding an empty dataframe shaped like a SELECT from a table.

    :param namespace: namespace containing the table
    :param table: table whose columns are described
    """

    def __init__(self, namespace, table, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.namespace, self.table = namespace, table

199 

200 

class MapReduceStep(PlanStep):
    """Applies a step for each value in a list, and then reduces results to a single dataframe"""

    def __init__(self, values, step, reduce="union", partition=None, *args, **kwargs):
        """
        :param values: step whose output supplies the input data
        :param step: step applied to each value (or chunk)
        :param reduce: kind of reduce used to combine results (default "union")
        :param partition: how the input is partitioned
            - <number> - split the data into chunks of equal size
            - None - each record provides the variables to fill
        """
        super().__init__(*args, **kwargs)
        self.values, self.step = values, step
        self.reduce, self.partition = reduce, partition

218 

219 

class MultipleSteps(PlanStep):
    """Runs several steps and reduces their results to a single dataframe.

    :param steps: the steps to run
    :param reduce: reduce specification, or None
    """

    def __init__(self, steps, reduce=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.steps, self.reduce = steps, reduce

226 

227 

class SaveToTable(PlanStep):
    """Creates a table if it does not exist and fills it with the dataframe's content.

    :param table: target table
    :param dataframe: data to store
    :param is_replace: drop the table beforehand when True
    :param params: extra parameters; a fresh empty dict when None
    """

    def __init__(self, table, dataframe, is_replace=False, params=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.table, self.dataframe, self.is_replace = table, dataframe, is_replace
        self.params = {} if params is None else params

241 

242 

class InsertToTable(PlanStep):
    """Plan step that fills a table with the content of a dataframe.

    :param table: target table
    :param dataframe: data to insert, or None
    :param query: query stored on the step, or None
    :param params: extra parameters; a fresh empty dict when None
    """

    def __init__(self, table, dataframe=None, query=None, params=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.table, self.dataframe, self.query = table, dataframe, query
        self.params = {} if params is None else params

253 

254 

class CreateTableStep(PlanStep):
    def __init__(self, table, columns=None, is_replace=False, *args, **kwargs):
        """Describes creation of ``table`` with the given column definitions.

        :param table: table to create
        :param columns: column definitions, or None
        :param is_replace: flag stored on the step (presumably recreate an existing
            table; NOTE(review): confirm against the executor)
        """
        super().__init__(*args, **kwargs)
        self.table = table
        self.columns = columns
        self.is_replace = is_replace

262 

263 

class UpdateToTable(PlanStep):
    def __init__(self, table, dataframe, update_command, *args, **kwargs):
        """Describes an UPDATE of rows in ``table``.

        :param table: table to update
        :param dataframe: input data for the update, stored as-is
            (NOTE(review): exact role is defined by the executor)
        :param update_command: the update statement to apply
        """
        super().__init__(*args, **kwargs)
        self.table = table
        self.dataframe = dataframe
        self.update_command = update_command

271 

272 

class DeleteStep(PlanStep):
    def __init__(self, table, where, *args, **kwargs):
        """Describes a DELETE from ``table`` filtered by the ``where`` condition.

        :param table: table to delete from
        :param where: filter condition, stored as-is
        """
        super().__init__(*args, **kwargs)
        self.table = table
        self.where = where

279 

280 

class SubSelectStep(PlanStep):
    """Plan step that runs a select query over a dataframe.

    :param query: select query to run
    :param dataframe: input data
    :param table_name: name the dataframe is referenced by, or None
    :param add_absent_cols: flag stored unchanged on the step (default False)
    """

    def __init__(self, query, dataframe, table_name=None, add_absent_cols=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query, self.dataframe = query, dataframe
        self.table_name, self.add_absent_cols = table_name, add_absent_cols

289 

290 

class QueryStep(PlanStep):
    """Plan step that performs a query using an injected dataframe.

    :param query: query to perform
    :param from_table: source of the data, or None
    :param strict_where: keyword-only flag stored unchanged (default True)
    """

    def __init__(self, query, from_table=None, *args, strict_where=True, **kwargs):
        super().__init__(*args, **kwargs)
        self.query, self.from_table, self.strict_where = query, from_table, strict_where

298 

299 

class DataStep(PlanStep):
    """Plan step carrying data directly, stored unchanged as ``data``."""

    def __init__(self, data, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.data = data