Coverage for mindsdb / integrations / handlers / couchbasevector_handler / couchbasevector_handler.py: 0%
226 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import ast
2from datetime import timedelta
3import uuid
5import pandas as pd
6from couchbase.auth import PasswordAuthenticator
7from couchbase.cluster import Cluster
8from couchbase.exceptions import UnAmbiguousTimeoutException
9from couchbase.options import ClusterOptions
10from couchbase.exceptions import CouchbaseException
11from typing import List, Union
13from mindsdb.utilities import log
14from mindsdb.integrations.libs.response import (
15 HandlerStatusResponse as StatusResponse,
16 HandlerResponse as Response,
17 RESPONSE_TYPE,
18)
19from mindsdb.integrations.libs.vectordatabase_handler import (
20 FilterCondition,
21 TableField,
22 VectorStoreHandler,
23)
# Module-level logger, namespaced to this handler module.
logger = log.getLogger(__name__)
class CouchbaseVectorHandler(VectorStoreHandler):
    """
    This handler handles connection and execution of the Couchbase statements.

    Tables map to Couchbase collections inside ``self.bucket_name`` /
    ``self.scope``; vector search is executed through the N1QL/SQL++
    ``SEARCH()`` function with a KNN clause.
    """

    name = "couchbasevector"

    # Maximum time to wait for the cluster to become ready on connect.
    DEFAULT_TIMEOUT_SECONDS = 5

    def __init__(self, name, **kwargs):
        super().__init__(name)
        self.connection_data = kwargs.get("connection_data")
        # Fall back to Couchbase's built-in default scope when none is given.
        self.scope = self.connection_data.get("scope") or "_default"
        self.bucket_name = self.connection_data.get("bucket")
        self.cluster = None
        self.is_connected = False

    def connect(self):
        """
        Set up connections required by the handler.

        Returns:
            The connected cluster.

        Raises:
            UnAmbiguousTimeoutException: if the cluster is not ready within
                ``DEFAULT_TIMEOUT_SECONDS``.
        """
        if self.is_connected:
            return self.cluster

        auth = PasswordAuthenticator(
            self.connection_data.get("user"),
            self.connection_data.get("password"),
            # NOTE: If using SSL/TLS, add the certificate path.
            # We strongly recommend this for production use.
            # cert_path=cert_path
        )
        options = ClusterOptions(auth)
        # wan_development is used to avoid latency issues while connecting
        # to Couchbase over the internet.
        options.apply_profile("wan_development")

        cluster = Cluster(
            self.connection_data.get("connection_string"),
            options,
        )
        try:
            # Wait until the cluster is ready for use.
            cluster.wait_until_ready(
                timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS)
            )
            self.is_connected = cluster.connected
            self.cluster = cluster
        except UnAmbiguousTimeoutException:
            self.is_connected = False
            raise
        return self.cluster

    def disconnect(self):
        """Close any existing connection and switch ``self.is_connected`` off.

        Fix: the previous implementation assigned ``self.cluster.connected``
        (still ``True``) back to ``is_connected``, so the handler never
        actually disconnected.
        """
        if self.is_connected is False:
            return
        try:
            # Best-effort shutdown of the SDK connection.
            self.cluster.close()
        except Exception:
            # Closing is best-effort; we still mark the handler disconnected.
            pass
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Couchbase bucket
        :return: success status and error message if error occurs
        """
        result = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            cluster = self.connect()
            result.success = cluster.connected
        # Catch all SDK errors (UnAmbiguousTimeoutException is a subclass of
        # CouchbaseException) so a status probe reports failure rather than
        # raising.
        except CouchbaseException as e:
            logger.error(
                f'Error connecting to Couchbase {self.connection_data["bucket"]}, {e}!'
            )
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False
        return result

    def _translate_conditions(
        self, conditions: List[FilterCondition]
    ) -> dict:
        """
        Translate filter conditions to a dictionary keyed by column name,
        each value holding the operator and comparison value.
        """
        if conditions is None:
            return {}
        return {
            condition.column: {
                "op": condition.op.value,
                "value": condition.value,
            }
            for condition in conditions
        }

    def _construct_full_after_from_query(
        self,
        where_query: str,
        limit_query: str,
        offset_query: str,
        search_query: str,
    ) -> str:
        """Assemble the clause that follows ``FROM <table>`` in the query."""
        return f"{where_query} {search_query} {limit_query} {offset_query} "

    def _construct_where_query(self, filter_conditions: dict = None) -> str:
        """
        Construct a WHERE clause from translated filter conditions.

        Embeddings conditions are excluded here — they are handled by the
        SEARCH() clause instead.
        """
        if filter_conditions is None:
            return ""

        where_querys = []
        for key, cond in filter_conditions.items():
            if key.startswith(TableField.EMBEDDINGS.value):
                continue
            if cond["op"].lower() == "in":
                # Render IN lists as a parenthesised, comma-separated tuple.
                rendered = "({})".format(
                    ", ".join(repr(i) for i in cond["value"])
                )
            else:
                rendered = repr(cond["value"])
            where_querys.append(f'{key} {cond["op"]} {rendered}')

        if not where_querys:
            return ""
        return f"WHERE {' AND '.join(where_querys)}"

    def _construct_search_query(
        self, table_name: str, field: str, vector: list, k: int, condition: str
    ) -> str:
        """
        Construct a SEARCH query for KNN
        :param table_name: Name of the table
        :param field: The field on which to perform the search (e.g., embeddings)
        :param vector: The vector to search against
        :param k: The number of nearest neighbors to return, default: 2
        :param condition: "WHERE" when SEARCH() is the whole predicate,
            "AND" when it is appended to existing metadata filters
        :return: The SEARCH query as a string
        """
        k_value = k if k is not None else 2
        search_query = f"""
        {condition} SEARCH({table_name}, {{
            "fields": ["*"],
            "query": {{
                "match_none": ""
            }},
            "knn": [
                {{
                    "k": {k_value},
                    "field": "{field}",
                    "vector": {vector}
                }}
            ]
        }})
        """
        return search_query.strip()

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[FilterCondition] = None,
        offset: int = None,
        limit: int = None,
    ) -> pd.DataFrame:
        """
        Query documents, optionally performing a KNN vector search when an
        embeddings condition is present.

        :param table_name: collection to query
        :param columns: optional projection; defaults to id/content/embeddings/metadata
        :param conditions: metadata and/or embeddings filter conditions
        :param offset: optional OFFSET
        :param limit: optional LIMIT (also used as ``k`` for KNN)
        :return: result rows as a DataFrame; a ``distance`` column (the
            SEARCH_SCORE) is included for vector searches
        :raises Exception: wraps any CouchbaseException from query execution
        """
        filter_conditions = self._translate_conditions(conditions)
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)

        vector_filter = (
            next(
                (
                    condition
                    for condition in conditions
                    if condition.column == TableField.EMBEDDINGS.value
                ),
                None,
            )
            if conditions
            else None
        )

        limit_query = f"LIMIT {limit}" if limit else ""
        offset_query = f"OFFSET {offset}" if offset else ""
        where_query = self._construct_where_query(filter_conditions)

        if columns is None:
            targets = "id, content, embeddings, metadata"
        else:
            targets = ", ".join(columns)

        ids, documents, embeddings, metadatas = [], [], [], []
        if vector_filter:
            vector = vector_filter.value
            if not isinstance(vector, list):
                # Vectors may arrive serialized as their string repr.
                vector = ast.literal_eval(vector)

            # SEARCH() is the whole predicate when there are no metadata
            # filters, otherwise it is ANDed onto them.
            condition = "WHERE" if where_query == "" else "AND"
            # Fix: pass the parsed ``vector`` (previously the raw, possibly
            # string-typed value was interpolated into the query, making the
            # literal_eval above dead code).
            search_query = self._construct_search_query(
                table_name,
                TableField.EMBEDDINGS.value,
                vector,
                limit,
                condition,
            )
            after_from_query = self._construct_full_after_from_query(
                where_query, limit_query, offset_query, search_query
            )

            query = f"SELECT SEARCH_SCORE() AS score, {targets} FROM {table_name} {after_from_query}"
            try:
                result = scope.query(query)
            except CouchbaseException as e:
                raise Exception(f"Error while executing query: '{e}'") from e

            distances = []
            for hit in result.rows():
                ids.append(hit.get("id", ""))
                documents.append(hit.get("content", ""))
                embeddings.append(hit.get("embeddings", []))
                metadatas.append(hit.get("metadata", {}))
                distances.append(hit.get("score", ""))
        else:
            after_from_query = self._construct_full_after_from_query(
                where_query, limit_query, offset_query, ""
            )

            query = f"SELECT {targets} FROM {table_name} {after_from_query}"
            try:
                result = scope.query(query)
            except CouchbaseException as e:
                raise Exception(f"Error while executing query: '{e}'") from e

            for hit in result.rows():
                ids.append(hit.get("id", ""))
                documents.append(hit.get("content", ""))
                embeddings.append(hit.get("embeddings", []))
                metadatas.append(hit.get("metadata", {}))
            distances = None

        payload = {
            TableField.ID.value: ids,
            TableField.CONTENT.value: documents,
            TableField.METADATA.value: metadatas,
            TableField.EMBEDDINGS.value: embeddings,
        }
        if columns:
            payload = {
                column: payload[column]
                for column in columns
                if column in payload
            }
        if distances is not None:
            payload[TableField.DISTANCE.value] = distances
        return pd.DataFrame(payload)

    def insert(self, table_name: str, data: pd.DataFrame) -> Response:
        """
        Insert data into Couchbase.

        Each DataFrame row becomes one document keyed ``<table>::<id>``;
        a UUID id is generated when the row has none.
        """
        # Fix: operate on a copy instead of mutating the caller's DataFrame
        # (previously dropna(..., inplace=True)).
        data = data.dropna(axis=1)
        # Convert DataFrame to list of dictionaries.
        records = data.to_dict(orient="records")
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        collection = scope.collection(table_name)

        for record in records:
            doc_id = record.get(TableField.ID.value, str(uuid.uuid4()))
            document = {TableField.ID.value: doc_id}

            if TableField.CONTENT.value in record:
                document[TableField.CONTENT.value] = record[
                    TableField.CONTENT.value
                ]

            if TableField.EMBEDDINGS.value in record:
                embedding = record[TableField.EMBEDDINGS.value]
                if not isinstance(embedding, list):
                    # Embeddings may arrive serialized as their string repr.
                    embedding = ast.literal_eval(embedding)
                document[TableField.EMBEDDINGS.value] = embedding

            if TableField.METADATA.value in record:
                document[TableField.METADATA.value] = record[
                    TableField.METADATA.value
                ]

            document_key = f"{table_name}::{doc_id}"
            collection.upsert(document_key, document)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def upsert(self, table_name: str, data: pd.DataFrame):
        """Alias for :meth:`insert`, which already uses upsert semantics."""
        return self.insert(table_name, data)

    def update(
        self,
        table_name: str,
        data: pd.DataFrame,
        key_columns: List[str] = None,
    ):
        """
        Update data in Couchbase, merging each record into the existing
        document with the same id. Rows without an id are skipped.
        """
        # Convert DataFrame to list of dictionaries.
        records = data.to_dict(orient="records")
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        collection = scope.collection(table_name)
        try:
            for record in records:
                doc_id = record.get(TableField.ID.value)
                if not doc_id:
                    continue
                # Fix: previously referenced the undefined `self.collection`,
                # which raised AttributeError on every update.
                existing_doc = collection.get(doc_id)
                if existing_doc:
                    updated_doc = existing_doc.content
                    updated_doc.update(record)
                    collection.replace(doc_id, updated_doc)
        except CouchbaseException as e:
            raise Exception(f"Error while updating document: '{e}'") from e

    def delete(
        self, table_name: str, conditions: List[FilterCondition] = None
    ):
        """
        Delete documents in Couchbase based on conditions.
        With no conditions, this deletes every document in the collection.
        """
        filter_conditions = self._translate_conditions(conditions)
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        where_query = self._construct_where_query(filter_conditions)

        query = f"DELETE FROM {table_name} {where_query}"
        try:
            _ = scope.query(query)
        except CouchbaseException as e:
            raise Exception(
                f"Error while performing delete query index: '{e}'"
            ) from e

    def create_table(self, table_name: str, if_not_exists=True):
        """
        In Couchbase, tables are represented as collections within a bucket.
        This method creates a new collection if it doesn't exist.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        try:
            bucket.collections().create_collection(
                scope_name=self.scope, collection_name=table_name
            )
        except Exception as e:
            raise Exception(f"Error while creating table: '{e}'") from e

    def drop_table(self, table_name: str, if_exists=True) -> Response:
        """
        Drop a collection in Couchbase.
        Returns an ERROR response (rather than raising) on failure.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        try:
            bucket.collections().drop_collection(table_name)
            return Response(resp_type=RESPONSE_TYPE.OK)
        except Exception as e:
            # Fix: pass a string error message instead of the exception object.
            return Response(
                resp_type=RESPONSE_TYPE.ERROR, error_message=str(e)
            )

    def get_tables(self) -> Response:
        """
        Get the list of collections in the Couchbase bucket.
        Collections from all scopes of the bucket are returned.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scopes = bucket.collections().get_all_scopes()
        collection_names = [
            coll.name for scope in scopes for coll in scope.collections
        ]
        collections_df = pd.DataFrame(
            columns=["table_name"], data=collection_names
        )
        return Response(
            resp_type=RESPONSE_TYPE.TABLE, data_frame=collections_df
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Get the columns (fields) of a Couchbase collection.
        Verifies the collection is reachable, then delegates to the base class.
        """
        try:
            cluster = self.connect()
            bucket = cluster.bucket(self.bucket_name)
            scope = bucket.scope(self.scope)
            _ = scope.collection(table_name)
        except Exception:
            return Response(
                resp_type=RESPONSE_TYPE.ERROR,
                error_message=f"Table {table_name} does not exist!",
            )
        return super().get_columns(table_name)