Coverage for mindsdb/migrations/versions/2022-05-25_d74c189b87e6_predictor_integration.py: 11%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

"""predictor-integration

Revision ID: d74c189b87e6
Revises: 27c5aca9e47e
Create Date: 2022-05-25 15:00:16.284158

"""
import json

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import text


# revision identifiers, used by Alembic.
revision = 'd74c189b87e6'
down_revision = '27c5aca9e47e'
branch_labels = None
depends_on = None

20 

21 

def upgrade():
    """Link predictors directly to integrations, replacing datasets.

    Steps:
      1. Add ``integration_id`` / ``fetch_data_query`` columns to ``predictor``.
      2. Create a 'files' integration row for every company.
      3. Back-fill the new columns from each predictor's old ``dataset`` record.
      4. Drop the now-unused ``dataset`` / ``analysis`` tables and related
         columns and constraints.
    """
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.add_column(sa.Column('integration_id', sa.Integer(), nullable=True))
        batch_op.add_column(sa.Column('fetch_data_query', sa.String(), nullable=True))
        batch_op.create_foreign_key('fk_integration_id', 'integration', ['integration_id'], ['id'])

    conn = op.get_bind()

    # One 'files' integration per company that already has any integration
    # row; fixed timestamps keep the migration deterministic/re-diffable.
    conn.execute(text('''
        insert into integration (name, data, company_id, created_at, updated_at)
        select
            'files' as name,
            '{}' as data,
            company_id,
            '2022-05-01 00:00:00.000000' as created_at,
            '2022-05-01 00:00:00.000000' as updated_at
        from (select distinct company_id from integration) t1
    '''))

    # FIX: the join previously read ``on t1.id = t2.id``, which pairs a
    # predictor with an unrelated dataset that merely shares its primary-key
    # value. A predictor references its dataset through ``dataset_id`` (the
    # same column the WHERE clause checks and that is dropped below).
    predictors = conn.execute(text('''
        select t1.id, t1.company_id, t2.data
        from predictor t1 left join dataset t2 on t1.dataset_id = t2.id
        where dataset_id is not null and t2.data is not null
    ''')).fetchall()
    for row in predictors:
        # NOTE(review): ``row['data']`` relies on SQLAlchemy's legacy Row
        # mapping access (removed in SQLAlchemy 2.0) — confirm the pinned
        # SQLAlchemy version supports it.
        data = row['data']
        try:
            data = json.loads(data)
        except Exception:
            # Malformed dataset payload: skip, leaving the predictor unlinked.
            continue

        if 'source_type' not in data:
            continue
        integration_name = data.get('source_type')
        if not isinstance(integration_name, str) or len(integration_name) == 0:
            continue
        if integration_name.lower() == 'file':
            # Old datasets used the singular 'file'; the integration rows
            # created above are named 'files'.
            integration_name = 'files'

        fetch_data_query = data.get('source')
        if not isinstance(fetch_data_query, dict):
            continue

        if integration_name == 'files':
            # File-backed predictors get a synthesized full-table select.
            file_name = fetch_data_query.get('mindsdb_file_name')
            if not isinstance(file_name, str) or len(file_name) == 0:
                continue
            fetch_data_query = f'select * from {file_name}'
        else:
            fetch_data_query = fetch_data_query.get('query')
            if not isinstance(fetch_data_query, str) or len(fetch_data_query) == 0:
                continue

        # ``company_id = NULL`` never matches in SQL, so companyless rows
        # need the explicit ``is null`` variant of the lookup.
        query = '''
            select id
            from integration
            where company_id = :company_id and lower(name) = lower(:name)
        '''
        if row['company_id'] is None:
            query = '''
                select id
                from integration
                where company_id is null and lower(name) = lower(:name)
            '''
        integration = conn.execute(text(query), {
            'company_id': row['company_id'],
            'name': integration_name
        }).fetchone()
        if integration is None:
            continue

        conn.execute(text('''
            update predictor
            set integration_id = :integration_id, fetch_data_query = :fetch_data_query
            where id = :predictor_id
        '''), {
            'integration_id': integration.id,
            'fetch_data_query': fetch_data_query,
            'predictor_id': row['id']
        })

    # Tear down the dataset/analysis machinery now that predictors point at
    # integrations directly.
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.drop_column('dataset_id')
    with op.batch_alter_table('file', schema=None) as batch_op:
        batch_op.drop_constraint('unique_file_name_company_id', type_='unique')
        batch_op.drop_constraint('fk_analysis_id', type_='foreignkey')
        batch_op.drop_column('analysis_id')
    with op.batch_alter_table('dataset', schema=None) as batch_op:
        batch_op.drop_constraint('fk_ds_analysis_id', type_='foreignkey')
        batch_op.drop_column('analysis_id')
    op.drop_table('analysis')
    op.drop_table('dataset')

