Coverage for mindsdb / interfaces / storage / json.py: 52%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.utilities.functions import decrypt_json, encrypt_json 

2from mindsdb.utilities.config import config 

3from mindsdb.interfaces.storage import db 

4from mindsdb.interfaces.storage.fs import RESOURCE_GROUP 

5from mindsdb.utilities.context import context as ctx 

6from mindsdb.utilities import log 

7 

8logger = log.getLogger(__name__) 

9 

10 

11class JsonStorage: 

12 def __init__(self, resource_group: str, resource_id: int): 

13 self.resource_group = resource_group 

14 self.resource_id = resource_id 

15 

16 def __setitem__(self, key, value): 

17 if isinstance(value, dict) is False: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true

18 raise TypeError(f"got {type(value)} instead of dict") 

19 existing_record = self.get_record(key) 

20 if existing_record is None: 20 ↛ 30line 20 didn't jump to line 30 because the condition on line 20 was always true

21 record = db.JsonStorage( 

22 name=key, 

23 resource_group=self.resource_group, 

24 resource_id=self.resource_id, 

25 company_id=ctx.company_id, 

26 content=value, 

27 ) 

28 db.session.add(record) 

29 else: 

30 existing_record.content = value 

31 db.session.commit() 

32 

33 def set(self, key, value): 

34 self[key] = value 

35 

36 def __getitem__(self, key): 

37 record = self.get_record(key) 

38 if record is None: 

39 return None 

40 return record.content 

41 

42 def get(self, key): 

43 return self[key] 

44 

45 def get_record(self, key): 

46 record = ( 

47 db.session.query(db.JsonStorage) 

48 .filter_by( 

49 name=key, resource_group=self.resource_group, resource_id=self.resource_id, company_id=ctx.company_id 

50 ) 

51 .first() 

52 ) 

53 return record 

54 

55 def get_all_records(self): 

56 records = ( 

57 db.session.query(db.JsonStorage) 

58 .filter_by(resource_group=self.resource_group, resource_id=self.resource_id, company_id=ctx.company_id) 

59 .all() 

60 ) 

61 return records 

62 

63 def __repr__(self): 

64 records = self.get_all_records() 

65 names = [x.name for x in records] 

66 return f"json_storage({names})" 

67 

68 def __len__(self): 

69 records = self.get_all_records() 

70 return len(records) 

71 

72 def __delitem__(self, key): 

73 record = self.get_record(key) 

74 if record is not None: 

75 try: 

76 db.session.delete(record) 

77 db.session.commit() 

78 except Exception: 

79 db.session.rollback() 

80 logger.exception("cant delete record from JSON storage:") 

81 

82 def delete(self, key): 

83 del self[key] 

84 

85 def clean(self): 

86 json_records = self.get_all_records() 

87 for record in json_records: 

88 db.session.delete(record) 

89 try: 

90 db.session.commit() 

91 except Exception: 

92 db.session.rollback() 

93 logger.exception("cant delete records from JSON storage:") 

94 

95 

96class EncryptedJsonStorage(JsonStorage): 

97 def __init__(self, resource_group: str, resource_id: int): 

98 super().__init__(resource_group, resource_id) 

99 self.secret_key = config.get("secret_key", "dummy-key") 

100 

101 def __setitem__(self, key: str, value: dict) -> None: 

102 if isinstance(value, dict) is False: 

103 raise TypeError(f"got {type(value)} instead of dict") 

104 

105 encrypted_value = encrypt_json(value, self.secret_key) 

106 

107 existing_record = self.get_record(key) 

108 if existing_record is None: 

109 record = db.JsonStorage( 

110 name=key, 

111 resource_group=self.resource_group, 

112 resource_id=self.resource_id, 

113 company_id=ctx.company_id, 

114 encrypted_content=encrypted_value, 

115 ) 

116 db.session.add(record) 

117 else: 

118 existing_record.encrypted_content = encrypted_value 

119 db.session.commit() 

120 

121 def set_bytes(self, key: str, encrypted_value: bytes): 

122 existing_record = self.get_record(key) 

123 if existing_record is None: 

124 record = db.JsonStorage( 

125 name=key, 

126 resource_group=self.resource_group, 

127 resource_id=self.resource_id, 

128 company_id=ctx.company_id, 

129 encrypted_content=encrypted_value, 

130 ) 

131 db.session.add(record) 

132 else: 

133 existing_record.encrypted_content = encrypted_value 

134 db.session.commit() 

135 

136 def set_str(self, key: str, encrypted_value: str): 

137 self.set_bytes(key, encrypted_value.encode()) 

138 

139 def __getitem__(self, key: str) -> dict: 

140 record = self.get_record(key) 

141 if record is None: 

142 return None 

143 return decrypt_json(record.encrypted_content, self.secret_key) 

144 

145 

146def get_json_storage(resource_id: int, resource_group: str = RESOURCE_GROUP.PREDICTOR): 

147 return JsonStorage( 

148 resource_group=resource_group, 

149 resource_id=resource_id, 

150 ) 

151 

152 

153def get_encrypted_json_storage(resource_id: int, resource_group: str = RESOURCE_GROUP.PREDICTOR): 

154 return EncryptedJsonStorage( 

155 resource_group=resource_group, 

156 resource_id=resource_id, 

157 )