Coverage for mindsdb / api / executor / sql_query / sql_query.py: 57%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1""" 

2******************************************************* 

3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com> 

4 * 

5 * This file is part of MindsDB Server. 

6 * 

7 * MindsDB Server can not be copied and/or distributed without the express 

8 * permission of MindsDB Inc 

9 ******************************************************* 

10""" 

11 

12import inspect 

13from textwrap import dedent 

14from typing import Union, Dict 

15 

16import pandas as pd 

17from mindsdb_sql_parser import parse_sql, ASTNode 

18 

19from mindsdb.api.executor.planner.steps import ( 

20 ApplyTimeseriesPredictorStep, 

21 ApplyPredictorRowStep, 

22 ApplyPredictorStep, 

23) 

24 

25from mindsdb.api.executor.planner.exceptions import PlanningException 

26from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

27from mindsdb.api.executor.planner import query_planner 

28 

29from mindsdb.api.executor.utilities.sql import get_query_models 

30from mindsdb.interfaces.model.functions import get_model_record 

31from mindsdb.api.executor.exceptions import ( 

32 BadTableError, 

33 UnknownError, 

34 LogicError, 

35) 

36import mindsdb.utilities.profiler as profiler 

37from mindsdb.utilities.fs import create_process_mark, delete_process_mark 

38from mindsdb.utilities.exception import EntityNotExistsError 

39from mindsdb.interfaces.query_context.context_controller import query_context_controller 

40from mindsdb.utilities.context import context as ctx 

41 

42 

43from . import steps 

44from .result_set import ResultSet, Column 

45from .steps.base import BaseStepCall 

46 

47 