114 

115 

def downgrade():
    """Revert the predictor→integration linkage.

    Restores the ``dataset_id`` column and its foreign key on ``predictor``,
    recreates the ``analysis`` and ``dataset`` tables, and re-adds the
    ``file`` table's analysis linkage and unique constraint. Rows deleted by
    ``upgrade`` are not restored — only the schema comes back.
    """
    with op.batch_alter_table('predictor', schema=None) as predictor_batch:
        predictor_batch.add_column(sa.Column('dataset_id', sa.INTEGER(), nullable=True))
        predictor_batch.drop_constraint('fk_integration_id', type_='foreignkey')
        predictor_batch.create_foreign_key('fk_predictor_dataset_id', 'dataset', ['dataset_id'], ['id'])
        predictor_batch.drop_column('fetch_data_query')
        predictor_batch.drop_column('integration_id')

    # ``analysis`` must exist before ``dataset``: the latter carries a
    # foreign key into it.
    op.create_table(
        'analysis',
        sa.Column('id', sa.INTEGER(), nullable=False),
        sa.Column('analysis', sa.VARCHAR(), nullable=False),
        sa.Column('created_at', sa.DATETIME(), nullable=True),
        sa.Column('updated_at', sa.DATETIME(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

    # Column order matters for schema fidelity — keep this list as-is.
    dataset_columns = [
        sa.Column('id', sa.INTEGER(), nullable=False),
        sa.Column('updated_at', sa.DATETIME(), nullable=True),
        sa.Column('created_at', sa.DATETIME(), nullable=True),
        sa.Column('name', sa.VARCHAR(), nullable=True),
        sa.Column('data', sa.VARCHAR(), nullable=True),
        sa.Column('creation_info', sa.VARCHAR(), nullable=True),
        sa.Column('company_id', sa.INTEGER(), nullable=True),
        sa.Column('mindsdb_version', sa.VARCHAR(), nullable=True),
        sa.Column('datasources_version', sa.VARCHAR(), nullable=True),
        sa.Column('integration_id', sa.INTEGER(), nullable=True),
        sa.Column('analysis_id', sa.INTEGER(), nullable=True),
        sa.Column('ds_class', sa.VARCHAR(), nullable=True),
    ]
    op.create_table(
        'dataset',
        *dataset_columns,
        sa.ForeignKeyConstraint(['analysis_id'], ['analysis.id'], name='fk_ds_analysis_id'),
        sa.ForeignKeyConstraint(['integration_id'], ['integration.id'], name='fk_integration_id'),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name', 'company_id', name='unique_dataset_name_company_id')
    )

    with op.batch_alter_table('file', schema=None) as file_batch:
        file_batch.add_column(sa.Column('analysis_id', sa.INTEGER(), nullable=True))
        file_batch.create_foreign_key('fk_analysis_id', 'analysis', ['analysis_id'], ['id'])
        file_batch.create_unique_constraint('unique_file_name_company_id', ['name', 'company_id'])