Coverage for mindsdb / migrations / versions / 2022-07-08_999bceb904df_integration_args.py: 14%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1"""integration-args 

2 

3Revision ID: 999bceb904df 

4Revises: d74c189b87e6 

5Create Date: 2022-07-08 10:58:19.822618 

6 

7""" 

8import json 

9 

10from alembic import op 

11import sqlalchemy as sa 

12from sqlalchemy.sql import text 

13 

14import mindsdb.interfaces.storage.db # noqa 

15 

16 

# Alembic revision identifiers.
# `revision` is this migration's id; `down_revision` is the id of the
# migration it is applied on top of.
revision: str = '999bceb904df'
down_revision: str = 'd74c189b87e6'

# This migration declares no branch labels and no extra dependencies.
branch_labels = None
depends_on = None

22 

23 

def upgrade():
    """Add ``integration.engine`` and move the type out of the JSON ``data``.

    For every row of ``integration``:

    * the ``test``, ``publish`` and ``enabled`` keys are dropped from ``data``;
    * ``database_name`` is dropped from ``data``; when the row has no name,
      its value is stored as the row's ``name``;
    * ``type`` is removed from ``data`` and written to the new ``engine``
      column. When ``data`` has no type, rows named ``files`` / ``views`` get
      the engine ``files`` / ``views``; any other row gets ``NULL``.

    ``data`` that cannot be decoded as a JSON object is treated as ``{}``.
    """
    conn = op.get_bind()
    session = sa.orm.Session(bind=conn)

    with op.batch_alter_table('integration', schema=None) as batch_op:
        batch_op.add_column(sa.Column('engine', sa.String()))

    integrations = conn.execute(text('''
        select id, name, data from integration
    ''')).fetchall()

    # Rows are unpacked positionally (same order as the select list): result
    # rows are read-only and string-key access such as row['name'] is not
    # available on every SQLAlchemy version.
    for integration_id, name, raw_data in integrations:
        try:
            data = json.loads(raw_data)
        except (TypeError, ValueError):
            # NULL / non-string column value, or malformed JSON.
            data = {}
        if not isinstance(data, dict):
            # Valid JSON that is not an object (e.g. "null") carries no keys.
            data = {}

        for obsolete_key in ('test', 'publish', 'enabled'):
            data.pop(obsolete_key, None)

        # 'database_name' leaves the JSON either way; it only becomes the
        # integration name when the row does not have one yet.
        database_name = data.pop('database_name', None)
        name_backfilled = name is None and database_name is not None
        if name_backfilled:
            name = database_name

        integration_type = data.pop('type', None)
        if integration_type is None:
            if name == 'files':
                integration_type = 'files'
            if name == 'views':
                integration_type = 'views'

        conn.execute(
            text("""
                update integration
                set engine = :integration_type,
                    data = :integration_data
                where id = :integration_id
            """), {
                'integration_type': integration_type,
                'integration_data': json.dumps(data),
                'integration_id': integration_id
            }
        )

        if name_backfilled:
            # Persist the recovered name; otherwise it would be lost together
            # with the 'database_name' key removed above.
            conn.execute(
                text("""
                    update integration
                    set name = :integration_name
                    where id = :integration_id
                """), {
                    'integration_name': name,
                    'integration_id': integration_id
                }
            )

    session.commit()

74 

75 

def downgrade():
    """Copy ``integration.engine`` back into ``data`` and drop the column.

    For every row of ``integration`` with a non-NULL ``engine``, its value is
    stored under the ``type`` key of the JSON ``data``. ``data`` that cannot
    be decoded as a JSON object is treated as ``{}``. The ``engine`` column
    is dropped afterwards.

    Keys removed from ``data`` by ``upgrade`` (``test``, ``publish``,
    ``enabled``, ``database_name``) are not restored.
    """
    conn = op.get_bind()
    session = sa.orm.Session(bind=conn)

    # 'engine' must be part of the select list: it is the value copied back
    # into data['type'] below.
    integrations = conn.execute(text('''
        select id, engine, data from integration
    ''')).fetchall()

    # Rows are unpacked positionally (same order as the select list) because
    # string-key access such as row['engine'] is not available on every
    # SQLAlchemy version.
    for integration_id, engine, raw_data in integrations:
        try:
            data = json.loads(raw_data)
        except (TypeError, ValueError):
            # NULL / non-string column value, or malformed JSON.
            data = {}
        if not isinstance(data, dict):
            # Valid JSON that is not an object (e.g. "null") carries no keys.
            data = {}
        if engine is not None:
            data['type'] = engine

        conn.execute(
            text("""
                update integration
                set data = :integration_data
                where id = :integration_id
            """), {
                'integration_data': json.dumps(data),
                'integration_id': integration_id
            }
        )

    # Drop the column only after its values have been copied into data.
    with op.batch_alter_table('integration', schema=None) as batch_op:
        batch_op.drop_column('engine')

    session.commit()