Coverage for mindsdb / integrations / libs / storage_handler.py: 0%
61 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import dill
3import pickle
4from typing import Dict
5from hashlib import md5
7import redis
8import sqlite3
class KVStorageHandler:
    """
    Simple key-value store. Instances of this handler shall store any information required by other handlers.
    Context should store anything relevant to the storage handler, e.g. CompanyID, UserID, parent handler name, among others.

    This base class handles serialization and key namespacing only; subclasses
    provide the actual backend by implementing ``_get`` and ``_set``.
    """  # noqa

    def __init__(self, context: Dict, config=None):
        """
        Resolve the configuration, pick a serializer and store the serialized context.

        :param context: values that namespace every key written by this handler.
        :param config: handler options. When falsy, the value of the
            ``MDB_STORAGE_HANDLER_CONFIG`` environment variable is used instead
            (kept verbatim, i.e. a string or None).
        """
        self.config = config if config else os.getenv('MDB_STORAGE_HANDLER_CONFIG')

        # Read the option from the *resolved* configuration. Looking it up on the
        # raw ``config`` argument crashed with AttributeError when it was None.
        # A config without ``.get`` (None, or the raw environment string) simply
        # selects the default serializer.
        read_option = getattr(self.config, 'get', None)
        serializer_name = read_option('serializer', '') if callable(read_option) else ''
        self.serializer = pickle if serializer_name == 'pickle' else dill

        self.context = self.serializer.dumps(context)  # store serialized

    def _get_context_key(self, key: str):
        """
        Build the backend key for ``key`` within this handler's context.

        :return: md5 hex digest of the serialized key followed by the md5 hex
            digest of the serialized context.
        """
        serialized_key = self.serializer.dumps(key)
        key_digest = md5(serialized_key).hexdigest()
        context_digest = md5(self.context).hexdigest()
        return key_digest + context_digest

    def get(self, key, default_value=None):
        """
        Return the value stored under ``key``.

        :param key: key as previously passed to ``set``.
        :param default_value: returned when nothing is stored, unless it is None.
        :raises KeyError: when nothing is stored and ``default_value`` is None.
        """
        serialized_value = self._get(self._get_context_key(key))
        if serialized_value:
            # SECURITY: pickle/dill ``loads`` can execute arbitrary code. Only
            # point this handler at a backend whose contents are trusted.
            return self.serializer.loads(serialized_value)
        if default_value is not None:
            return default_value
        raise KeyError(f"Key not found: {key}")

    def set(self, key: str, value: object):
        """ Serialize ``value`` and store it under ``key`` within this handler's context. """
        serialized_value = self.serializer.dumps(value)
        self._set(self._get_context_key(key), serialized_value)

    def _get(self, serialized_key):
        """ Backend read; must be implemented by subclasses. """
        raise NotImplementedError()

    def _set(self, serialized_key, serialized_value):
        """ Backend write; must be implemented by subclasses. """
        raise NotImplementedError()
class SqliteStorageHandler(KVStorageHandler):
    """ StorageHandler that uses SQLite as backend. """  # noqa

    def __init__(self, context: Dict, config=None):
        """
        Open (or create) the SQLite database file and make sure the table exists.

        Reads ``name`` (required; ``.db`` is appended when missing) and
        ``path`` (optional, defaults to ``./``) from the resolved configuration.
        """
        super().__init__(context, config)
        name = self.config["name"]
        if not name.endswith('.db'):
            name += '.db'
        path = os.path.join(self.config.get("path", "./"), name)
        self.connection = sqlite3.connect(path)
        self._setup_connection()

    def _setup_connection(self):
        """ Checks that a key-value table exists, otherwise creates it. """  # noqa
        cur = self.connection.cursor()
        # Let SQLite do the existence check instead of listing sqlite_master by hand.
        cur.execute("""create table if not exists store (key text PRIMARY KEY, value text)""")
        self.connection.commit()

    def _get(self, serialized_key):
        """ Return the stored value for ``serialized_key``, or None when there is no row. """
        cur = self.connection.cursor()
        # Bound parameter rather than string interpolation, matching ``_set``.
        cur.execute("select value from store where key = ?", (serialized_key,))
        row = cur.fetchone()  # key is the primary key, so at most one row matches
        return row[0] if row else None

    def _set(self, serialized_key, serialized_value):
        """ Insert or overwrite the row for ``serialized_key`` and commit. """
        cur = self.connection.cursor()
        cur.execute("insert or replace into store values (?, ?)", (serialized_key, serialized_value))
        self.connection.commit()
class RedisStorageHandler(KVStorageHandler):
    """ StorageHandler that uses Redis as backend. """  # noqa

    def __init__(self, context: Dict, config=None):
        """
        Validate the configuration and create the Redis client.

        :raises AssertionError: when ``host`` or ``port`` is missing or falsy
            in the resolved configuration.
        """
        super().__init__(context, config)
        # Explicit raises instead of ``assert`` statements so the validation
        # still runs under ``python -O``; the exception type is unchanged.
        for option in ('host', 'port'):
            if not self.config.get(option, False):
                raise AssertionError(f"Redis storage handler config is missing '{option}'")

        self.connection = redis.Redis(host=self.config['host'], port=self.config['port'])

    def _get(self, serialized_key):
        """ Return whatever the Redis client's ``get`` yields for ``serialized_key``. """
        return self.connection.get(serialized_key)

    def _set(self, serialized_key, serialized_value):
        """ Store ``serialized_value`` under ``serialized_key`` through the Redis client. """
        self.connection.set(serialized_key, serialized_value)