Coverage for mindsdb / api / executor / sql_query / steps / join_step.py: 14%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2 

3import numpy as np 

4 

5from mindsdb_sql_parser.ast import ( 

6 Identifier, BinaryOperation, Constant 

7) 

8from mindsdb.api.executor.planner.steps import ( 

9 JoinStep, 

10) 

11from mindsdb.integrations.utilities.query_traversal import query_traversal 

12from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

13 

14from mindsdb.api.executor.sql_query.result_set import ResultSet 

15from mindsdb.api.executor.utilities.sql import query_df_with_type_infer_fallback 

16from mindsdb.api.executor.exceptions import NotSupportedYet 

17 

18from .base import BaseStepCall 

19 

20 

class JoinStepCall(BaseStepCall):
    """Executor for ``JoinStep``: joins the results of two earlier steps.

    Both inputs are read from ``self.steps_data``, converted to dataframes
    registered as ``table_a`` (left) and ``table_b`` (right), and joined by a
    SQL query executed through ``query_df_with_type_infer_fallback``.
    """

    bind = JoinStep

    def call(self, step):
        """Join the left and right step results referenced by ``step``.

        If either side is flagged ``is_prediction`` the query's join condition
        is ignored and rows are matched on the ``__mindsdb_row_id`` column.
        Otherwise the query's condition is used, after two-part identifiers
        are rewritten to the ``table_a`` / ``table_b`` column names returned
        by ``get_hash_name``.

        Args:
            step: the join step. ``step.left.step_num`` and
                ``step.right.step_num`` index into ``self.steps_data``;
                ``step.query`` supplies ``join_type`` and ``condition``.
                The step itself is not modified.

        Returns:
            A ``ResultSet`` built from the joined dataframe, with every
            ``__mindsdb_row_id`` column removed.

        Raises:
            RuntimeError: a prediction join where neither side has a
                ``__mindsdb_row_id`` column.
            NotSupportedYet: no join condition was given and
                ``len(left) * len(right)`` is ``10 ** 7`` or more.
        """
        left_data = self.steps_data[step.left.step_num]
        right_data = self.steps_data[step.right.step_num]

        if right_data.is_prediction or left_data.is_prediction:
            # ignore join condition, use row_id
            l_row_ids = left_data.find_columns('__mindsdb_row_id')
            r_row_ids = right_data.find_columns('__mindsdb_row_id')

            if len(l_row_ids) == 0:
                if len(r_row_ids) == 0:
                    raise RuntimeError('Unable to find row id')
                else:
                    # copy from right to left
                    idx = right_data.get_col_index(r_row_ids[0])
                    left_data.set_column_values('__mindsdb_row_id', right_data.get_column_values(idx))
                    l_row_ids = left_data.find_columns('__mindsdb_row_id')
            elif len(r_row_ids) == 0:
                # copy from left to right
                idx = left_data.get_col_index(l_row_ids[0])
                right_data.set_column_values('__mindsdb_row_id', left_data.get_column_values(idx))
                r_row_ids = right_data.find_columns('__mindsdb_row_id')

            a_row_id = l_row_ids[0].get_hash_name(prefix='A')
            b_row_id = r_row_ids[0].get_hash_name(prefix='B')

            join_condition = f'table_a.{a_row_id} = table_b.{b_row_id}'

            join_type = step.query.join_type.lower()
            if join_type == 'join':
                # join type is not specified. using join to prediction data
                if left_data.is_prediction:
                    join_type = 'left join'
                elif right_data.is_prediction:
                    join_type = 'right join'
        else:
            def adapt_condition(node, **kwargs):
                # Rewrite `table.column` identifiers to the dataframe columns
                # used in the join query. Returning None leaves the node
                # untouched (presumably query_traversal only substitutes
                # non-None results -- confirm against its implementation).
                if not isinstance(node, Identifier) or len(node.parts) != 2:
                    return None

                table_alias, alias = node.parts
                # The left side is checked first, so it wins when both sides
                # have exactly one matching column.
                cols = left_data.find_columns(alias, table_alias)
                if len(cols) == 1:
                    col_name = cols[0].get_hash_name(prefix='A')
                    return Identifier(parts=['table_a', col_name])

                cols = right_data.find_columns(alias, table_alias)
                if len(cols) == 1:
                    col_name = cols[0].get_hash_name(prefix='B')
                    return Identifier(parts=['table_b', col_name])

                return None

            condition = step.query.condition
            if condition is None:
                # prevent memory overflow
                if len(left_data) * len(right_data) >= 10 ** 7:
                    raise NotSupportedYet('Unable to join table without condition')
                # Always-true condition, i.e. a cross join. It is kept local
                # instead of being written back to step.query so that the
                # size guard above still runs if this step is executed again.
                condition = BinaryOperation(op='=', args=[Constant(0), Constant(0)])
            else:
                # Work on a copy so the traversal cannot alter the step's query.
                condition = copy.deepcopy(condition)

            query_traversal(condition, adapt_condition)

            join_condition = SqlalchemyRender('postgres').get_string(condition)
            join_type = step.query.join_type

        table_a, names_a = left_data.to_df_cols(prefix='A')
        table_b, names_b = right_data.to_df_cols(prefix='B')

        query = f"""
            SELECT * FROM table_a {join_type} table_b
            ON {join_condition}
        """
        resp_df, _description = query_df_with_type_infer_fallback(query, {
            'table_a': table_a,
            'table_b': table_b
        })

        resp_df.replace({np.nan: None}, inplace=True)

        # Merge the column maps of both sides to describe the joined frame.
        names_a.update(names_b)
        data = ResultSet.from_df_cols(df=resp_df, columns_dict=names_a)

        # The row id is only a join helper; drop it from the result.
        for col in data.find_columns('__mindsdb_row_id'):
            data.del_column(col)

        return data