Coverage for mindsdb / migrations / versions / 2022-11-11_d429095b570f_data_integration_id.py: 16%
54 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""data-integration-id
3Revision ID: d429095b570f
4Revises: 1e60096fc817
5Create Date: 2022-11-11 14:00:58.386307
7"""
8import json
10from alembic import op
11import sqlalchemy as sa
12from sqlalchemy.sql import text
14import mindsdb.interfaces.storage.db as db
17# revision identifiers, used by Alembic.
18revision = 'd429095b570f'
19down_revision = '1e60096fc817'
20branch_labels = None
21depends_on = None
24def upgrade():
25 with op.batch_alter_table('predictor', schema=None) as batch_op:
26 batch_op.add_column(sa.Column('data_integration_ref', db.Json(), nullable=True))
28 conn = op.get_bind()
29 session = sa.orm.Session(bind=conn)
31 view_integration = conn.execute(text('''
32 select id from integration where name = 'views'
33 ''')).fetchone()
34 if view_integration is not None:
35 views_integration_id = view_integration['id']
37 predictors = conn.execute(text('''
38 select id, data_integration_id from predictor
39 ''')).fetchall()
41 for predictor in predictors:
42 data_integration_ref = None
43 if predictor['data_integration_id'] is not None:
44 data_integration_ref = {'type': 'integration', 'id': predictor['data_integration_id']}
45 if predictor['data_integration_id'] == views_integration_id:
46 data_integration_ref = {'type': 'view'}
47 if isinstance(data_integration_ref, dict):
48 data_integration_ref = json.dumps(data_integration_ref)
49 conn.execute(text('''
50 update predictor set data_integration_ref = :data_integration_ref where id = :id
51 '''), {
52 'data_integration_ref': data_integration_ref,
53 'id': predictor['id']
54 })
56 session.commit()
58 with op.batch_alter_table('predictor', schema=None) as batch_op:
59 batch_op.drop_constraint('fk_data_integration_id', type_='foreignkey')
60 batch_op.drop_column('data_integration_id')
62 conn.execute(text('''
63 delete from integration where name = 'views'
64 '''))
65 session.commit()
68def downgrade():
69 with op.batch_alter_table('predictor', schema=None) as batch_op:
70 batch_op.add_column(sa.Column('data_integration_id', sa.INTEGER(), nullable=True))
72 conn = op.get_bind()
73 session = sa.orm.Session(bind=conn)
75 views_integration = db.Integration(
76 name='views',
77 data={},
78 engine='views',
79 company_id=None
80 )
81 session.add(views_integration)
82 session.commit()
84 predictors = conn.execute(text('''
85 select id, data_integration_ref from predictor
86 ''')).fetchall()
88 for predictor in predictors:
89 data_integration_ref = predictor['data_integration_ref']
90 if data_integration_ref is None:
91 continue
92 data_integration_ref = json.loads(data_integration_ref)
93 data_integration_id = data_integration_ref.get('id')
94 if data_integration_ref['type'] == 'view':
95 data_integration_id = views_integration.id
97 conn.execute(text('''
98 update predictor set data_integration_id = :data_integration_id where id = :id
99 '''), {
100 'data_integration_id': data_integration_id,
101 'id': predictor['id']
102 })
104 with op.batch_alter_table('predictor', schema=None) as batch_op:
105 batch_op.create_foreign_key('fk_data_integration_id', 'integration', ['data_integration_id'], ['id'])
106 batch_op.drop_column('data_integration_ref')