Coverage for mindsdb / integrations / libs / storage_handler.py: 0%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import dill 

3import pickle 

4from typing import Dict 

5from hashlib import md5 

6 

7import redis 

8import sqlite3 

9 

10 

class KVStorageHandler:
    """
    Simple key-value store. Instances of this handler shall store any information required by other handlers.
    Context should store anything relevant to the storage handler, e.g. CompanyID, UserID, parent handler name, among others.

    This base class handles serialization and key derivation only; concrete
    backends override ``_get`` and ``_set``.
    """  # noqa

    def __init__(self, context: Dict, config=None):
        """Resolve the configuration, pick a serializer and store the context.

        Args:
            context: Data that scopes every key written through this handler.
                It is kept in serialized form on ``self.context``.
            config: Handler configuration. When falsy, the value of the
                ``MDB_STORAGE_HANDLER_CONFIG`` environment variable is used.
        """
        import json  # local import: only needed for the environment fallback

        if not config:
            config = os.getenv('MDB_STORAGE_HANDLER_CONFIG')
            if config:
                # NOTE(review): the expected format of this variable is not
                # established in this file. A JSON object is assumed here so
                # the fallback is usable at all; anything else is kept as the
                # raw string. Confirm against the deployment docs.
                try:
                    decoded = json.loads(config)
                except ValueError:
                    decoded = None
                if isinstance(decoded, dict):
                    config = decoded
        self.config = config

        # Read the option from the *resolved* config. The previous code called
        # ``.get`` on the raw argument and raised AttributeError for None.
        lookup = getattr(self.config, 'get', None)
        serializer_name = lookup('serializer', '') if callable(lookup) else ''
        self.serializer = pickle if serializer_name == 'pickle' else dill
        self.context = self.serializer.dumps(context)  # store serialized

    def _get_context_key(self, key: str):
        """Return the backend key for ``key``: md5(key) followed by md5(context)."""
        serialized_key = self.serializer.dumps(key)
        return md5(serialized_key).hexdigest() + md5(self.context).hexdigest()

    def get(self, key, default_value=None):
        """Return the value stored under ``key``.

        Args:
            key: Key previously passed to ``set``.
            default_value: Returned when nothing is stored, unless it is None.

        Raises:
            KeyError: If nothing is stored and ``default_value`` is None.
        """
        serialized_value = self._get(self._get_context_key(key))
        if serialized_value:
            # NOTE(security): pickle/dill ``loads`` can execute arbitrary code;
            # the backing store must only contain data written by trusted code.
            return self.serializer.loads(serialized_value)
        if default_value is not None:
            return default_value
        raise KeyError(f"Key not found: {key}")

    def set(self, key: str, value: object):
        """Serialize ``value`` and store it under ``key`` for this context."""
        serialized_value = self.serializer.dumps(value)
        self._set(self._get_context_key(key), serialized_value)

    def _get(self, serialized_key):
        """Backend hook: return the stored value for ``serialized_key``."""
        raise NotImplementedError()

    def _set(self, serialized_key, serialized_value):
        """Backend hook: persist ``serialized_value`` under ``serialized_key``."""
        raise NotImplementedError()

43 

44 

class SqliteStorageHandler(KVStorageHandler):
    """ StorageHandler that uses SQLite as backend. """  # noqa

    def __init__(self, context: Dict, config=None):
        """Open (or create) the SQLite database named by the configuration.

        Reads ``config["name"]`` for the database file name (a ``.db`` suffix
        is appended when missing) and the optional ``config["path"]`` for its
        directory, which defaults to ``"./"``.
        """
        super().__init__(context, config)
        name = self.config["name"]
        if not name.endswith('.db'):
            name += '.db'
        path = os.path.join(self.config.get("path", "./"), name)
        self.connection = sqlite3.connect(path)
        self._setup_connection()

    def _setup_connection(self):
        """ Checks that a key-value table exists, otherwise creates it. """  # noqa
        cur = self.connection.cursor()
        cur.execute("create table if not exists store (key text PRIMARY KEY, value text)")
        self.connection.commit()

    def _get(self, serialized_key):
        """Return the value stored under ``serialized_key``, or None if absent."""
        cur = self.connection.cursor()
        # Bound parameter instead of string interpolation, matching ``_set``.
        cur.execute("select value from store where key=?", (serialized_key,))
        row = cur.fetchone()  # key is the primary key, so at most one row
        return row[0] if row else None

    def _set(self, serialized_key, serialized_value):
        """Insert or overwrite the row for ``serialized_key`` and commit."""
        cur = self.connection.cursor()
        cur.execute("insert or replace into store values (?, ?)", (serialized_key, serialized_value))
        self.connection.commit()

73 

74 

class RedisStorageHandler(KVStorageHandler):
    """ StorageHandler that uses Redis as backend. """  # noqa

    def __init__(self, context: Dict, config=None):
        """Connect to the Redis server given by ``config['host']`` and ``config['port']``.

        Raises:
            AssertionError: If ``host`` or ``port`` is missing or falsy.
        """
        super().__init__(context, config)
        # Raised explicitly rather than via ``assert`` so the validation still
        # runs under ``python -O``; the exception type is kept for callers.
        for required in ('host', 'port'):
            if not self.config.get(required, False):
                raise AssertionError(f"RedisStorageHandler config requires '{required}'")

        self.connection = redis.Redis(host=self.config['host'], port=self.config['port'])

    def _get(self, serialized_key):
        """Return whatever the Redis client's ``get`` returns for ``serialized_key``."""
        return self.connection.get(serialized_key)

    def _set(self, serialized_key, serialized_value):
        """Store ``serialized_value`` under ``serialized_key`` via the Redis client's ``set``."""
        self.connection.set(serialized_key, serialized_value)