Coverage for mindsdb / api / executor / sql_query / steps / union_step.py: 30%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.api.executor.planner.steps import UnionStep 

2 

3from mindsdb.api.executor.sql_query.result_set import ResultSet 

4from mindsdb.api.executor.exceptions import WrongArgumentError 

5from mindsdb.api.executor.utilities.sql import query_df_with_type_infer_fallback 

6import numpy as np 

7 

8from .base import BaseStepCall 

9 

10 

11class UnionStepCall(BaseStepCall): 

12 bind = UnionStep 

13 

14 def call(self, step): 

15 left_result = self.steps_data[step.left.step_num] 

16 right_result = self.steps_data[step.right.step_num] 

17 

18 # count of columns have to match 

19 if len(left_result.columns) != len(right_result.columns): 

20 raise WrongArgumentError( 

21 f"UNION columns count mismatch: {len(left_result.columns)} != {len(right_result.columns)} " 

22 ) 

23 

24 # types have to match 

25 # TODO: return checking type later 

26 # for i, left_col in enumerate(left_result.columns): 

27 # right_col = right_result.columns[i] 

28 # type1, type2 = left_col.type, right_col.type 

29 # if type1 is not None and type2 is not None: 

30 # if type1 != type2: 

31 # raise ErSqlWrongArguments(f'UNION types mismatch: {type1} != {type2}') 

32 

33 table_a, names = left_result.to_df_cols() 

34 table_b, _ = right_result.to_df_cols() 

35 

36 if step.operation.lower() == "intersect": 

37 op = "INTERSECT" 

38 else: 

39 op = "UNION" 

40 

41 if step.unique is not True: 

42 op += " ALL" 

43 

44 query = f""" 

45 SELECT * FROM table_a 

46 {op} 

47 SELECT * FROM table_b 

48 """ 

49 

50 resp_df, _description = query_df_with_type_infer_fallback(query, {"table_a": table_a, "table_b": table_b}) 

51 resp_df.replace({np.nan: None}, inplace=True) 

52 

53 return ResultSet.from_df_cols(df=resp_df, columns_dict=names)