class SQLQuery:
    """Plan a single SQL statement and run its plan steps within a session.

    The statement (a SQL string or an already parsed ``ASTNode``) is turned into
    a ``QueryPlanner``; every step the planner yields is dispatched to the
    ``BaseStepCall`` subclass registered for that step class in ``step_handlers``.
    The result of the last executed step is exposed as ``fetched_data`` and its
    columns as ``columns_list``.
    """

    # Maps a planner step class name to the handler class that executes it.
    # Rebuilt by ``register_steps``.
    step_handlers = {}

    def __init__(
        self,
        sql: Union[ASTNode, str],
        session,
        execute: bool = True,
        database: str = None,
        query_id: int = None,
        stop_event=None,
    ):
        """Parse the statement, build the planner and optionally execute it.

        Args:
            sql: SQL text or a parsed AST. Ignored when ``query_id`` is given,
                because the statement is then loaded from the stored query.
            session: session object; must provide ``database``,
                ``database_controller`` and ``agents_controller``.
            execute: when true, ``execute_query`` is called immediately.
            database: default database; falls back to ``session.database``.
            query_id: id of a stored query to resume.
            stop_event: stored on the instance as ``stop_event``; not used by
                this class itself.
        """
        self.session = session

        self.query_id = query_id
        if self.query_id is not None:
            # get sql and database from resumed query
            resumed = query_context_controller.get_query(self.query_id)
            sql = resumed.sql
            database = resumed.database

        self.database = database if database is not None else session.database

        self.context = {"database": self._default_namespace(), "row_id": 0}

        self.columns_list = None
        self.steps_data: Dict[int, ResultSet] = {}

        self.planner: query_planner.QueryPlanner = None
        self.parameters = []
        self.fetched_data: ResultSet = None

        self.outer_query = None
        self.run_query = None
        self.stop_event = stop_event

        if isinstance(sql, str):
            self.query = parse_sql(sql)
            self.context["query_str"] = sql
        else:
            self.query = sql
            renderer = SqlalchemyRender("mysql")
            try:
                self.context["query_str"] = renderer.get_string(self.query, with_failback=True)
            except Exception:
                # rendering is best effort: fall back to the AST's own string form
                self.context["query_str"] = str(self.query)

        self.create_planner()

        if execute:
            self.execute_query()

    @classmethod
    def register_steps(cls):
        """Rebuild ``step_handlers`` from the ``BaseStepCall`` subclasses found in ``steps``.

        Only handlers whose ``bind`` attribute is set are registered; the key is
        the name of the class referenced by ``bind``.
        """
        cls.step_handlers = {
            handler.bind.__name__: handler
            for _, handler in inspect.getmembers(steps)
            if inspect.isclass(handler) and issubclass(handler, BaseStepCall) and handler.bind is not None
        }

    def _default_namespace(self):
        """Return the lower-cased default database, or ``None`` when none is set.

        Both an empty string and ``None`` are treated as "no default database",
        so a missing database no longer fails on ``None.lower()``.
        """
        if not self.database:
            return None
        return self.database.lower()

    def _agent_predictor_metadata(self, table_name, project_name):
        """Return planner metadata for an agent, or ``None`` if no such agent exists."""
        try:
            agent = self.session.agents_controller.get_agent(table_name, project_name)
        except EntityNotExistsError:
            return None
        if agent is None:
            return None
        return {
            "name": table_name,
            "integration_name": project_name,  # integration_name,
            "timeseries": False,
            "id": agent.id,
            "to_predict": "answer",
        }

    def _model_predictor_metadata(self, model_record, table_name, project_name, table_version):
        """Return planner metadata for a model record.

        Raises:
            BadTableError: if the model record is in ``error`` status.
        """
        if model_record.status == "error":
            dot_version_str = ""
            and_version_str = ""
            if table_version is not None:
                dot_version_str = f".{table_version}"
                and_version_str = f" and version = {table_version}"

            raise BadTableError(
                dedent(f"""\
                The model '{table_name}{dot_version_str}' cannot be used as it is currently in 'error' status.
                For detailed information about the error, please execute the following command:

                    select error from information_schema.models where name = '{table_name}'{and_version_str};
                """)
            )

        ts_settings = model_record.learn_args.get("timeseries_settings", {})
        predictor = {
            "name": table_name,
            "integration_name": project_name,  # integration_name,
            "timeseries": False,
            "id": model_record.id,
            "to_predict": model_record.to_predict,
        }
        if ts_settings.get("is_timeseries") is True:
            # the planner gets a single order column ...
            order_by = ts_settings.get("order_by")
            if isinstance(order_by, list):
                order_by = order_by[0]
            # ... and a list of group columns (or None)
            group_by = ts_settings.get("group_by")
            if group_by is not None and not isinstance(group_by, list):
                group_by = [group_by]
            predictor.update(
                {
                    "timeseries": True,
                    "window": ts_settings.get("window"),
                    "horizon": ts_settings.get("horizon"),
                    "order_by_column": order_by,
                    "group_by_columns": group_by,
                }
            )

        predictor["model_types"] = model_record.data.get("dtypes", {})
        return predictor

    @profiler.profile()
    def create_planner(self):
        """Collect metadata for the models/agents used by the query and build the planner.

        Stores the metadata list in ``context["predictor_metadata"]`` and the
        planner in ``self.planner``.

        Raises:
            BadTableError: if a referenced model is in ``error`` status.
        """
        databases = self.session.database_controller.get_list()

        predictor_metadata = []

        query_tables = get_query_models(self.query, default_database=self.database)

        for project_name, table_name, table_version in query_tables:
            lookup = {"name": table_name, "project_name": project_name}
            if table_version is not None:
                lookup["active"] = None
                lookup["version"] = table_version

            model_record = get_model_record(**lookup)
            if model_record is None:
                # not a model: check if it is an agent
                predictor = self._agent_predictor_metadata(table_name, project_name)
            else:
                predictor = self._model_predictor_metadata(model_record, table_name, project_name, table_version)

            if predictor is not None:
                predictor_metadata.append(predictor)

        self.context["predictor_metadata"] = predictor_metadata
        self.planner = query_planner.QueryPlanner(
            self.query,
            integrations=databases,
            predictor_metadata=predictor_metadata,
            default_namespace=self._default_namespace(),
        )

    def prepare_query(self):
        """Handle a prepared statement call.

        Runs the planner's prepare steps, then fills ``columns_list`` and
        ``parameters`` from the planner's statement info.

        Raises:
            LogicError: if planning fails with ``PlanningException``.
        """
        try:
            for step in self.planner.prepare_steps(self.query):
                data = self.execute_step(step)
                step.set_result(data)
                self.steps_data[step.step_num] = data
        except PlanningException as e:
            raise LogicError(e) from e

        statement_info = self.planner.get_statement_info()

        self.columns_list = [
            Column(
                database=col["ds"],
                table_name=col["table_name"],
                table_alias=col["table_alias"],
                name=col["name"],
                alias=col["alias"],
                type=col["type"],
            )
            for col in statement_info["columns"]
        ]

        self.parameters = [
            Column(name=col["name"], alias=col["alias"], type=col["type"]) for col in statement_info["parameters"]
        ]

    def execute_query(self):
        """Execute the planned steps and store the final result in ``fetched_data``.

        Does nothing when data has already been fetched. For a resumable plan a
        stored query is loaded or created first; when that plan is also async
        and no task is running (``ctx.task_id is None``), the query is added to
        a task and only its info record is returned as the result.

        Raises:
            LogicError: if the planner raises ``PlanningException``.
            UnknownError: if the resulting columns cannot be post-processed.
        """
        if self.fetched_data is not None:
            # no need to execute
            return

        try:
            # named ``plan_steps`` so the imported ``steps`` module is not shadowed
            plan_steps = list(self.planner.execute_steps())
        except PlanningException as e:
            raise LogicError(e) from e

        if self.planner.plan.is_resumable:
            # load the stored query when resuming, otherwise create one
            if self.query_id is not None:
                self.run_query = query_context_controller.get_query(self.query_id)
            else:
                self.run_query = query_context_controller.create_query(
                    self.context["query_str"], database=self.database
                )

            if self.planner.plan.is_async and ctx.task_id is None:
                # add to task
                self.run_query.add_to_task()
                # return query info; columns in upper case
                rec = {k.upper(): v for k, v in self.run_query.get_info().items()}
                self.fetched_data = ResultSet.from_df(pd.DataFrame([rec]))
                self.columns_list = self.fetched_data.columns
                return
            self.run_query.mark_as_run()

            ctx.run_query_id = self.run_query.record.id

        step_result = None
        process_mark = None
        # Tracked explicitly so the error handler never touches an unbound name
        # when the failure happens before the first step starts.
        current_step = None
        try:
            predict_steps = (ApplyPredictorRowStep, ApplyPredictorStep, ApplyTimeseriesPredictorStep)
            # exact class match (not isinstance), as in the original check
            if any(s.__class__ in predict_steps for s in plan_steps):
                process_mark = create_process_mark("predict")
            for current_step in plan_steps:
                with profiler.Context(f"step: {current_step.__class__.__name__}"):
                    step_result = self.execute_step(current_step)
                self.steps_data[current_step.step_num] = step_result
        except Exception as e:
            if self.run_query is not None:
                # set error and place where it stopped
                # NOTE(review): None is passed when no step had started yet;
                # confirm that ``on_error`` accepts a missing step number.
                failed_step_num = None if current_step is None else current_step.step_num
                self.run_query.on_error(e, failed_step_num, self.steps_data)
            raise
        else:
            # mark running query as completed
            if self.run_query is not None:
                self.run_query.finish()
                # NOTE(review): ctx.run_query_id is only reset on success; it stays
                # set after a failure - confirm that this is intended.
                ctx.run_query_id = None
        finally:
            if process_mark is not None:
                delete_process_mark("predict", process_mark)

        # save updated query
        self.query = self.planner.query

        # there was no executing
        if not self.steps_data:
            return

        self.fetched_data = step_result

        try:
            if getattr(self, "columns_list", None) is None:
                self.columns_list = self.fetched_data.columns

            # drop the internal row-id helper columns from the result
            for col in self.fetched_data.find_columns("__mindsdb_row_id"):
                self.fetched_data.del_column(col)

        except Exception as e:
            raise UnknownError("error in column list step") from e

    def execute_step(self, step, steps_data=None):
        """Run one planner step through its registered handler and return the result.

        Args:
            step: planner step instance; its class name selects the handler.
            steps_data: passed through to the handler constructor.

        Raises:
            UnknownError: if no handler is registered for the step's class.
        """
        cls_name = step.__class__.__name__
        handler = self.step_handlers.get(cls_name)
        if handler is None:
            raise UnknownError(f"Unknown step: {cls_name}")

        return handler(self, steps_data=steps_data).call(step)


# Populate SQLQuery.step_handlers once, at import time.
SQLQuery.register_steps()