Coverage for mindsdb / api / executor / sql_query / sql_query.py: 57%
176 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""
2*******************************************************
3 * Copyright (C) 2017 MindsDB Inc. <copyright@mindsdb.com>
4 *
5 * This file is part of MindsDB Server.
6 *
7 * MindsDB Server can not be copied and/or distributed without the express
8 * permission of MindsDB Inc
9 *******************************************************
10"""
12import inspect
13from textwrap import dedent
14from typing import Union, Dict
16import pandas as pd
17from mindsdb_sql_parser import parse_sql, ASTNode
19from mindsdb.api.executor.planner.steps import (
20 ApplyTimeseriesPredictorStep,
21 ApplyPredictorRowStep,
22 ApplyPredictorStep,
23)
25from mindsdb.api.executor.planner.exceptions import PlanningException
26from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
27from mindsdb.api.executor.planner import query_planner
29from mindsdb.api.executor.utilities.sql import get_query_models
30from mindsdb.interfaces.model.functions import get_model_record
31from mindsdb.api.executor.exceptions import (
32 BadTableError,
33 UnknownError,
34 LogicError,
35)
36import mindsdb.utilities.profiler as profiler
37from mindsdb.utilities.fs import create_process_mark, delete_process_mark
38from mindsdb.utilities.exception import EntityNotExistsError
39from mindsdb.interfaces.query_context.context_controller import query_context_controller
40from mindsdb.utilities.context import context as ctx
43from . import steps
44from .result_set import ResultSet, Column
45from .steps.base import BaseStepCall
class SQLQuery:
    """Plan and execute a single SQL query for a MindsDB session.

    The query (SQL text or an already-parsed AST) is turned into a plan by
    ``query_planner.QueryPlanner`` (see ``create_planner``) and then executed
    step by step. Each plan step is dispatched to the ``BaseStepCall`` subclass
    registered for the step's class name in ``step_handlers``.
    """

    # Maps a plan step class name -> BaseStepCall subclass that executes it.
    # Rebuilt by ``register_steps()``, which is called once at import time (end of this block).
    step_handlers = {}

    def __init__(
        self,
        sql: Union[ASTNode, str],
        session,
        execute: bool = True,
        database: str = None,
        query_id: int = None,
        stop_event=None,
    ):
        """
        :param sql: query text, or an already-parsed AST
        :param session: caller's session; ``session.database`` is the fallback default database and
            its ``database_controller`` / ``agents_controller`` are used while planning
        :param execute: if True, execute the query right after planning
        :param database: default database; when None, ``session.database`` is used
        :param query_id: id of a stored query to resume; when set, the stored sql and database
            replace the ``sql`` and ``database`` arguments
        :param stop_event: kept as ``self.stop_event`` (presumably consulted by step handlers — not visible here)
        """
        self.session = session

        self.query_id = query_id
        if self.query_id is not None:
            # get sql and database from resumed query
            run_query = query_context_controller.get_query(self.query_id)
            sql = run_query.sql
            database = run_query.database

        self.database = database if database is not None else session.database

        self.context = {"database": self._to_namespace(self.database), "row_id": 0}

        self.columns_list = None
        self.steps_data: Dict[int, ResultSet] = {}

        self.planner: query_planner.QueryPlanner = None
        self.parameters = []
        self.fetched_data: ResultSet = None

        self.outer_query = None
        self.run_query = None
        self.stop_event = stop_event

        if isinstance(sql, str):
            self.query = parse_sql(sql)
            self.context["query_str"] = sql
        else:
            self.query = sql
            renderer = SqlalchemyRender("mysql")
            try:
                self.context["query_str"] = renderer.get_string(self.query, with_failback=True)
            except Exception:
                # the AST could not be rendered as MySQL: fall back to its plain string form
                self.context["query_str"] = str(self.query)

        self.create_planner()

        if execute:
            self.execute_query()

    @staticmethod
    def _to_namespace(database):
        """Return the lower-cased database name, or None when it is empty or missing."""
        return database.lower() if database else None

    @classmethod
    def register_steps(cls):
        """(Re)build ``step_handlers`` from the ``BaseStepCall`` subclasses exported by ``steps``.

        Each handler is keyed by the class name of the plan step it is bound to
        (``handler.bind.__name__``); classes whose ``bind`` is None are skipped.
        """
        cls.step_handlers = {}
        for _, cl in inspect.getmembers(steps):
            if inspect.isclass(cl) and issubclass(cl, BaseStepCall):
                if cl.bind is not None:
                    step_name = cl.bind.__name__
                    cls.step_handlers[step_name] = cl

    @profiler.profile()
    def create_planner(self):
        """Collect model/agent metadata for the query's tables and build ``self.planner``.

        For every (project, table, version) returned by ``get_query_models``:
        - if a model record exists, it becomes a predictor entry (with timeseries settings);
        - otherwise, if an agent with that name exists, it becomes a predictor entry that
          predicts ``answer``;
        - otherwise the table is skipped.

        The collected list is also stored in ``self.context["predictor_metadata"]``.

        :raises BadTableError: if a referenced model is in 'error' status
        """
        databases = self.session.database_controller.get_list()

        predictor_metadata = []

        query_tables = get_query_models(self.query, default_database=self.database)

        for project_name, table_name, table_version in query_tables:
            args = {"name": table_name, "project_name": project_name}
            if table_version is not None:
                # explicit version requested: presumably lifts the active-version filter
                args["active"] = None
                args["version"] = table_version

            model_record = get_model_record(**args)
            if model_record is None:
                # check if it is an agent
                try:
                    agent = self.session.agents_controller.get_agent(table_name, project_name)
                except EntityNotExistsError:
                    continue
                if agent is not None:
                    predictor_metadata.append(
                        {
                            "name": table_name,
                            "integration_name": project_name,  # integration_name,
                            "timeseries": False,
                            "id": agent.id,
                            "to_predict": "answer",
                        }
                    )
                continue

            predictor_metadata.append(
                self._model_predictor_metadata(model_record, project_name, table_name, table_version)
            )

        self.context["predictor_metadata"] = predictor_metadata
        self.planner = query_planner.QueryPlanner(
            self.query,
            integrations=databases,
            predictor_metadata=predictor_metadata,
            default_namespace=self._to_namespace(self.database),
        )

    @staticmethod
    def _model_predictor_metadata(model_record, project_name, table_name, table_version):
        """Build the planner's predictor entry for a model record.

        :raises BadTableError: if the model is in 'error' status
        """
        if model_record.status == "error":
            dot_version_str = ""
            and_version_str = ""
            if table_version is not None:
                dot_version_str = f".{table_version}"
                and_version_str = f" and version = {table_version}"

            raise BadTableError(
                dedent(f"""\
                The model '{table_name}{dot_version_str}' cannot be used as it is currently in 'error' status.
                For detailed information about the error, please execute the following command:

                    select error from information_schema.models where name = '{table_name}'{and_version_str};
                """)
            )

        # learn_args / timeseries_settings may be stored as null: treat that like "not set"
        ts_settings = (model_record.learn_args or {}).get("timeseries_settings") or {}
        predictor = {
            "name": table_name,
            "integration_name": project_name,  # integration_name,
            "timeseries": False,
            "id": model_record.id,
            "to_predict": model_record.to_predict,
        }
        if ts_settings.get("is_timeseries") is True:
            order_by = ts_settings.get("order_by")
            if isinstance(order_by, list):
                # the planner gets a single ordering column: use the first one
                order_by = order_by[0]
            group_by = ts_settings.get("group_by")
            if group_by is not None and not isinstance(group_by, list):
                group_by = [group_by]
            predictor.update(
                {
                    "timeseries": True,
                    "window": ts_settings.get("window"),
                    "horizon": ts_settings.get("horizon"),
                    "order_by_column": order_by,
                    "group_by_columns": group_by,
                }
            )

        predictor["model_types"] = (model_record.data or {}).get("dtypes", {})
        return predictor

    def prepare_query(self):
        """Prepare the query as a prepared statement (no execution of the main plan).

        Runs the planner's prepare steps, then fills ``self.columns_list`` (result columns)
        and ``self.parameters`` (statement parameters) from the planner's statement info.

        :raises LogicError: if planning raises ``PlanningException``
        """
        try:
            for step in self.planner.prepare_steps(self.query):
                data = self.execute_step(step)
                step.set_result(data)
                self.steps_data[step.step_num] = data
        except PlanningException as e:
            raise LogicError(e) from e

        statement_info = self.planner.get_statement_info()

        self.columns_list = [
            Column(
                database=col["ds"],
                table_name=col["table_name"],
                table_alias=col["table_alias"],
                name=col["name"],
                alias=col["alias"],
                type=col["type"],
            )
            for col in statement_info["columns"]
        ]

        self.parameters = [
            Column(name=col["name"], alias=col["alias"], type=col["type"]) for col in statement_info["parameters"]
        ]

    def execute_query(self):
        """Execute the plan and store the last step's result in ``self.fetched_data``.

        Does nothing if ``fetched_data`` is already set. For resumable plans, a query record
        is created (or reloaded from ``query_id``). If such a plan is async and no task is
        running, the query is handed to a task, and ``fetched_data`` holds the query info
        (upper-cased keys) instead of results.

        :raises LogicError: if planning raises ``PlanningException``
        :raises UnknownError: if post-processing of the result columns fails
        """
        if self.fetched_data is not None:
            # no need to execute
            return

        try:
            # named plan_steps so it does not shadow the imported ``steps`` module
            plan_steps = list(self.planner.execute_steps())
        except PlanningException as e:
            raise LogicError(e) from e

        if self.planner.plan.is_resumable:
            # create query
            if self.query_id is not None:
                self.run_query = query_context_controller.get_query(self.query_id)
            else:
                self.run_query = query_context_controller.create_query(
                    self.context["query_str"], database=self.database
                )

            if self.planner.plan.is_async and ctx.task_id is None:
                # add to task
                self.run_query.add_to_task()
                # return query info
                # columns in upper case
                rec = {k.upper(): v for k, v in self.run_query.get_info().items()}
                self.fetched_data = ResultSet.from_df(pd.DataFrame([rec]))
                self.columns_list = self.fetched_data.columns
                return
            self.run_query.mark_as_run()

            ctx.run_query_id = self.run_query.record.id

        step_result = None
        process_mark = None
        # Step reported to run_query.on_error on failure. Starts at the first step so a failure
        # before the loop (e.g. in create_process_mark) does not hit an unbound name.
        current_step = plan_steps[0] if plan_steps else None
        try:
            predict_step_types = (ApplyPredictorRowStep, ApplyPredictorStep, ApplyTimeseriesPredictorStep)
            if any(s.__class__ in predict_step_types for s in plan_steps):
                process_mark = create_process_mark("predict")
            for current_step in plan_steps:
                with profiler.Context(f"step: {current_step.__class__.__name__}"):
                    step_result = self.execute_step(current_step)
                    self.steps_data[current_step.step_num] = step_result
        except Exception as e:
            if self.run_query is not None:
                # set error and place where it stopped
                step_num = None if current_step is None else current_step.step_num
                self.run_query.on_error(e, step_num, self.steps_data)
            raise
        else:
            # mark running query as completed
            if self.run_query is not None:
                self.run_query.finish()
                ctx.run_query_id = None
        finally:
            if process_mark is not None:
                delete_process_mark("predict", process_mark)

        # save updated query
        self.query = self.planner.query

        # there was no executing
        if not self.steps_data:
            return

        self.fetched_data = step_result

        try:
            if getattr(self, "columns_list", None) is None:
                self.columns_list = self.fetched_data.columns

            # drop internal row-id columns from the result
            for col in self.fetched_data.find_columns("__mindsdb_row_id"):
                self.fetched_data.del_column(col)

        except Exception as e:
            raise UnknownError("error in column list step") from e

    def execute_step(self, step, steps_data=None):
        """Run one plan step using the handler registered for its class name.

        :param step: plan step instance
        :param steps_data: passed through to the handler's constructor
        :return: the value returned by the handler's ``call(step)``
        :raises UnknownError: if no handler is registered for the step's class
        """
        cls_name = step.__class__.__name__
        handler = self.step_handlers.get(cls_name)
        if handler is None:
            raise UnknownError(f"Unknown step: {cls_name}")

        return handler(self, steps_data=steps_data).call(step)


SQLQuery.register_steps()