Coverage for mindsdb / integrations / handlers / eventstoredb_handler / eventstoredb_handler.py: 0%
142 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb_sql_parser.ast.base import ASTNode
2from mindsdb_sql_parser.ast import Select
3from mindsdb.utilities import log
4from mindsdb.integrations.libs.base import DatabaseHandler
5from mindsdb.integrations.libs.response import (
6 HandlerStatusResponse as StatusResponse,
7 HandlerResponse as Response,
8 RESPONSE_TYPE
9)
11from .utils.helpers import get_auth_string, build_basic_url, build_health_url, build_stream_url, build_streams_url, build_next_url, entry_to_df, build_stream_url_last_event
12import requests
13import pandas as pd
14import re
# Module-level logger, namespaced to this handler module via MindsDB's logging utility.
logger = log.getLogger(__name__)
class EventStoreDB(DatabaseHandler):
    """
    Handler for EventStoreDB.

    The handler uses the AtomPub-over-HTTP API of EventStoreDB. This means
    that you need to enable AtomPubOverHTTP on your EventStoreDB instance if
    you are using v20+.

    Why not gRPC? At the moment we cannot use https://pypi.org/project/esdbclient/
    which uses the gRPC endpoint because mysql-connector-python 8.0.32 requires
    protobuf<=3.20.3,>=3.11.0, but esdbclient requires protobuf 4.22.1.

    Why not TCP? At the moment https://github.com/epoplavskis/photon-pump only
    works in insecure mode and configuration is limited.

    Third reason, there is no official Python client at the moment of writing
    of this handler. But once there is better ESDB Python support for gRPC, we
    should move this integration from AtomPub to gRPC.
    """

    name = 'eventstoredb'
    # Defaults target an insecure localhost single node.
    scheme = 'http'
    host = 'localhost'
    port = '2113'
    is_connected = None
    basic_url = ""
    read_batch_size = 500  # AtomPub page size; should be adjusted based on use case
    # Class-level header template. __init__ copies it per instance so that
    # setting the authorization header never mutates this shared dict and
    # leaks credentials between handler instances.
    headers = {
        'Accept': 'application/json',
        'ES-ResolveLinkTo': "True"
    }
    tlsverify = False
    # Seconds; applied to every HTTP call so an unreachable node cannot hang
    # the handler indefinitely (requests has no default timeout).
    request_timeout = 30

    def __init__(self, name, **kwargs):
        """
        Initialize the handler from ``kwargs['connection_data']``.

        Recognized connection_data keys: host (str), port (int), tls (bool),
        tlsverify (bool), page_size (positive int), username and password
        (both str, used together for HTTP basic auth).
        """
        super().__init__(name)
        self.parser = parse_sql
        # Work on an instance copy of the class-level template: mutating the
        # shared dict would propagate one instance's credentials to all others.
        self.headers = dict(EventStoreDB.headers)
        connection_data = kwargs['connection_data']
        username = connection_data.get('username')
        password = connection_data.get('password')
        # Only override the localhost default when a host is actually given,
        # mirroring how every other optional setting below is handled.
        if connection_data.get('host') is not None:
            self.host = connection_data.get('host')
        if isinstance(connection_data.get('tls'), bool) and connection_data.get('tls'):
            self.scheme = 'https'
        if isinstance(connection_data.get('port'), int):
            self.port = connection_data.get('port')
        page_size = connection_data.get('page_size')
        if isinstance(page_size, int) and page_size > 0:
            self.read_batch_size = page_size
        if isinstance(username, str) and isinstance(password, str):
            self.headers['authorization'] = get_auth_string(username, password)
        if isinstance(connection_data.get('tlsverify'), bool):
            self.tlsverify = connection_data.get('tlsverify')

        self.basic_url = build_basic_url(self.scheme, self.host, self.port)

    def __del__(self):
        if self.is_connected is True:
            self.disconnect()

    def connect(self, **kwargs):
        """Probe the node and record the connection state."""
        # Compare the `success` flag rather than whole response objects:
        # HandlerStatusResponse does not define __eq__, so comparing against
        # StatusResponse(True) falls back to identity and is never true.
        if self.check_connection().success:
            self.is_connected = True
            logger.info(f'{self.name} connection successful!')
            return StatusResponse(True)
        logger.info(f'{self.name} connection could not be made.')
        return StatusResponse(False)

    def disconnect(self, **kwargs):
        # AtomPub is plain stateless HTTP: there is no session to tear down.
        return

    def check_connection(self) -> StatusResponse:
        """Hit the node's health endpoint; 204 means the node is live."""
        try:
            response = requests.get(build_health_url(self.basic_url),
                                    verify=self.tlsverify,
                                    timeout=self.request_timeout)
            if response.status_code == 204:
                return StatusResponse(True)
        except Exception as e:
            logger.error(f'{self.name} check connection failed with: {e}!')
        return StatusResponse(False)

    def _read_all_pages(self, start_url, params):
        """
        Yield parsed JSON pages starting at `start_url`, following AtomPub
        'next' links until the stream is exhausted or a request fails.

        Centralizes the pagination loop previously duplicated in query() and
        get_tables(), and guards against a failed first request (which would
        previously leave `json_response` undefined).
        """
        response = requests.get(start_url, params=params, headers=self.headers,
                                verify=self.tlsverify, timeout=self.request_timeout)
        if response is None or response.status_code != 200:
            return
        json_response = response.json()
        yield json_response
        while True:
            next_uri = None
            for link in json_response.get('links', []):
                if link.get('relation') == 'next':
                    next_uri = link['uri']
                    break
            if next_uri is None:
                return
            response = requests.get(build_next_url(next_uri, self.read_batch_size),
                                    params=params, headers=self.headers,
                                    verify=self.tlsverify, timeout=self.request_timeout)
            json_response = response.json()
            yield json_response

    def query(self, query: ASTNode) -> Response:
        """
        Execute a parsed SELECT against a stream (the table name is the
        stream name). Non-SELECT nodes are rejected with an error response.
        """
        if not isinstance(query, Select):
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="Only 'select' queries are supported for EventStoreDB"
            )
        stream_name = query.from_table.parts[-1]  # i.e. table name
        params = {
            'embed': 'tryharder'
        }
        stream_endpoint = build_stream_url(self.basic_url, stream_name)
        frames = []
        for page in self._read_all_pages(stream_endpoint, params):
            for entry in page.get("entries", []):
                frames.append(entry_to_df(entry))
        # pd.concat raises ValueError on an empty list; an empty/unreachable
        # stream should yield an empty table instead of crashing.
        df = pd.concat(frames) if frames else pd.DataFrame()
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def native_query(self, query: str) -> Response:
        """Parse a raw SQL string and delegate to query()."""
        ast = self.parser(query)
        return self.query(ast)

    def get_tables(self) -> Response:
        """
        List all streams i.e tables.
        """
        params = {
            'embed': 'tryharder'
        }
        streams = []
        for page in self._read_all_pages(build_streams_url(self.basic_url), params):
            for entry in page.get("entries", []):
                if "title" in entry:
                    # Titles look like '<event-number>@<stream-name>'; split
                    # only on the first '@' so stream names that themselves
                    # contain '@' are not truncated.
                    streams.append(entry["title"].split('@', 1)[1])
        df = pd.DataFrame(streams,
                          columns=['table_name'])
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def get_columns(self, table_name) -> Response:
        """Infer column names and dtypes from the stream's last event."""
        params = {
            'embed': 'tryharder'
        }
        stream_endpoint = build_stream_url_last_event(self.basic_url, table_name)
        response = requests.get(stream_endpoint, params=params, headers=self.headers,
                                verify=self.tlsverify, timeout=self.request_timeout)
        entry = None
        if response is not None and response.status_code == 200:
            json_response = response.json()
            # Guard the [0] index: an empty 'entries' list means the stream
            # exists but holds no events to infer a schema from.
            if json_response and json_response.get("entries"):
                entry = entry_to_df(json_response["entries"][0])
        if entry is None:
            # Pass the message as error_message: the second positional
            # argument of Response is the data frame, not the error text.
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message="Could not retrieve JSON event data to infer column types."
            )
        data = [[column, series.dtype.name] for column, series in entry.items()]
        df = pd.DataFrame(data, columns=['Field', 'Type'])
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )
def parse_sql(sql, dialect='sqlite'):
    """
    Parse a raw SQL string into a MindsDB AST node.

    Trailing whitespace and semicolons are stripped first, since the parser
    does not accept statement terminators. The `dialect` argument is kept for
    interface compatibility; the MindsDB parser does not consume it.
    """
    # Drop any trailing run of whitespace/semicolons before tokenizing.
    cleaned = re.sub(r'[\s;]+$', '', sql)

    from mindsdb_sql_parser.lexer import MindsDBLexer
    from mindsdb_sql_parser.parser import MindsDBParser

    lexer = MindsDBLexer()
    parser = MindsDBParser()
    return parser.parse(lexer.tokenize(cleaned))