Coverage for mindsdb / api / executor / sql_query / steps / fetch_dataframe.py: 50%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser.ast import ( 

2 Identifier, 

3 Constant, 

4 Select, 

5 Join, 

6 Parameter, 

7 BinaryOperation, 

8 Tuple, 

9 Union, 

10 Intersect, 

11) 

12 

13from mindsdb.api.executor.planner.steps import FetchDataframeStep 

14from mindsdb.api.executor.datahub.classes.response import DataHubResponse 

15from mindsdb.api.executor.sql_query.result_set import ResultSet 

16from mindsdb.api.executor.planner.step_result import Result 

17from mindsdb.api.executor.exceptions import UnknownError 

18from mindsdb.integrations.utilities.query_traversal import query_traversal 

19from mindsdb.interfaces.query_context.context_controller import query_context_controller 

20 

21from .base import BaseStepCall 

22 

23 

def get_table_alias(table_obj, default_db_name):
    """Describe the FROM-target of a query as a ``(database, table, alias)`` triple.

    - ``Identifier`` with one part: the part is the table, ``default_db_name`` is the database.
    - ``Identifier`` with several parts: the first part is the database, the last part is the table.
    - ``Select`` (a subquery): the table name is the first part of its alias, or ``"t"`` when it
      has no alias; the database is ``default_db_name``.
    - ``Join``: described by its left side (resolved recursively).
    - Anything else: ``(default_db_name, "t", "t")``.

    For the ``Identifier``/``Select`` cases the alias is the dot-joined alias parts when an alias
    is set, otherwise the table name itself.
    """
    if isinstance(table_obj, Identifier):
        parts = table_obj.parts
        if len(parts) > 1:
            db_name, table_name = parts[0], parts[-1]
        else:
            db_name, table_name = default_db_name, parts[0]
    elif isinstance(table_obj, Select):
        # subquery: name it after its alias, falling back to "t"
        db_name = default_db_name
        table_name = "t" if table_obj.alias is None else table_obj.alias.parts[0]
    elif isinstance(table_obj, Join):
        # a join is described by its left-most table
        return get_table_alias(table_obj.left, default_db_name)
    else:
        # object type not handled yet
        return default_db_name, "t", "t"

    alias = table_name if table_obj.alias is None else ".".join(table_obj.alias.parts)
    return db_name, table_name, alias

50 

51 

def get_fill_param_fnc(steps_data):
    """Build a ``query_traversal`` callback that fills ``Parameter`` nodes from earlier step results.

    :param steps_data: mapping (or sequence) indexed by step number; each entry must provide
        ``get_column_values(col_idx=...)``.
    :return: a callback ``fill_params(node, callstack=None, **kwargs)``.
    """

    def fill_params(node, callstack=None, **kwargs):
        """Replace a ``Parameter`` bound to a ``Result`` with the values of that step's first column.

        Non-``Parameter`` nodes return ``None``; presumably ``query_traversal`` then keeps the
        node unchanged (TODO confirm against query_traversal).

        A single value becomes a scalar ``Constant`` (the ``col = (subselect)`` case). Otherwise the
        values become a ``Tuple``. When the parameter is the right operand of ``IN`` or ``NOT IN``,
        a ``Tuple`` is always produced, because those operators need a list even for one row.

        :raises ValueError: if the parameter's value is not a ``Result`` (i.e. it was never set).
        """
        if not isinstance(node, Parameter):
            return

        if not isinstance(node.value, Result):
            # is simple parameter and not set
            raise ValueError(f"Parameter is not set: {node.value}")

        rs = steps_data[node.value.step_num]
        items = [Constant(i) for i in rs.get_column_values(col_idx=0)]

        is_single_item = True
        if callstack:
            node_prev = callstack[0]
            if isinstance(node_prev, BinaryOperation):
                # 'something IN Parameter()' and 'something NOT IN Parameter()' both require a
                # list on the right-hand side, even when the subselect returned a single row.
                if node_prev.op.lower() in ("in", "not in") and node_prev.args[1] is node:
                    is_single_item = False

        if is_single_item and len(items) == 1:
            # extract one value for option 'col=(subselect)'
            return items[0]
        return Tuple(items)

    return fill_params

80 

81 

class FetchDataframeStepCall(BaseStepCall):
    """Run a ``FetchDataframeStep`` against its integration and wrap the result in a ``ResultSet``."""

    bind = FetchDataframeStep

    def call(self, step):
        """Execute ``step`` on the integration named by ``step.integration``.

        If ``step.query`` is ``None``, ``step.raw_query`` is sent as is and the result is labelled
        ``result``. Otherwise:

        - parameters are filled from previous steps;
        - database context variables are applied via ``query_context_controller``;
        - the rewritten query is executed;
        - the optional context callback receives the dataframe and the response columns.

        :raises UnknownError: if the integration is not found in the session datahub.
        :return: ``ResultSet`` built from the response dataframe, tagged with database/table/alias.
        """
        dn = self.session.datahub.get(step.integration)
        query = step.query

        if dn is None:
            raise UnknownError(f"Unknown integration name: {step.integration}")

        if query is None:
            table_alias = (self.context.get("database"), "result", "result")

            response: DataHubResponse = dn.query(step.raw_query, session=self.session)
            df = response.data_frame
        else:
            if isinstance(query, (Union, Intersect)):
                # combined queries have no single source table
                table_alias = ("", "", "")
            else:
                table_alias = get_table_alias(query.from_table, self.context.get("database"))

            # TODO for information_schema we have 'database' = 'mindsdb'

            # fill params
            fill_params = get_fill_param_fnc(self.steps_data)
            query_traversal(query, fill_params)

            query, context_callback = query_context_controller.handle_db_context_vars(query, dn, self.session)

            response: DataHubResponse = dn.query(query=query, session=self.session)
            df = response.data_frame

            # context_callback is only bound on this branch, so the check must stay here;
            # on the raw-query path it would raise UnboundLocalError.
            if context_callback:
                context_callback(df, response.columns)

        # if query registered, set progress
        if self.sql_query.run_query is not None:
            self.sql_query.run_query.set_progress(processed_rows=len(df))
        return ResultSet.from_df(
            df,
            table_name=table_alias[1],
            table_alias=table_alias[2],
            database=table_alias[0],
            mysql_types=response.mysql_types,
        )