Coverage for mindsdb / integrations / handlers / eventstoredb_handler / eventstoredb_handler.py: 0%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser.ast.base import ASTNode 

2from mindsdb_sql_parser.ast import Select 

3from mindsdb.utilities import log 

4from mindsdb.integrations.libs.base import DatabaseHandler 

5from mindsdb.integrations.libs.response import ( 

6 HandlerStatusResponse as StatusResponse, 

7 HandlerResponse as Response, 

8 RESPONSE_TYPE 

9) 

10 

11from .utils.helpers import get_auth_string, build_basic_url, build_health_url, build_stream_url, build_streams_url, build_next_url, entry_to_df, build_stream_url_last_event 

12import requests 

13import pandas as pd 

14import re 

15 

# Module-level logger used by the EventStoreDB handler for connection and pagination messages.
logger = log.getLogger(__name__)

17 

18 

class EventStoreDB(DatabaseHandler):
    """
    Handler for EventStoreDB.

    The handler uses the AtomPub-over-HTTP API of EventStoreDB. This means
    AtomPub over HTTP must be enabled on your EventStoreDB instance if you are
    using v20+.

    Why not gRPC? At the moment we cannot use https://pypi.org/project/esdbclient/,
    which uses the gRPC endpoint, because mysql-connector-python 8.0.32 requires
    protobuf<=3.20.3,>=3.11.0 while esdbclient requires protobuf 4.22.1.

    Why not TCP? At the moment https://github.com/epoplavskis/photon-pump only
    works in insecure mode and its configuration is limited.

    A third reason: there is no official Python client at the time of writing.
    Once there is better ESDB Python support for gRPC, this integration should
    move from AtomPub to gRPC.
    """

    name = 'eventstoredb'
    # Defaults describe an insecure localhost single node; __init__ may
    # override them per instance from `connection_data`.
    scheme = 'http'
    host = 'localhost'
    port = '2113'
    is_connected = None
    basic_url = ""
    read_batch_size = 500  # page size used when following 'next' links; adjust per use case
    # Template only: __init__ gives every instance its own copy, so credentials
    # added for one connection never leak into another handler instance.
    headers = {
        'Accept': 'application/json',
        'ES-ResolveLinkTo': "True"
    }
    tlsverify = False

    def __init__(self, name, **kwargs):
        """
        Configure the handler from ``kwargs['connection_data']``.

        Recognised keys: ``host``, ``port`` (int), ``tls`` (bool),
        ``tlsverify`` (bool), ``page_size`` (positive int) and
        ``username`` / ``password`` (both str). Values of the wrong type are
        ignored and the class defaults are kept.

        Raises:
            KeyError: if ``connection_data`` is not passed.
        """
        super().__init__(name)
        self.parser = parse_sql
        # Per-instance copy: mutating the class-level dict would share the
        # authorization header across all instances.
        self.headers = dict(type(self).headers)
        connection_data = kwargs['connection_data']

        # Fall back to the class default instead of replacing it with None.
        self.host = connection_data.get('host') or self.host

        if connection_data.get('tls') is True:
            self.scheme = 'https'

        port = connection_data.get('port')
        if isinstance(port, int):
            self.port = port

        page_size = connection_data.get('page_size')
        if isinstance(page_size, int) and page_size > 0:
            self.read_batch_size = page_size

        username = connection_data.get('username')
        password = connection_data.get('password')
        if isinstance(username, str) and isinstance(password, str):
            self.headers['authorization'] = get_auth_string(username, password)

        tlsverify = connection_data.get('tlsverify')
        if isinstance(tlsverify, bool):
            self.tlsverify = tlsverify

        self.basic_url = build_basic_url(self.scheme, self.host, self.port)

    def __del__(self):
        if self.is_connected is True:
            self.disconnect()

    def connect(self, **kwargs):
        """Probe the server's health endpoint and mark the handler connected on success."""
        # Check the `success` flag rather than comparing with a fresh
        # StatusResponse(True): without a value-based __eq__ that comparison is
        # an identity check and can never be true.
        status = self.check_connection()
        if status.success:
            self.is_connected = True
            logger.info(f'{self.name} connection successful!')
            return StatusResponse(True)
        logger.info(f'{self.name} connection could not be made.')
        return StatusResponse(False)

    def disconnect(self, **kwargs):
        """Clear the connected flag; every request is an independent requests.get call, so nothing is closed."""
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """Return a successful status only if the health endpoint answers HTTP 204."""
        try:
            response = requests.get(build_health_url(self.basic_url), verify=self.tlsverify)
        except Exception as e:
            logger.error(f'{self.name} check connection failed with: {e}!')
            return StatusResponse(False, error_message=str(e))
        if response.status_code == 204:
            return StatusResponse(True)
        return StatusResponse(False)

    def _get_feed_page(self, url):
        """GET one Atom feed page; return the decoded JSON body, or None on a non-200 reply."""
        response = requests.get(url, params={'embed': 'tryharder'},
                                headers=self.headers, verify=self.tlsverify)
        if response is None or response.status_code != 200:
            return None
        return response.json()

    @staticmethod
    def _next_link(feed):
        """Return the URI of the feed's 'next' link, or None when this is the last page."""
        for link in feed.get('links', []):
            if link.get('relation') == 'next':
                return link['uri']
        return None

    def _read_all_entries(self, url):
        """
        Collect every entry of the feed at `url`, following 'next' links.

        Returns None if the first page cannot be fetched; otherwise a list of
        raw entry dicts (possibly empty). If a later page fails, pagination
        stops and the entries read so far are returned.
        """
        feed = self._get_feed_page(url)
        if feed is None:
            return None
        entries = list(feed.get('entries', []))
        while True:
            next_uri = self._next_link(feed)
            if next_uri is None:
                break
            feed = self._get_feed_page(build_next_url(next_uri, self.read_batch_size))
            if feed is None:
                logger.warning(f'{self.name} failed to read feed page {next_uri}, stopping pagination.')
                break
            entries.extend(feed.get('entries', []))
        return entries

    def query(self, query: ASTNode) -> Response:
        """
        Read a whole stream as a table.

        Only SELECT is supported. The last part of the FROM identifier is used
        as the stream name; other SELECT clauses (WHERE, LIMIT, ...) are not
        applied here. An empty stream yields an empty table.
        """
        if not isinstance(query, Select):
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="Only 'select' queries are supported for EventStoreDB"
            )
        parts = getattr(query.from_table, 'parts', None)
        if not parts:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="A FROM clause naming the stream is required for EventStoreDB"
            )
        stream_name = parts[-1]  # i.e. table name

        entries = self._read_all_entries(build_stream_url(self.basic_url, stream_name))
        if entries is None:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=f"Could not read stream '{stream_name}' from EventStoreDB"
            )
        frames = [entry_to_df(entry) for entry in entries]
        # pd.concat raises ValueError on an empty list.
        df = pd.concat(frames) if frames else pd.DataFrame()
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def native_query(self, query: str) -> Response:
        """Parse a raw SQL string and execute it through `query`."""
        ast = self.parser(query)
        return self.query(ast)

    def get_tables(self) -> Response:
        """
        List all streams, i.e. tables, as a one-column ``table_name`` table.

        Entry titles are expected to look like ``<n>@<stream>``; everything
        after the first '@' is the stream name. If the streams feed cannot be
        read, an empty table is returned.
        """
        entries = self._read_all_entries(build_streams_url(self.basic_url)) or []
        streams = []
        for entry in entries:
            if 'title' not in entry:
                continue
            # partition keeps stream names that themselves contain '@' intact.
            _, sep, stream = entry['title'].partition('@')
            if sep:
                streams.append(stream)

        df = pd.DataFrame(streams, columns=['table_name'])
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def get_columns(self, table_name) -> Response:
        """
        Infer column names and pandas dtypes from the last event of a stream.

        Returns a ``Field`` / ``Type`` table, or an error response when no
        event could be read.
        """
        feed = self._get_feed_page(build_stream_url_last_event(self.basic_url, table_name))
        events = feed.get('entries') if isinstance(feed, dict) else None
        entry = entry_to_df(events[0]) if events else None
        if entry is None:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="Could not retrieve JSON event data to infer column types."
            )
        data = [[column, series.dtypes.name] for column, series in entry.items()]
        df = pd.DataFrame(data, columns=['Field', 'Type'])
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

205 

206 

def parse_sql(sql, dialect='sqlite'):
    """
    Turn a SQL string into a mindsdb_sql_parser AST.

    Any run of trailing whitespace and semicolons is stripped before lexing.
    `dialect` is accepted for call compatibility but is not used.
    """
    cleaned_sql = re.sub(r'[\s;]+$', '', sql)

    from mindsdb_sql_parser.lexer import MindsDBLexer
    from mindsdb_sql_parser.parser import MindsDBParser
    sql_lexer = MindsDBLexer()
    sql_parser = MindsDBParser()

    return sql_parser.parse(sql_lexer.tokenize(cleaned_sql))