Coverage for mindsdb / migrations / versions / 2022-02-09_27c5aca9e47e_db_files.py: 10%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1"""db files 

2 

3Revision ID: 27c5aca9e47e 

4Revises: 47f97b83cee4 

5Create Date: 2022-02-09 10:43:29.854671 

6 

7""" 

8import json 

9import datetime 

10 

11from alembic import op 

12import sqlalchemy as sa 

13import mindsdb.interfaces.storage.db 

14from sqlalchemy.sql import text 

15 

16 

17# revision identifiers, used by Alembic. 

18revision = '27c5aca9e47e' 

19down_revision = '47f97b83cee4' 

20branch_labels = None 

21depends_on = None 

22 

23 

def upgrade():
    """Migrate schema: split analysis out of 'datasource', add 'file' table.

    Steps (SQLite-compatible, hence the tmp-table / batch_alter_table dance):
    - drop the obsolete 'ai_table'
    - recreate 'view' so its FK gets an explicit name ('fk_integration_id');
      the original FK was unnamed and therefore cannot be dropped by name
    - create 'analysis' and move each datasource's analysis JSON into it,
      linking back via the new 'datasource.analysis_id' column
    - create the 'file' table and migrate file-type datasources into it
    - point every datasource at its integration, set 'ds_class', then rename
      'datasource' -> 'dataset' and add named FKs / unique constraints
    """
    op.drop_table('ai_table')

    conn = op.get_bind()

    # 'view' was created with an unnamed fk. Therefore it must be recreated.
    op.create_table(
        'view_tmp',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('company_id', sa.Integer(), nullable=True),
        sa.Column('query', sa.String(), nullable=False),
        sa.Column('integration_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['integration_id'], ['integration.id'], name='fk_integration_id'),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name', 'company_id', name='unique_view_name_company_id')
    )
    conn.execute(text("""
        insert into view_tmp (id, name, company_id, query, integration_id)
        select id, name, company_id, query, datasource_id from view;
    """))
    op.drop_table('view')
    op.rename_table('view_tmp', 'view')

    op.create_table(
        'analysis',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('analysis', mindsdb.interfaces.storage.db.Json(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

    with op.batch_alter_table('datasource', schema=None) as batch_op:
        batch_op.add_column(sa.Column('analysis_id', sa.Integer(), nullable=True))
        batch_op.create_foreign_key('fk_ds_analysis_id', 'analysis', ['analysis_id'], ['id'])
        batch_op.add_column(sa.Column('ds_class', sa.String(), nullable=True))

    session = sa.orm.Session(bind=conn)

    # Move each datasource's inline analysis JSON into the 'analysis' table.
    datasources = conn.execute(sa.text('select id, analysis from datasource')).fetchall()
    for row in datasources:
        if row['analysis'] is not None:
            # NOTE 'returning' is relatively new in sqlite, so it is better
            # to use a select after the insert to obtain the new row id.
            conn.execute(
                text("""
                    insert into analysis (analysis) select analysis from datasource where id = :id;
                """), {
                    'id': row['id']
                }
            )
            analysis_id = conn.execute(text("""
                select id from analysis order by id desc limit 1;
            """)).fetchall()
            conn.execute(
                text("""
                    update datasource set analysis_id = :analysis_id where id = :id
                """), {
                    'analysis_id': analysis_id[0][0],
                    'id': row['id']
                }
            )

    with op.batch_alter_table('datasource', schema=None) as batch_op:
        batch_op.drop_column('analysis')

    op.create_table(
        'file',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('company_id', sa.Integer(), nullable=True),
        sa.Column('source_file_path', sa.String(), nullable=False),
        sa.Column('file_path', sa.String(), nullable=False),
        sa.Column('row_count', sa.Integer(), nullable=False),
        sa.Column('columns', mindsdb.interfaces.storage.db.Json(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True, server_default=sa.func.current_timestamp()),
        sa.Column('updated_at', sa.DateTime(), nullable=True, server_default=sa.func.current_timestamp(), server_onupdate=sa.func.current_timestamp()),
        sa.Column('analysis_id', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['analysis_id'], ['analysis.id'], name='fk_analysis_id'),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name', 'company_id', name='unique_file_name_company_id')
    )

    # delete ds where data is none
    datasources = conn.execute(text('select * from datasource')).fetchall()
    for ds in datasources:
        if ds['data'] is None:
            conn.execute(text('delete from datasource where id = :id'), {'id': ds['id']})
            continue
        ds_data = json.loads(ds['data'])
        creation_info = json.loads(ds['creation_info'])
        datasource_name = ds_data.get('source_type')
        if datasource_name == 'file':
            # Timestamps may be stored as ISO strings or as numeric epochs —
            # TODO confirm against the data actually written by older versions.
            # BUGFIX: isinstance() requires a tuple of types (a list raises
            # TypeError) and 'datetime.fromtimestamp' does not exist under
            # 'import datetime' — it must be 'datetime.datetime.fromtimestamp'.
            created_at = None
            if isinstance(ds['created_at'], str):
                created_at = datetime.datetime.fromisoformat(ds['created_at'])
            elif isinstance(ds['created_at'], (float, int)):
                created_at = datetime.datetime.fromtimestamp(ds['created_at'])

            updated_at = None
            if isinstance(ds['updated_at'], str):
                updated_at = datetime.datetime.fromisoformat(ds['updated_at'])
            elif isinstance(ds['updated_at'], (float, int)):
                updated_at = datetime.datetime.fromtimestamp(ds['updated_at'])

            file = mindsdb.interfaces.storage.db.File(
                name=ds['name'],
                company_id=ds['company_id'],
                source_file_path=ds_data['source'],
                file_path=creation_info['args'][0],
                row_count=ds_data['row_count'],
                columns=ds_data['columns'],
                created_at=created_at,
                updated_at=updated_at,
                analysis_id=ds['analysis_id']
            )
            session.add(file)
            session.flush()
            # ds_data['file_id'] = file.id
            ds_data['source'] = {
                'mindsdb_file_name': ds['name']
                # 'source': ds_data['source']
            }
            conn.execute(
                text("""
                    update datasource set data = :ds_data where id = :id;
                """), {
                    'id': ds['id'],
                    'ds_data': json.dumps(ds_data)
                }
            )

        # Link the datasource to its integration by name + company.
        conn.execute(
            text("""
                update datasource
                set integration_id = (select id from integration where name = :datasource_name and company_id = :company_id),
                    ds_class = :ds_class
                where id = :id
            """), {
                'datasource_name': datasource_name,
                'company_id': ds['company_id'],
                'ds_class': creation_info['class'],
                'id': ds['id']
            }
        )

    session.commit()

    op.rename_table('datasource', 'dataset')

    with op.batch_alter_table('dataset', schema=None) as batch_op:
        batch_op.create_foreign_key('fk_integration_id', 'integration', ['integration_id'], ['id'])

    # NOTE two different 'batch' blocks are necessary, otherwise the FK is not created
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.alter_column('datasource_id', new_column_name='dataset_id')
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.create_foreign_key('fk_predictor_dataset_id', 'dataset', ['dataset_id'], ['id'])
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.create_unique_constraint('unique_predictor_name_company_id', ['name', 'company_id'])

    with op.batch_alter_table('integration', schema=None) as batch_op:
        batch_op.create_unique_constraint('unique_integration_name_company_id', ['name', 'company_id'])

    with op.batch_alter_table('dataset', schema=None) as batch_op:
        batch_op.create_unique_constraint('unique_dataset_name_company_id', ['name', 'company_id'])

189 

190 

def downgrade():
    """Reverse the migration: restore the pre-27c5aca9e47e schema.

    Order matters: named constraints are dropped before the tables they
    reference are renamed/dropped, analysis JSON is copied back inline
    before the 'analysis' table is removed, and 'view' is rebuilt via a
    tmp table (SQLite cannot alter an FK in place).
    """
    with op.batch_alter_table('integration', schema=None) as batch_op:
        batch_op.drop_constraint('unique_integration_name_company_id', type_='unique')

    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.drop_constraint('unique_predictor_name_company_id', type_='unique')

    with op.batch_alter_table('dataset', schema=None) as batch_op:
        batch_op.drop_constraint('unique_dataset_name_company_id', type_='unique')

    # Drop the FK before renaming 'dataset_id' back to 'datasource_id'.
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.drop_constraint('fk_predictor_dataset_id', type_='foreignkey')
        batch_op.alter_column('dataset_id', new_column_name='datasource_id')

    with op.batch_alter_table('dataset', schema=None) as batch_op:
        batch_op.drop_constraint('fk_integration_id', type_='foreignkey')
        # Restore the inline 'analysis' column that upgrade() moved out.
        batch_op.add_column(sa.Column('analysis', sa.VARCHAR(), nullable=True))
        batch_op.drop_constraint('fk_ds_analysis_id', type_='foreignkey')
        batch_op.drop_column('ds_class')

    op.rename_table('dataset', 'datasource')

    op.drop_table('file')

    conn = op.get_bind()
    # Copy analysis JSON back inline before dropping the 'analysis' table.
    conn.execute(text("""
        update datasource set analysis = (select analysis from analysis where id = analysis_id)
    """))

    with op.batch_alter_table('datasource', schema=None) as batch_op:
        batch_op.drop_column('analysis_id')

    op.drop_table('analysis')

    # Recreate the 'ai_table' dropped by upgrade() (structure only; its rows
    # are not recoverable).
    op.create_table(
        'ai_table',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('integration_name', sa.String(), nullable=False),
        sa.Column('integration_query', sa.String(), nullable=False),
        sa.Column('query_fields', mindsdb.interfaces.storage.db.Json(), nullable=False),
        sa.Column('predictor_name', sa.String(), nullable=False),
        sa.Column('predictor_columns', mindsdb.interfaces.storage.db.Json(), nullable=False),
        sa.Column('company_id', sa.Integer(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

    # Rebuild 'view' with the old column name 'datasource_id' via a tmp table.
    op.create_table(
        'view_tmp',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('company_id', sa.Integer(), nullable=True),
        sa.Column('query', sa.String(), nullable=False),
        sa.Column('datasource_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['datasource_id'], ['integration.id'], name='fk_datasource_id'),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name', 'company_id', name='unique_name_company_id')
    )
    conn.execute(text("""
        insert into view_tmp (id, name, company_id, query, datasource_id)
        select id, name, company_id, query, integration_id from view;
    """))
    op.drop_table('view')
    op.rename_table('view_tmp', 'view')