Coverage for mindsdb / api / executor / sql_query / steps / union_step.py: 30%
24 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.api.executor.planner.steps import UnionStep
3from mindsdb.api.executor.sql_query.result_set import ResultSet
4from mindsdb.api.executor.exceptions import WrongArgumentError
5from mindsdb.api.executor.utilities.sql import query_df_with_type_infer_fallback
6import numpy as np
8from .base import BaseStepCall
class UnionStepCall(BaseStepCall):
    """Step handler that combines two earlier step results with a set operation."""

    # Planner step class this handler is bound to.
    bind = UnionStep

    def call(self, step):
        """Combine the left and right step results using UNION or INTERSECT.

        The results are looked up in ``self.steps_data`` by the step numbers of
        ``step.left`` and ``step.right``. ``step.operation`` (compared
        case-insensitively) selects INTERSECT when it equals ``"intersect"``;
        any other value means UNION. Unless ``step.unique`` is exactly ``True``,
        the ``ALL`` variant of the operator is used.

        Raises:
            WrongArgumentError: if the two results have a different number of
                columns.

        Returns:
            A ``ResultSet`` built from the combined dataframe, reusing the
            column mapping obtained from the left result.
        """
        lhs = self.steps_data[step.left.step_num]
        rhs = self.steps_data[step.right.step_num]

        # Both sides must expose the same number of columns.
        lhs_width = len(lhs.columns)
        rhs_width = len(rhs.columns)
        if lhs_width != rhs_width:
            raise WrongArgumentError(f"UNION columns count mismatch: {lhs_width} != {rhs_width} ")

        # TODO: column type compatibility is not validated yet; the check is
        # intentionally disabled and should be restored later.

        # Only the left side's column mapping is kept for the output.
        left_df, column_names = lhs.to_df_cols()
        right_df, _ = rhs.to_df_cols()

        set_operator = "INTERSECT" if step.operation.lower() == "intersect" else "UNION"
        if step.unique is not True:
            set_operator += " ALL"

        query = f"""
            SELECT * FROM table_a
            {set_operator}
            SELECT * FROM table_b
        """

        # The dict keys must match the table names referenced in the query.
        frames = {"table_a": left_df, "table_b": right_df}
        resp_df, _description = query_df_with_type_infer_fallback(query, frames)

        # Normalise NaN cells to None before wrapping the frame in a ResultSet.
        resp_df.replace({np.nan: None}, inplace=True)

        return ResultSet.from_df_cols(df=resp_df, columns_dict=column_names)