Coverage for mindsdb/migrations/versions/2022-05-25_d74c189b87e6_predictor_integration.py: 11%
71 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1"""predictor-integration
3Revision ID: d74c189b87e6
4Revises: 27c5aca9e47e
5Create Date: 2022-05-25 15:00:16.284158
7"""
8import json
10from alembic import op
11import sqlalchemy as sa
12from sqlalchemy.sql import text
# Alembic revision identifiers: this migration's id and the id of the
# revision it is applied on top of.
revision = 'd74c189b87e6'
down_revision = '27c5aca9e47e'
# No branch labels and no cross-revision dependencies are declared.
branch_labels = depends_on = None
def _legacy_fetch_spec(raw_data):
    """Map a legacy dataset ``data`` payload to ``(integration_name, query)``.

    ``raw_data`` is the JSON text stored in ``dataset.data``. Returns ``None``
    whenever the payload cannot be translated (invalid JSON, not a JSON
    object, missing/empty ``source_type``, ``source`` not an object, or a
    missing/empty file name or query), so the caller can skip the predictor.
    """
    try:
        data = json.loads(raw_data)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
        return None
    if not isinstance(data, dict):
        # Valid JSON that is not an object has no ``source_type`` to read.
        return None

    integration_name = data.get('source_type')
    if not isinstance(integration_name, str) or not integration_name:
        return None
    if integration_name.lower() == 'file':
        # Legacy 'file' sources are attached to the 'files' integration.
        integration_name = 'files'

    source = data.get('source')
    if not isinstance(source, dict):
        return None

    if integration_name == 'files':
        file_name = source.get('mindsdb_file_name')
        if not isinstance(file_name, str) or not file_name:
            return None
        # This string is stored on the predictor row, not executed here.
        return integration_name, f'select * from {file_name}'

    fetch_data_query = source.get('query')
    if not isinstance(fetch_data_query, str) or not fetch_data_query:
        return None
    return integration_name, fetch_data_query


def upgrade():
    """Link predictors directly to integrations and drop dataset/analysis.

    Steps, in order:

    1. Add ``integration_id`` (FK to ``integration.id``) and
       ``fetch_data_query`` columns to ``predictor``.
    2. Insert a ``files`` integration for every distinct ``company_id``
       already present in ``integration``.
    3. For every predictor that references a dataset with non-null ``data``,
       derive the integration name and fetch query from that JSON payload and
       store them on the predictor. Predictors whose payload cannot be
       translated, or whose integration is not found, are left untouched.
    4. Drop ``predictor.dataset_id``, the ``analysis_id`` columns and related
       constraints on ``file`` and ``dataset``, then drop the ``analysis``
       and ``dataset`` tables.
    """
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.add_column(sa.Column('integration_id', sa.Integer(), nullable=True))
        batch_op.add_column(sa.Column('fetch_data_query', sa.String(), nullable=True))
        batch_op.create_foreign_key('fk_integration_id', 'integration', ['integration_id'], ['id'])

    conn = op.get_bind()

    conn.execute(text('''
        insert into integration (name, data, company_id, created_at, updated_at)
        select
            'files' as name,
            '{}' as data,
            company_id,
            '2022-05-01 00:00:00.000000' as created_at,
            '2022-05-01 00:00:00.000000' as updated_at
        from (select distinct company_id from integration) t1
    '''))

    # Join each predictor to the dataset it references (predictor.dataset_id),
    # not to the dataset that merely happens to share its primary key value.
    predictors = conn.execute(text('''
        select t1.id, t1.company_id, t2.data
        from predictor t1 left join dataset t2 on t1.dataset_id = t2.id
        where t1.dataset_id is not null and t2.data is not null
    ''')).fetchall()

    # Loop-invariant statements. ``company_id = NULL`` never matches in SQL,
    # so a null company needs its own ``is null`` variant.
    find_for_company = text('''
        select id
        from integration
        where company_id = :company_id and lower(name) = lower(:name)
    ''')
    find_without_company = text('''
        select id
        from integration
        where company_id is null and lower(name) = lower(:name)
    ''')
    assign_integration = text('''
        update predictor
        set integration_id = :integration_id, fetch_data_query = :fetch_data_query
        where id = :predictor_id
    ''')

    for row in predictors:
        # Attribute access is used because string indexing of result rows
        # (``row['data']``) is not available on SQLAlchemy 2.0 Row objects.
        spec = _legacy_fetch_spec(row.data)
        if spec is None:
            continue
        integration_name, fetch_data_query = spec

        if row.company_id is None:
            integration = conn.execute(
                find_without_company, {'name': integration_name}
            ).fetchone()
        else:
            integration = conn.execute(
                find_for_company,
                {'company_id': row.company_id, 'name': integration_name},
            ).fetchone()
        if integration is None:
            continue

        conn.execute(assign_integration, {
            'integration_id': integration.id,
            'fetch_data_query': fetch_data_query,
            'predictor_id': row.id,
        })

    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.drop_column('dataset_id')

    with op.batch_alter_table('file', schema=None) as batch_op:
        batch_op.drop_constraint('unique_file_name_company_id', type_='unique')
        batch_op.drop_constraint('fk_analysis_id', type_='foreignkey')
        batch_op.drop_column('analysis_id')

    with op.batch_alter_table('dataset', schema=None) as batch_op:
        batch_op.drop_constraint('fk_ds_analysis_id', type_='foreignkey')
        batch_op.drop_column('analysis_id')

    op.drop_table('analysis')
    op.drop_table('dataset')
