Coverage for mindsdb / api / executor / sql_query / steps / insert_step.py: 40%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser.ast import ( 

2 Identifier, 

3) 

4 

5from mindsdb.api.executor.planner.steps import SaveToTable, InsertToTable, CreateTableStep 

6from mindsdb.api.executor.sql_query.result_set import ResultSet, Column 

7from mindsdb.utilities.exception import EntityNotExistsError 

8from mindsdb.api.executor.exceptions import NotSupportedYet, LogicError 

9from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES 

10 

11from .base import BaseStepCall 

12 

13 

class InsertToTableCall(BaseStepCall):
    """Execute ``InsertToTable`` / ``SaveToTable`` plan steps.

    The rows to write come either from the result of an earlier step
    (``step.dataframe``) or from the literal ``VALUES`` of an INSERT query
    (``step.query``). They are passed to the ``create_table`` method of the
    target data node.
    """

    bind = InsertToTable

    def call(self, step):
        """Write the data referenced by ``step`` into the target table.

        Args:
            step: plan step exposing ``table``, ``params`` and either
                ``dataframe`` (reference to a previous step) or ``query``
                (INSERT ... VALUES statement). ``SaveToTable`` steps also
                expose ``is_replace``.

        Returns:
            ResultSet: result set whose ``affected_rows`` is the value
            reported by the data node.

        Raises:
            EntityNotExistsError: the target database was not found.
            NotSupportedYet: the data node has no ``create_table`` method.
            LogicError: the step carries neither ``dataframe`` nor ``query``.
        """
        # Only SaveToTable steps create the table (and may replace it).
        is_create = isinstance(step, SaveToTable)
        is_replace = is_create and bool(step.is_replace)

        # The first part of a qualified name is the integration; the rest is the table.
        table_parts = step.table.parts
        if len(table_parts) > 1:
            integration_name = table_parts[0]
            table_name = Identifier(parts=table_parts[1:])
        else:
            integration_name = self.context["database"]
            table_name = step.table

        dn = self.session.datahub.get(integration_name)
        if dn is None:
            # Same error as CreateTableCall: a missing database must not be
            # reported as "creating table is not supported".
            raise EntityNotExistsError("Database not found", integration_name)
        if not hasattr(dn, "create_table"):
            raise NotSupportedYet(f"Creating table in '{integration_name}' is not supported")

        if step.dataframe is not None:
            data = self.steps_data[step.dataframe.step_num]
        elif step.query is not None:
            data = self._values_to_result_set(dn, table_name, step.query)
        else:
            raise LogicError(f"Data not found for insert: {step}")

        # del 'service' columns
        for service_name in ("__mindsdb_row_id", "__mdb_forecast_offset"):
            for col in data.find_columns(service_name):
                data.del_column(col)

        # region del columns filtered at projection step
        columns_list = self.get_columns_list()
        if columns_list is not None:
            filtered_column_names = {x.name for x in columns_list}
            # Iterate over a snapshot: del_column() is called inside the loop,
            # and removing from a sequence while iterating it skips elements.
            for col in list(data.columns):
                if col.name.startswith("predictor.") or col.name in filtered_column_names:
                    continue
                data.del_column(col)
        # endregion

        # drop double names (keep the first column for every alias)
        seen_aliases = set()
        for col in list(data.columns):  # snapshot, see above
            if col.alias in seen_aliases:
                data.del_column(col)
            else:
                seen_aliases.add(col.alias)

        response = dn.create_table(
            table_name=table_name, result_set=data, is_replace=is_replace, is_create=is_create, params=step.params
        )
        return ResultSet(affected_rows=response.affected_rows)

    @staticmethod
    def _values_to_result_set(dn, table_name, query):
        """Build a ResultSet from the literal rows of an INSERT ... VALUES query."""
        data = ResultSet()
        if query.columns is None:
            # Is query like: INSERT INTO table VALUES (...)
            # Column names are taken from the target table itself.
            table_columns_df = dn.get_table_columns_df(str(table_name))
            columns_names = table_columns_df[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME].to_list()
        else:
            # Is query like: INSERT INTO table (column_name, ...) VALUES (...)
            columns_names = [col.name for col in query.columns]
        for column_name in columns_names:
            data.add_column(Column(name=column_name))

        records = []
        for row in query.values:
            record = []
            for v in row:
                if isinstance(v, Identifier) and v.parts[0] == "None":
                    # Allow explicitly inserting NULL values.
                    record.append(None)
                else:
                    # Value is a constant
                    record.append(v.value)
            records.append(record)

        data.add_raw_values(records)
        return data

100 

101 

class SaveToTableCall(InsertToTableCall):
    """Step call bound to ``SaveToTable``; execution is inherited from ``InsertToTableCall``."""

    bind = SaveToTable

104 

105 

class CreateTableCall(BaseStepCall):
    """Execute ``CreateTableStep`` plan steps by delegating to the target data node."""

    bind = CreateTableStep

    def call(self, step):
        """Create the table described by ``step`` and return an empty ResultSet.

        Raises:
            EntityNotExistsError: the target database was not found.
        """
        name_parts = step.table.parts
        if len(name_parts) <= 1:
            # Unqualified name: fall back to the database of the current context.
            integration_name, table_name = self.context["database"], step.table
        else:
            # Qualified name: first part is the integration, the rest is the table.
            integration_name, table_name = name_parts[0], Identifier(parts=name_parts[1:])

        data_node = self.session.datahub.get(integration_name)
        if data_node is None:
            raise EntityNotExistsError("Database not found", integration_name)

        data_node.create_table(
            table_name=table_name,
            columns=step.columns,
            is_replace=step.is_replace,
            is_create=True,
        )
        return ResultSet()