Coverage for mindsdb/interfaces/storage/json.py: 52%
94 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.utilities.functions import decrypt_json, encrypt_json
2from mindsdb.utilities.config import config
3from mindsdb.interfaces.storage import db
4from mindsdb.interfaces.storage.fs import RESOURCE_GROUP
5from mindsdb.utilities.context import context as ctx
6from mindsdb.utilities import log
# Module-level logger; the storage classes below use it to report failed deletes.
logger = log.getLogger(__name__)
class JsonStorage:
    """Dict-like access to the ``db.JsonStorage`` rows of one resource.

    Every lookup and insert is scoped by ``resource_group``, ``resource_id``
    and the ``company_id`` of the current context (``ctx.company_id``).
    """

    def __init__(self, resource_group: str, resource_id: int):
        """Remember which resource the stored records belong to."""
        self.resource_group = resource_group
        self.resource_id = resource_id

    def __setitem__(self, key, value):
        """Create the record named ``key`` or overwrite its content.

        Args:
            key: record name.
            value: dict to store in the record's ``content`` field.

        Raises:
            TypeError: if ``value`` is not a dict.
            Exception: whatever the commit raised; the session is rolled
                back before the error is re-raised.
        """
        if not isinstance(value, dict):
            raise TypeError(f"got {type(value)} instead of dict")
        existing_record = self.get_record(key)
        if existing_record is None:
            record = db.JsonStorage(
                name=key,
                resource_group=self.resource_group,
                resource_id=self.resource_id,
                company_id=ctx.company_id,
                content=value,
            )
            db.session.add(record)
        else:
            existing_record.content = value
        try:
            db.session.commit()
        except Exception:
            # Same recovery as __delitem__/clean: do not leave the shared
            # session in a failed state. Unlike deletes, a failed write is
            # still reported to the caller.
            db.session.rollback()
            raise

    def set(self, key, value):
        """Method-call alias for ``storage[key] = value``."""
        self[key] = value

    def __getitem__(self, key):
        """Return the content of the record named ``key``.

        A missing record yields ``None`` rather than raising ``KeyError``.
        """
        record = self.get_record(key)
        if record is None:
            return None
        return record.content

    def get(self, key):
        """Method-call alias for ``storage[key]``."""
        return self[key]

    def get_record(self, key):
        """Return the first row named ``key`` for this resource, or ``None``."""
        record = (
            db.session.query(db.JsonStorage)
            .filter_by(
                name=key,
                resource_group=self.resource_group,
                resource_id=self.resource_id,
                company_id=ctx.company_id,
            )
            .first()
        )
        return record

    def get_all_records(self):
        """Return every row stored for this resource and company."""
        records = (
            db.session.query(db.JsonStorage)
            .filter_by(
                resource_group=self.resource_group,
                resource_id=self.resource_id,
                company_id=ctx.company_id,
            )
            .all()
        )
        return records

    def __repr__(self):
        """Show the names of all stored records (this queries the database)."""
        records = self.get_all_records()
        names = [x.name for x in records]
        return f"json_storage({names})"

    def __len__(self):
        """Return the number of records stored for this resource."""
        records = self.get_all_records()
        return len(records)

    def __delitem__(self, key):
        """Delete the record named ``key``; do nothing if it does not exist.

        Best effort: a failure is rolled back and logged, not raised.
        """
        record = self.get_record(key)
        if record is not None:
            try:
                db.session.delete(record)
                db.session.commit()
            except Exception:
                db.session.rollback()
                logger.exception("cant delete record from JSON storage:")

    def delete(self, key):
        """Method-call alias for ``del storage[key]``."""
        del self[key]

    def clean(self):
        """Delete every record of this resource in a single commit.

        Best effort: a failed commit is rolled back and logged, not raised.
        """
        json_records = self.get_all_records()
        for record in json_records:
            db.session.delete(record)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            logger.exception("cant delete records from JSON storage:")
class EncryptedJsonStorage(JsonStorage):
    """``JsonStorage`` variant that keeps values in ``encrypted_content``.

    Dicts go through ``encrypt_json`` before being stored and through
    ``decrypt_json`` when read back, both using ``self.secret_key``.
    """

    def __init__(self, resource_group: str, resource_id: int):
        """Bind the storage to a resource and load the encryption key."""
        super().__init__(resource_group, resource_id)
        # NOTE(review): when the config has no "secret_key" this silently falls
        # back to the hard-coded "dummy-key". Confirm that deployments always
        # configure a real key, or fail loudly here instead.
        self.secret_key = config.get("secret_key", "dummy-key")

    def __setitem__(self, key: str, value: dict) -> None:
        """Encrypt ``value`` and store it under ``key``.

        Raises:
            TypeError: if ``value`` is not a dict.
        """
        if not isinstance(value, dict):
            raise TypeError(f"got {type(value)} instead of dict")
        # Encryption happens before any database access; the create-or-update
        # logic itself lives in set_bytes so it exists in one place only.
        self.set_bytes(key, encrypt_json(value, self.secret_key))

    def set_bytes(self, key: str, encrypted_value: bytes):
        """Store an already encrypted payload under ``key`` without changing it.

        Creates the record if it does not exist, otherwise overwrites its
        ``encrypted_content``. If the commit fails, the session is rolled
        back and the error is re-raised.
        """
        existing_record = self.get_record(key)
        if existing_record is None:
            record = db.JsonStorage(
                name=key,
                resource_group=self.resource_group,
                resource_id=self.resource_id,
                company_id=ctx.company_id,
                encrypted_content=encrypted_value,
            )
            db.session.add(record)
        else:
            existing_record.encrypted_content = encrypted_value
        try:
            db.session.commit()
        except Exception:
            # Keep the shared session usable after a failed write.
            db.session.rollback()
            raise

    def set_str(self, key: str, encrypted_value: str):
        """Store an already encrypted payload given as text.

        The string is encoded with ``str.encode()`` defaults and handed to
        ``set_bytes``.
        """
        self.set_bytes(key, encrypted_value.encode())

    # The return annotation is quoted so the union syntax is never evaluated
    # at definition time.
    def __getitem__(self, key: str) -> "dict | None":
        """Return the decrypted content stored under ``key``.

        A missing record yields ``None`` rather than raising ``KeyError``.
        """
        record = self.get_record(key)
        if record is None:
            return None
        return decrypt_json(record.encrypted_content, self.secret_key)
def get_json_storage(resource_id: int, resource_group: str = RESOURCE_GROUP.PREDICTOR):
    """Build a ``JsonStorage`` for the given resource.

    ``resource_group`` defaults to ``RESOURCE_GROUP.PREDICTOR``.
    """
    storage = JsonStorage(resource_group=resource_group, resource_id=resource_id)
    return storage
def get_encrypted_json_storage(resource_id: int, resource_group: str = RESOURCE_GROUP.PREDICTOR):
    """Build an ``EncryptedJsonStorage`` for the given resource.

    ``resource_group`` defaults to ``RESOURCE_GROUP.PREDICTOR``.
    """
    storage = EncryptedJsonStorage(resource_group=resource_group, resource_id=resource_id)
    return storage