Coverage for mindsdb / api / executor / sql_query / steps / fetch_dataframe.py: 50%
69 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb_sql_parser.ast import (
2 Identifier,
3 Constant,
4 Select,
5 Join,
6 Parameter,
7 BinaryOperation,
8 Tuple,
9 Union,
10 Intersect,
11)
13from mindsdb.api.executor.planner.steps import FetchDataframeStep
14from mindsdb.api.executor.datahub.classes.response import DataHubResponse
15from mindsdb.api.executor.sql_query.result_set import ResultSet
16from mindsdb.api.executor.planner.step_result import Result
17from mindsdb.api.executor.exceptions import UnknownError
18from mindsdb.integrations.utilities.query_traversal import query_traversal
19from mindsdb.interfaces.query_context.context_controller import query_context_controller
21from .base import BaseStepCall
def get_table_alias(table_obj, default_db_name):
    """Describe the FROM-clause object of a query as ``(database, table, alias)``.

    :param table_obj: the ``from_table`` node of a query. Identifier, Select
        (subquery) and Join nodes are understood; anything else gets a
        generic fallback.
    :param default_db_name: database name used when the node itself does not
        name one.
    :return: a 3-tuple ``(database, table_name, alias)``.
    """
    if isinstance(table_obj, Identifier):
        parts = table_obj.parts
        if len(parts) == 1:
            db_name, table_name = default_db_name, parts[0]
        else:
            # qualified name: the first part is the database, the last is the table
            db_name, table_name = parts[0], parts[-1]
    elif isinstance(table_obj, Select):
        # subquery: it is named after its alias, or "t" when it has none
        db_name = default_db_name
        table_name = "t" if table_obj.alias is None else table_obj.alias.parts[0]
    elif isinstance(table_obj, Join):
        # a join is described by its left-hand side
        return get_table_alias(table_obj.left, default_db_name)
    else:
        # node type not handled yet: use a generic name
        return default_db_name, "t", "t"

    # the alias defaults to the table name when none was given explicitly
    alias = table_name if table_obj.alias is None else ".".join(table_obj.alias.parts)
    return db_name, table_name, alias
def get_fill_param_fnc(steps_data):
    """Build a ``query_traversal`` callback that substitutes step results into a query.

    Each ``Parameter`` node whose value is a ``Result`` reference is replaced
    by the values of the first column of that step's result set.

    :param steps_data: container indexed by step number
        (``steps_data[result.step_num]``) holding each executed step's result set.
    :return: the callback, suitable for passing to ``query_traversal``.
    """
    # Operators whose right-hand operand must remain a list even when the
    # subselect yields a single value (otherwise 'x NOT IN 1' would be produced).
    list_operators = ("in", "not in")

    def fill_params(node, callstack=None, **kwargs):
        """Return a replacement for ``node`` if it is a Parameter, else None.

        :raises ValueError: if the Parameter is not bound to a step ``Result``.
        """
        if not isinstance(node, Parameter):
            return None

        if not isinstance(node.value, Result):
            # is simple parameter and not set
            raise ValueError(f"Parameter is not set: {node.value}")

        rs = steps_data[node.value.step_num]
        items = [Constant(i) for i in rs.get_column_values(col_idx=0)]

        is_single_item = True
        if callstack:
            # callstack[0] is expected to be the node that directly contains ``node``
            node_prev = callstack[0]
            if (
                isinstance(node_prev, BinaryOperation)
                and node_prev.op.lower() in list_operators
                and node_prev.args[1] is node
            ):
                # 'something IN Parameter()' / 'something NOT IN Parameter()'
                is_single_item = False

        if is_single_item and len(items) == 1:
            # extract one value for option 'col=(subselect)'
            return items[0]
        return Tuple(items)

    return fill_params
class FetchDataframeStepCall(BaseStepCall):
    """Execute a ``FetchDataframeStep`` by running its query on the step's integration."""

    bind = FetchDataframeStep

    def call(self, step):
        """Run ``step`` against its integration and wrap the data in a ResultSet.

        :param step: the FetchDataframeStep to execute.
        :return: ResultSet built from the integration's returned data frame.
        :raises UnknownError: if ``step.integration`` is not known to the datahub.
        """
        dn = self.session.datahub.get(step.integration)
        query = step.query

        if dn is None:
            raise UnknownError(f"Unknown integration name: {step.integration}")

        if query is None:
            # no parsed AST available: send the raw query text unchanged
            table_alias = (self.context.get("database"), "result", "result")
            response: DataHubResponse = dn.query(step.raw_query, session=self.session)
            df = response.data_frame
        else:
            if isinstance(step.query, (Union, Intersect)):
                # combined queries have no single source table
                table_alias = ["", "", ""]
            else:
                table_alias = get_table_alias(step.query.from_table, self.context.get("database"))

            # TODO for information_schema we have 'database' = 'mindsdb'

            # substitute results of earlier steps into Parameter placeholders
            query_traversal(query, get_fill_param_fnc(self.steps_data))

            query, context_callback = query_context_controller.handle_db_context_vars(query, dn, self.session)

            response: DataHubResponse = dn.query(query=query, session=self.session)
            df = response.data_frame

            if context_callback:
                context_callback(df, response.columns)

        # report progress when the query is registered
        run_query = self.sql_query.run_query
        if run_query is not None:
            run_query.set_progress(processed_rows=len(df))

        database, table_name, alias = table_alias
        return ResultSet.from_df(
            df,
            table_name=table_name,
            table_alias=alias,
            database=database,
            mysql_types=response.mysql_types,
        )