Coverage for mindsdb / migrations / versions / 2022-02-09_27c5aca9e47e_db_files.py: 10%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""db files
3Revision ID: 27c5aca9e47e
4Revises: 47f97b83cee4
5Create Date: 2022-02-09 10:43:29.854671
7"""
8import json
9import datetime
11from alembic import op
12import sqlalchemy as sa
13import mindsdb.interfaces.storage.db
14from sqlalchemy.sql import text
17# revision identifiers, used by Alembic.
18revision = '27c5aca9e47e'
19down_revision = '47f97b83cee4'
20branch_labels = None
21depends_on = None
24def upgrade():
25 op.drop_table('ai_table')
27 conn = op.get_bind()
29 # views was created with unnamed fk. Therefore need recreate it
30 op.create_table(
31 'view_tmp',
32 sa.Column('id', sa.Integer(), nullable=False),
33 sa.Column('name', sa.String(), nullable=False),
34 sa.Column('company_id', sa.Integer(), nullable=True),
35 sa.Column('query', sa.String(), nullable=False),
36 sa.Column('integration_id', sa.Integer(), nullable=False),
37 sa.ForeignKeyConstraint(['integration_id'], ['integration.id'], name='fk_integration_id'),
38 sa.PrimaryKeyConstraint('id'),
39 sa.UniqueConstraint('name', 'company_id', name='unique_view_name_company_id')
40 )
41 conn.execute(text("""
42 insert into view_tmp (id, name, company_id, query, integration_id)
43 select id, name, company_id, query, datasource_id from view;
44 """))
45 op.drop_table('view')
46 op.rename_table('view_tmp', 'view')
48 op.create_table(
49 'analysis',
50 sa.Column('id', sa.Integer(), nullable=False),
51 sa.Column('analysis', mindsdb.interfaces.storage.db.Json(), nullable=False),
52 sa.Column('created_at', sa.DateTime(), nullable=True),
53 sa.Column('updated_at', sa.DateTime(), nullable=True),
54 sa.PrimaryKeyConstraint('id')
55 )
57 with op.batch_alter_table('datasource', schema=None) as batch_op:
58 batch_op.add_column(sa.Column('analysis_id', sa.Integer(), nullable=True))
59 batch_op.create_foreign_key('fk_ds_analysis_id', 'analysis', ['analysis_id'], ['id'])
60 batch_op.add_column(sa.Column('ds_class', sa.String(), nullable=True))
62 session = sa.orm.Session(bind=conn)
63 dsatasources = conn.execute(sa.text('select id, analysis from datasource')).fetchall()
64 for row in dsatasources:
65 if row['analysis'] is not None:
66 # NOTE 'returning' is relatively new in sqlite, so better will be use select after insert.
67 conn.execute(
68 text("""
69 insert into analysis (analysis) select analysis from datasource where id = :id;
70 """), {
71 'id': row['id']
72 }
73 )
74 analysis_id = conn.execute(text("""
75 select id from analysis order by id desc limit 1;
76 """)).fetchall()
77 conn.execute(
78 text("""
79 update datasource set analysis_id = :analysis_id where id = :id
80 """), {
81 'analysis_id': analysis_id[0][0],
82 'id': row['id']
83 }
84 )
86 with op.batch_alter_table('datasource', schema=None) as batch_op:
87 batch_op.drop_column('analysis')
89 op.create_table(
90 'file',
91 sa.Column('id', sa.Integer(), nullable=False),
92 sa.Column('name', sa.String(), nullable=False),
93 sa.Column('company_id', sa.Integer(), nullable=True),
94 sa.Column('source_file_path', sa.String(), nullable=False),
95 sa.Column('file_path', sa.String(), nullable=False),
96 sa.Column('row_count', sa.Integer(), nullable=False),
97 sa.Column('columns', mindsdb.interfaces.storage.db.Json(), nullable=False),
98 sa.Column('created_at', sa.DateTime(), nullable=True, server_default=sa.func.current_timestamp()),
99 sa.Column('updated_at', sa.DateTime(), nullable=True, server_default=sa.func.current_timestamp(), server_onupdate=sa.func.current_timestamp()),
100 sa.Column('analysis_id', sa.Integer(), nullable=True),
101 sa.ForeignKeyConstraint(['analysis_id'], ['analysis.id'], name='fk_analysis_id'),
102 sa.PrimaryKeyConstraint('id'),
103 sa.UniqueConstraint('name', 'company_id', name='unique_file_name_company_id')
104 )
106 # delete ds where data is none
107 dsatasources = conn.execute(text('select * from datasource')).fetchall()
108 for ds in dsatasources:
109 if ds['data'] is None:
110 conn.execute(text('delete from datasource where id = :id'), {'id': ds['id']})
111 continue
112 ds_data = json.loads(ds['data'])
113 creation_info = json.loads(ds['creation_info'])
114 datasource_name = ds_data.get('source_type')
115 if datasource_name == 'file':
116 created_at = None
117 if isinstance(ds['created_at'], str):
118 created_at = datetime.datetime.fromisoformat(ds['created_at'])
119 elif isinstance(ds['created_at'], [float, int]):
120 created_at = datetime.fromtimestamp(ds['created_at'])
122 updated_at = None
123 if isinstance(ds['updated_at'], str):
124 updated_at = datetime.datetime.fromisoformat(ds['updated_at'])
125 elif isinstance(ds['updated_at'], [float, int]):
126 updated_at = datetime.fromtimestamp(ds['updated_at'])
128 file = mindsdb.interfaces.storage.db.File(
129 name=ds['name'],
130 company_id=ds['company_id'],
131 source_file_path=ds_data['source'],
132 file_path=creation_info['args'][0],
133 row_count=ds_data['row_count'],
134 columns=ds_data['columns'],
135 created_at=created_at,
136 updated_at=updated_at,
137 analysis_id=ds['analysis_id']
138 )
139 session.add(file)
140 session.flush()
141 # ds_data['file_id'] = file.id
142 ds_data['source'] = {
143 'mindsdb_file_name': ds['name']
144 # 'source': ds_data['source']
145 }
146 conn.execute(
147 text("""
148 update datasource set data = :ds_data where id = :id;
149 """), {
150 'id': ds['id'],
151 'ds_data': json.dumps(ds_data)
152 }
153 )
155 conn.execute(
156 text("""
157 update datasource
158 set integration_id = (select id from integration where name = :datasource_name and company_id = :company_id),
159 ds_class = :ds_class
160 where id = :id
161 """), {
162 'datasource_name': datasource_name,
163 'company_id': ds['company_id'],
164 'ds_class': creation_info['class'],
165 'id': ds['id']
166 }
167 )
169 session.commit()
171 op.rename_table('datasource', 'dataset')
173 with op.batch_alter_table('dataset', schema=None) as batch_op:
174 batch_op.create_foreign_key('fk_integration_id', 'integration', ['integration_id'], ['id'])
176 # NOTE two different 'batch' is necessary, in other way FK is not creating
177 with op.batch_alter_table('predictor', schema=None) as batch_op:
178 batch_op.alter_column('datasource_id', new_column_name='dataset_id')
179 with op.batch_alter_table('predictor', schema=None) as batch_op:
180 batch_op.create_foreign_key('fk_predictor_dataset_id', 'dataset', ['dataset_id'], ['id'])
181 with op.batch_alter_table('predictor', schema=None) as batch_op:
182 batch_op.create_unique_constraint('unique_predictor_name_company_id', ['name', 'company_id'])
184 with op.batch_alter_table('integration', schema=None) as batch_op:
185 batch_op.create_unique_constraint('unique_integration_name_company_id', ['name', 'company_id'])
187 with op.batch_alter_table('dataset', schema=None) as batch_op:
188 batch_op.create_unique_constraint('unique_dataset_name_company_id', ['name', 'company_id'])
191def downgrade():
192 with op.batch_alter_table('integration', schema=None) as batch_op:
193 batch_op.drop_constraint('unique_integration_name_company_id', type_='unique')
195 with op.batch_alter_table('predictor', schema=None) as batch_op:
196 batch_op.drop_constraint('unique_predictor_name_company_id', type_='unique')
198 with op.batch_alter_table('dataset', schema=None) as batch_op:
199 batch_op.drop_constraint('unique_dataset_name_company_id', type_='unique')
201 with op.batch_alter_table('predictor', schema=None) as batch_op:
202 batch_op.drop_constraint('fk_predictor_dataset_id', type_='foreignkey')
203 batch_op.alter_column('dataset_id', new_column_name='datasource_id')
205 with op.batch_alter_table('dataset', schema=None) as batch_op:
206 batch_op.drop_constraint('fk_integration_id', type_='foreignkey')
207 batch_op.add_column(sa.Column('analysis', sa.VARCHAR(), nullable=True))
208 batch_op.drop_constraint('fk_ds_analysis_id', type_='foreignkey')
209 batch_op.drop_column('ds_class')
211 op.rename_table('dataset', 'datasource')
213 op.drop_table('file')
215 conn = op.get_bind()
216 conn.execute(text("""
217 update datasource set analysis = (select analysis from analysis where id = analysis_id)
218 """))
220 with op.batch_alter_table('datasource', schema=None) as batch_op:
221 batch_op.drop_column('analysis_id')
223 op.drop_table('analysis')
225 op.create_table(
226 'ai_table',
227 sa.Column('id', sa.Integer(), nullable=False),
228 sa.Column('updated_at', sa.DateTime(), nullable=True),
229 sa.Column('created_at', sa.DateTime(), nullable=True),
230 sa.Column('name', sa.String(), nullable=False),
231 sa.Column('integration_name', sa.String(), nullable=False),
232 sa.Column('integration_query', sa.String(), nullable=False),
233 sa.Column('query_fields', mindsdb.interfaces.storage.db.Json(), nullable=False),
234 sa.Column('predictor_name', sa.String(), nullable=False),
235 sa.Column('predictor_columns', mindsdb.interfaces.storage.db.Json(), nullable=False),
236 sa.Column('company_id', sa.Integer(), nullable=True),
237 sa.PrimaryKeyConstraint('id')
238 )
240 op.create_table(
241 'view_tmp',
242 sa.Column('id', sa.Integer(), nullable=False),
243 sa.Column('name', sa.String(), nullable=False),
244 sa.Column('company_id', sa.Integer(), nullable=True),
245 sa.Column('query', sa.String(), nullable=False),
246 sa.Column('datasource_id', sa.Integer(), nullable=False),
247 sa.ForeignKeyConstraint(['datasource_id'], ['integration.id'], name='fk_datasource_id'),
248 sa.PrimaryKeyConstraint('id'),
249 sa.UniqueConstraint('name', 'company_id', name='unique_name_company_id')
250 )
251 conn.execute(text("""
252 insert into view_tmp (id, name, company_id, query, datasource_id)
253 select id, name, company_id, query, integration_id from view;
254 """))
255 op.drop_table('view')
256 op.rename_table('view_tmp', 'view')