Coverage for mindsdb / api / executor / sql_query / steps / join_step.py: 14%
67 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import copy
3import numpy as np
5from mindsdb_sql_parser.ast import (
6 Identifier, BinaryOperation, Constant
7)
8from mindsdb.api.executor.planner.steps import (
9 JoinStep,
10)
11from mindsdb.integrations.utilities.query_traversal import query_traversal
12from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
14from mindsdb.api.executor.sql_query.result_set import ResultSet
15from mindsdb.api.executor.utilities.sql import query_df_with_type_infer_fallback
16from mindsdb.api.executor.exceptions import NotSupportedYet
18from .base import BaseStepCall
class JoinStepCall(BaseStepCall):
    """Execute a planner ``JoinStep`` by joining the results of two earlier steps.

    Both inputs are read from ``self.steps_data`` and joined with a SQL query
    run over their dataframe representations (``table_a`` for the left input,
    ``table_b`` for the right input).
    """

    bind = JoinStep

    # Largest allowed value of ``len(left) * len(right)`` for a join that has
    # no condition; above this the join is refused to prevent memory overflow.
    _MAX_UNCONDITIONED_JOIN_SIZE = 10 ** 7

    def call(self, step):
        """Join the left and right step results and return the combined data.

        If either side is flagged ``is_prediction`` the join condition of the
        query is ignored and rows are matched on ``__mindsdb_row_id``;
        otherwise the query's own condition is used.

        Args:
            step: the ``JoinStep`` to execute; ``step.left`` / ``step.right``
                reference the input steps and ``step.query`` carries the join
                type and condition.

        Returns:
            ResultSet: the joined data with every ``__mindsdb_row_id`` column
            removed.

        Raises:
            RuntimeError: a prediction join where neither side has a row id.
            NotSupportedYet: a join without condition whose inputs are too big.
        """
        left_data = self.steps_data[step.left.step_num]
        right_data = self.steps_data[step.right.step_num]

        if right_data.is_prediction or left_data.is_prediction:
            join_type, join_condition = self._row_id_join(step, left_data, right_data)
        else:
            join_type, join_condition = self._condition_join(step, left_data, right_data)

        table_a, names_a = left_data.to_df_cols(prefix='A')
        table_b, names_b = right_data.to_df_cols(prefix='B')

        query = f"""
            SELECT * FROM table_a {join_type} table_b
            ON {join_condition}
        """
        resp_df, _description = query_df_with_type_infer_fallback(query, {
            'table_a': table_a,
            'table_b': table_b
        })

        resp_df.replace({np.nan: None}, inplace=True)

        # one mapping for the columns of both sides of the joined frame
        names_a.update(names_b)
        data = ResultSet.from_df_cols(df=resp_df, columns_dict=names_a)

        # row ids are only a technical join key; do not expose them
        for col in data.find_columns('__mindsdb_row_id'):
            data.del_column(col)

        return data

    @staticmethod
    def _row_id_join(step, left_data, right_data):
        """Return ``(join_type, join_condition)`` for a join on ``__mindsdb_row_id``.

        When only one side has a row-id column, its values are copied to the
        other side (this modifies that input in place).
        """
        # ignore join condition, use row_id
        l_row_ids = left_data.find_columns('__mindsdb_row_id')
        r_row_ids = right_data.find_columns('__mindsdb_row_id')

        if len(l_row_ids) == 0:
            if len(r_row_ids) == 0:
                raise RuntimeError('Unable to find row id')
            # copy from right to left
            idx = right_data.get_col_index(r_row_ids[0])
            left_data.set_column_values('__mindsdb_row_id', right_data.get_column_values(idx))
            l_row_ids = left_data.find_columns('__mindsdb_row_id')
        elif len(r_row_ids) == 0:
            # copy from left to right
            idx = left_data.get_col_index(l_row_ids[0])
            right_data.set_column_values('__mindsdb_row_id', left_data.get_column_values(idx))
            r_row_ids = right_data.find_columns('__mindsdb_row_id')

        a_row_id = l_row_ids[0].get_hash_name(prefix='A')
        b_row_id = r_row_ids[0].get_hash_name(prefix='B')

        join_condition = f'table_a.{a_row_id} = table_b.{b_row_id}'

        join_type = step.query.join_type.lower()
        if join_type == 'join':
            # join type is not specified. using join to prediction data
            if left_data.is_prediction:
                join_type = 'left join'
            elif right_data.is_prediction:
                join_type = 'right join'

        return join_type, join_condition

    def _condition_join(self, step, left_data, right_data):
        """Return ``(join_type, join_condition)`` built from the query condition.

        Two-part identifiers (``table.column``) in the condition are rewritten
        to the hashed column names of ``table_a`` / ``table_b``. The step's
        query is never modified.
        """

        def adapt_condition(node, **kwargs):
            # Only `table.column` identifiers are remapped; returning None
            # leaves the node as it is (presumably - per query_traversal).
            if not isinstance(node, Identifier) or len(node.parts) != 2:
                return

            table_alias, alias = node.parts
            # the left input is checked first, so it wins if both sides match
            cols = left_data.find_columns(alias, table_alias)
            if len(cols) == 1:
                col_name = cols[0].get_hash_name(prefix='A')
                return Identifier(parts=['table_a', col_name])

            cols = right_data.find_columns(alias, table_alias)
            if len(cols) == 1:
                col_name = cols[0].get_hash_name(prefix='B')
                return Identifier(parts=['table_b', col_name])

        if step.query.condition is None:
            # prevent memory overflow
            if len(left_data) * len(right_data) >= self._MAX_UNCONDITIONED_JOIN_SIZE:
                raise NotSupportedYet('Unable to join table without condition')
            # Always-true condition, kept local on purpose: writing it back to
            # `step.query.condition` would make a later call with the same step
            # see a non-None condition and skip the size check above.
            condition = BinaryOperation(op='=', args=[Constant(0), Constant(0)])
        else:
            # work on a copy so the traversal does not alter the planned query
            condition = copy.deepcopy(step.query.condition)

        query_traversal(condition, adapt_condition)

        join_condition = SqlalchemyRender('postgres').get_string(condition)
        return step.query.join_type, join_condition