def downgrade():
    """Restore the pre-migration schema (structure only).

    Removes ``predictor.integration_id`` / ``predictor.fetch_data_query``,
    recreates the ``analysis`` and ``dataset`` tables, re-adds
    ``predictor.dataset_id`` with its foreign key, and restores
    ``file.analysis_id`` with its foreign key and the
    ``unique_file_name_company_id`` constraint.

    Only the schema is recreated: no rows are inserted into ``analysis`` or
    ``dataset``, and ``predictor.dataset_id`` is left NULL.
    """
    # Drop the predictor -> integration link first. This also frees the
    # ``fk_integration_id`` constraint name before ``dataset`` reuses it.
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.drop_constraint('fk_integration_id', type_='foreignkey')
        batch_op.drop_column('fetch_data_query')
        batch_op.drop_column('integration_id')

    op.create_table(
        'analysis',
        sa.Column('id', sa.INTEGER(), nullable=False),
        sa.Column('analysis', sa.VARCHAR(), nullable=False),
        sa.Column('created_at', sa.DATETIME(), nullable=True),
        sa.Column('updated_at', sa.DATETIME(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

    op.create_table(
        'dataset',
        sa.Column('id', sa.INTEGER(), nullable=False),
        sa.Column('updated_at', sa.DATETIME(), nullable=True),
        sa.Column('created_at', sa.DATETIME(), nullable=True),
        sa.Column('name', sa.VARCHAR(), nullable=True),
        sa.Column('data', sa.VARCHAR(), nullable=True),
        sa.Column('creation_info', sa.VARCHAR(), nullable=True),
        sa.Column('company_id', sa.INTEGER(), nullable=True),
        sa.Column('mindsdb_version', sa.VARCHAR(), nullable=True),
        sa.Column('datasources_version', sa.VARCHAR(), nullable=True),
        sa.Column('integration_id', sa.INTEGER(), nullable=True),
        sa.Column('analysis_id', sa.INTEGER(), nullable=True),
        sa.Column('ds_class', sa.VARCHAR(), nullable=True),
        sa.ForeignKeyConstraint(['analysis_id'], ['analysis.id'], name='fk_ds_analysis_id'),
        sa.ForeignKeyConstraint(['integration_id'], ['integration.id'], name='fk_integration_id'),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name', 'company_id', name='unique_dataset_name_company_id')
    )

    # The foreign key to ``dataset`` is added only now that the table exists;
    # backends that validate FK targets at DDL time reject it otherwise.
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.add_column(sa.Column('dataset_id', sa.INTEGER(), nullable=True))
        batch_op.create_foreign_key('fk_predictor_dataset_id', 'dataset', ['dataset_id'], ['id'])

    with op.batch_alter_table('file', schema=None) as batch_op:
        batch_op.add_column(sa.Column('analysis_id', sa.INTEGER(), nullable=True))
        batch_op.create_foreign_key('fk_analysis_id', 'analysis', ['analysis_id'], ['id'])
        batch_op.create_unique_constraint('unique_file_name_company_id', ['name', 'company_id'])