Coverage for mindsdb / api / executor / sql_query / steps / insert_step.py: 40%
81 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb_sql_parser.ast import (
2 Identifier,
3)
5from mindsdb.api.executor.planner.steps import SaveToTable, InsertToTable, CreateTableStep
6from mindsdb.api.executor.sql_query.result_set import ResultSet, Column
7from mindsdb.utilities.exception import EntityNotExistsError
8from mindsdb.api.executor.exceptions import NotSupportedYet, LogicError
9from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES
11from .base import BaseStepCall
class InsertToTableCall(BaseStepCall):
    """Step handler that writes rows into a table of a data node.

    Bound to ``InsertToTable``; the same ``call`` implementation also handles
    ``SaveToTable`` steps, in which case the table is created (and optionally
    replaced) before the rows are written.
    """

    bind = InsertToTable

    def call(self, step):
        """Insert the rows described by ``step`` into the target table.

        The rows come either from the output of a previous step
        (``step.dataframe``) or from the literal values of an INSERT query
        (``step.query``).

        Args:
            step: an ``InsertToTable`` or ``SaveToTable`` plan step.

        Returns:
            ResultSet: an empty result set carrying ``affected_rows`` as
            reported by the data node's ``create_table`` call.

        Raises:
            NotSupportedYet: the resolved data node has no ``create_table``.
            LogicError: the step carries neither a dataframe nor a query.
        """
        # Only SaveToTable steps create the table and carry `is_replace`.
        is_create = isinstance(step, SaveToTable)
        is_replace = bool(is_create and step.is_replace)

        # A multi-part identifier is "<integration>.<table...>"; otherwise
        # the table lives in the database of the current context.
        if len(step.table.parts) > 1:
            integration_name = step.table.parts[0]
            table_name = Identifier(parts=step.table.parts[1:])
        else:
            integration_name = self.context["database"]
            table_name = step.table

        dn = self.session.datahub.get(integration_name)

        if not hasattr(dn, "create_table"):
            raise NotSupportedYet(f"Creating table in '{integration_name}' is not supported")

        if step.dataframe is not None:
            # Rows were produced by an earlier step of the plan.
            data = self.steps_data[step.dataframe.step_num]
        elif step.query is not None:
            data = ResultSet()
            if step.query.columns is None:
                # Is query like: INSERT INTO table VALUES (...)
                table_columns_df = dn.get_table_columns_df(str(table_name))
                columns_names = table_columns_df[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME].to_list()
                for column_name in columns_names:
                    data.add_column(Column(name=column_name))
            else:
                # Is query like: INSERT INTO table (column_name, ...) VALUES (...)
                for col in step.query.columns:
                    data.add_column(Column(name=col.name))

            records = []
            for row in step.query.values:
                record = []
                for v in row:
                    if isinstance(v, Identifier) and v.parts[0] == "None":
                        # Allow explicitly inserting NULL values.
                        record.append(None)
                    else:
                        # Value is a constant
                        record.append(v.value)
                records.append(record)

            data.add_raw_values(records)
        else:
            raise LogicError(f"Data not found for insert: {step}")

        # del 'service' columns
        for service_name in ("__mindsdb_row_id", "__mdb_forecast_offset"):
            for col in list(data.find_columns(service_name)):
                data.del_column(col)

        # region del columns filtered at projection step
        columns_list = self.get_columns_list()
        if columns_list is not None:
            filtered_column_names = {x.name for x in columns_list}
            # Iterate over a snapshot: deleting from `data.columns` while
            # iterating it directly would skip the column after each removal.
            for col in list(data.columns):
                if col.name.startswith("predictor."):
                    continue
                if col.name in filtered_column_names:
                    continue
                data.del_column(col)
        # endregion

        # drop double names (keep the first column seen for each alias);
        # a snapshot is used here for the same reason as above.
        col_names = set()
        for col in list(data.columns):
            if col.alias in col_names:
                data.del_column(col)
            else:
                col_names.add(col.alias)

        response = dn.create_table(
            table_name=table_name,
            result_set=data,
            is_replace=is_replace,
            is_create=is_create,
            params=step.params,
        )
        return ResultSet(affected_rows=response.affected_rows)
class SaveToTableCall(InsertToTableCall):
    """Step handler bound to ``SaveToTable``.

    Adds no logic of its own: ``call`` is inherited from
    ``InsertToTableCall``; only the ``bind`` target differs.
    """

    bind = SaveToTable
class CreateTableCall(BaseStepCall):
    """Step handler that runs a ``CreateTableStep`` against a data node."""

    bind = CreateTableStep

    def call(self, step):
        """Create the table described by ``step``.

        Args:
            step: a ``CreateTableStep`` providing ``table``, ``columns`` and
                ``is_replace``.

        Returns:
            ResultSet: an empty result set.

        Raises:
            EntityNotExistsError: the datahub returned ``None`` for the
                resolved integration name.
        """
        target = step.table
        # "<integration>.<table...>" addresses an explicit integration;
        # a single-part name falls back to the context's database.
        if len(target.parts) > 1:
            integration_name, table_name = target.parts[0], Identifier(parts=target.parts[1:])
        else:
            integration_name, table_name = self.context["database"], target

        datanode = self.session.datahub.get(integration_name)
        if datanode is None:
            raise EntityNotExistsError("Database not found", integration_name)

        datanode.create_table(
            table_name=table_name,
            columns=step.columns,
            is_replace=step.is_replace,
            is_create=True,
        )
        return ResultSet()