Coverage for mindsdb / migrations / versions / 2022-11-11_d429095b570f_data_integration_id.py: 16%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1"""data-integration-id 

2 

3Revision ID: d429095b570f 

4Revises: 1e60096fc817 

5Create Date: 2022-11-11 14:00:58.386307 

6 

7""" 

8import json 

9 

10from alembic import op 

11import sqlalchemy as sa 

12from sqlalchemy.sql import text 

13 

14import mindsdb.interfaces.storage.db as db 

15 

16 

# Alembic revision identifiers.
# `revision` names this migration; `down_revision` names the migration it
# is applied on top of.
revision: str = "d429095b570f"
down_revision: str = "1e60096fc817"

# This migration declares no branch labels and no extra dependencies.
branch_labels = None
depends_on = None

22 

23 

def upgrade():
    """Replace ``predictor.data_integration_id`` with ``data_integration_ref``.

    Steps, in order:

    1. Add the nullable ``data_integration_ref`` column to ``predictor``.
    2. Back-fill it for every predictor row:
       ``{"type": "view"}`` when the old id pointed at the integration named
       'views', ``{"type": "integration", "id": <old id>}`` for any other
       non-null id, and NULL when the predictor had no integration id.
    3. Drop the ``fk_data_integration_id`` foreign key and the old
       ``data_integration_id`` column.
    4. Delete the integration row named 'views'.
    """
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.add_column(sa.Column('data_integration_ref', db.Json(), nullable=True))

    conn = op.get_bind()
    session = sa.orm.Session(bind=conn)

    views_row = conn.execute(text('''
        select id from integration where name = 'views'
    ''')).fetchone()
    # The 'views' integration may be absent. That must neither crash the
    # migration nor prevent the remaining predictors from being migrated,
    # because the old column is dropped below.
    # Rows are read positionally: this works for both legacy and
    # SQLAlchemy 2.0 ``Row`` objects, whereas string keys do not.
    views_integration_id = None if views_row is None else views_row[0]

    predictors = conn.execute(text('''
        select id, data_integration_id from predictor
    ''')).fetchall()

    for predictor_id, data_integration_id in predictors:
        if data_integration_id is None:
            # No integration was referenced; keep the new column NULL.
            data_integration_ref = None
        elif data_integration_id == views_integration_id:
            data_integration_ref = json.dumps({'type': 'view'})
        else:
            data_integration_ref = json.dumps({'type': 'integration', 'id': data_integration_id})

        conn.execute(text('''
            update predictor set data_integration_ref = :data_integration_ref where id = :id
        '''), {
            'data_integration_ref': data_integration_ref,
            'id': predictor_id
        })

    session.commit()

    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.drop_constraint('fk_data_integration_id', type_='foreignkey')
        batch_op.drop_column('data_integration_id')

    # Deleted only after the foreign key on predictor has been dropped.
    conn.execute(text('''
        delete from integration where name = 'views'
    '''))
    session.commit()

66 

67 

def downgrade():
    """Restore ``predictor.data_integration_id`` from ``data_integration_ref``.

    Steps, in order:

    1. Add the nullable integer ``data_integration_id`` column to
       ``predictor``.
    2. Create an integration named 'views' (engine 'views', empty data,
       no company) and commit it so that it has an id.
    3. For every predictor with a non-null ``data_integration_ref``, decode
       the JSON and set ``data_integration_id`` to the new 'views'
       integration id when the type is 'view', otherwise to the ref's
       ``id`` value (NULL if the ref carries none).
    4. Re-create the ``fk_data_integration_id`` foreign key to
       ``integration.id`` and drop ``data_integration_ref``.
    """
    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.add_column(sa.Column('data_integration_id', sa.INTEGER(), nullable=True))

    conn = op.get_bind()
    session = sa.orm.Session(bind=conn)

    views_integration = db.Integration(
        name='views',
        data={},
        engine='views',
        company_id=None
    )
    session.add(views_integration)
    session.commit()
    # Read the generated primary key once instead of on every iteration.
    views_integration_id = views_integration.id

    predictors = conn.execute(text('''
        select id, data_integration_ref from predictor
    ''')).fetchall()

    # Rows are unpacked positionally: this works for both legacy and
    # SQLAlchemy 2.0 ``Row`` objects, whereas string keys do not.
    for predictor_id, raw_ref in predictors:
        if raw_ref is None:
            # Nothing to restore; the new column stays NULL.
            continue

        data_integration_ref = json.loads(raw_ref)
        if data_integration_ref['type'] == 'view':
            data_integration_id = views_integration_id
        else:
            data_integration_id = data_integration_ref.get('id')

        conn.execute(text('''
            update predictor set data_integration_id = :data_integration_id where id = :id
        '''), {
            'data_integration_id': data_integration_id,
            'id': predictor_id
        })

    with op.batch_alter_table('predictor', schema=None) as batch_op:
        batch_op.create_foreign_key('fk_data_integration_id', 'integration', ['data_integration_id'], ['id'])
        batch_op.drop_column('data_integration_ref')

106 batch_op.drop_column('data_integration_ref')