Coverage for mindsdb / integrations / handlers / couchbasevector_handler / couchbasevector_handler.py: 0%

226 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import ast 

2from datetime import timedelta 

3import uuid 

4 

5import pandas as pd 

6from couchbase.auth import PasswordAuthenticator 

7from couchbase.cluster import Cluster 

8from couchbase.exceptions import UnAmbiguousTimeoutException 

9from couchbase.options import ClusterOptions 

10from couchbase.exceptions import CouchbaseException 

11from typing import List, Union 

12 

13from mindsdb.utilities import log 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17 RESPONSE_TYPE, 

18) 

19from mindsdb.integrations.libs.vectordatabase_handler import ( 

20 FilterCondition, 

21 TableField, 

22 VectorStoreHandler, 

23) 

24 

25 

26logger = log.getLogger(__name__) 

27 

28 

class CouchbaseVectorHandler(VectorStoreHandler):
    """
    This handler handles connection and execution of the Couchbase statements.

    A Couchbase "table" is a collection inside the configured bucket/scope;
    vector (KNN) queries are issued through the SQL++ SEARCH() function.
    """

    name = "couchbasevector"
    # Seconds to wait for the cluster to become ready before giving up.
    DEFAULT_TIMEOUT_SECONDS = 5

    def __init__(self, name, **kwargs):
        super().__init__(name)
        self.connection_data = kwargs.get("connection_data")

        # Fall back to Couchbase's default scope when none is configured.
        self.scope = self.connection_data.get("scope") or "_default"

        self.bucket_name = self.connection_data.get("bucket")
        self.cluster = None

        self.is_connected = False

    def connect(self):
        """
        Set up connections required by the handler.

        Returns:
            The connected cluster.

        Raises:
            UnAmbiguousTimeoutException: if the cluster is not ready within
                DEFAULT_TIMEOUT_SECONDS.
        """
        if self.is_connected:
            return self.cluster

        auth = PasswordAuthenticator(
            self.connection_data.get("user"),
            self.connection_data.get("password"),
            # NOTE: If using SSL/TLS, add the certificate path.
            # We strongly recommend this for production use.
            # cert_path=cert_path
        )

        options = ClusterOptions(auth)

        conn_str = self.connection_data.get("connection_string")
        # wan_development is used to avoid latency issues while connecting to
        # Couchbase over the internet.
        options.apply_profile("wan_development")
        # connect to the cluster
        cluster = Cluster(
            conn_str,
            options,
        )

        try:
            # wait until the cluster is ready for use
            cluster.wait_until_ready(
                timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS)
            )
            self.is_connected = cluster.connected
            self.cluster = cluster
        except UnAmbiguousTimeoutException:
            self.is_connected = False
            raise

        return self.cluster

    def disconnect(self):
        """Close any existing connections.

        Switches self.is_connected so the next connect() call re-establishes
        the cluster connection.
        """
        if self.is_connected is False:
            return
        # BUG FIX: previously this copied `self.cluster.connected`, which is
        # True for a live cluster — so disconnect() never actually marked the
        # handler as disconnected (and check_connection's cleanup was a no-op).
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Couchbase bucket
        :return: success status and error message if error occurs
        """
        result = StatusResponse(False)
        # Only tear the connection back down if we opened it for this check.
        need_to_close = self.is_connected is False

        try:
            cluster = self.connect()
            result.success = cluster.connected
        except UnAmbiguousTimeoutException as e:
            logger.error(
                f'Error connecting to Couchbase {self.connection_data["bucket"]}, {e}!'
            )
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False
        return result

    def _translate_conditions(
        self, conditions: List[FilterCondition]
    ) -> Union[dict, None]:
        """
        Translate filter conditions to a dictionary keyed by column name,
        each value holding the operator and the comparison value.

        NOTE(review): multiple conditions on the same column collapse to the
        last one, since the column name is the dict key.
        """
        if conditions is None:
            return {}

        return {
            condition.column: {
                "op": condition.op.value,
                "value": condition.value,
            }
            for condition in conditions
        }

    def _construct_full_after_from_query(
        self,
        where_query: str,
        limit_query: str,
        offset_query: str,
        search_query: str,
    ) -> str:
        """Assemble the clauses that follow FROM, in SQL++ clause order."""
        return f"{where_query} {search_query} {limit_query} {offset_query} "

    def _construct_where_query(self, filter_conditions=None):
        """
        Construct a WHERE clause from filter conditions.

        Embeddings conditions are excluded here; they are handled by the
        SEARCH()/KNN clause instead.
        """
        if filter_conditions is None:
            return ""

        where_querys = []
        metadata_conditions = {
            key: value
            for key, value in filter_conditions.items()
            if not key.startswith(TableField.EMBEDDINGS.value)
        }
        for key, value in metadata_conditions.items():
            if value["op"].lower() == "in":
                # Render IN operands as a parenthesized, repr-quoted list.
                values = list(repr(i) for i in value["value"])
                value["value"] = "({})".format(", ".join(values))
            else:
                value["value"] = repr(value["value"])
            where_querys.append(f'{key} {value["op"]} {value["value"]}')

        if len(where_querys) > 1:
            return f"WHERE {' AND '.join(where_querys)}"
        elif len(where_querys) == 1:
            return f"WHERE {where_querys[0]}"
        else:
            return ""

    def _construct_search_query(
        self, table_name: str, field: str, vector: list, k: int, condition: str
    ):
        """
        Construct a SEARCH query for KNN
        :param table_name: Name of the table
        :param field: The field on which to perform the search (e.g., embeddings)
        :param vector: The vector to search against
        :param k: The number of nearest neighbors to return, default: 2
        :param condition: "WHERE" to start a clause, "AND" to extend one
        :return: The SEARCH query as a string
        """
        k_value = k if k is not None else 2
        search_query = f"""
        {condition} SEARCH({table_name}, {{
            "fields": ["*"],
            "query": {{
                "match_none": ""
            }},
            "knn": [
                {{
                    "k": {k_value},
                    "field": "{field}",
                    "vector": {vector}
                }}
            ]
        }})
        """
        return search_query.strip()

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[FilterCondition] = None,
        offset: int = None,
        limit: int = None,
    ) -> pd.DataFrame:
        """
        Select rows from a collection, optionally performing a KNN vector
        search when an embeddings condition is present.

        :param table_name: collection to query
        :param columns: columns to project (defaults to id/content/embeddings/metadata)
        :param conditions: metadata and/or embeddings filter conditions
        :param offset: OFFSET value, if any
        :param limit: LIMIT value (also used as k for KNN)
        :return: DataFrame of matching rows (plus distance for KNN queries)
        """
        filter_conditions = self._translate_conditions(conditions)
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        documents, metadatas, embeddings = [], [], []

        # The (single) embeddings condition, if present, drives the KNN search.
        vector_filter = (
            next(
                (
                    condition
                    for condition in conditions
                    if condition.column == TableField.EMBEDDINGS.value
                ),
                None,
            )
            if conditions
            else None
        )
        limit_query = f"LIMIT {limit}" if limit else ""
        offset_query = f"OFFSET {offset}" if offset else ""
        where_query = self._construct_where_query(filter_conditions)

        if columns is None:
            targets = "id, content, embeddings, metadata"
        else:
            targets = ", ".join(columns)

        if vector_filter:
            vector = vector_filter.value
            if not isinstance(vector, list):
                # Vectors may arrive serialized as strings; parse them back.
                vector = ast.literal_eval(vector)

            # SEARCH() either starts the WHERE clause or is ANDed onto it.
            search_condition = "WHERE" if where_query == "" else "AND"
            # BUG FIX: previously the unparsed `vector_filter.value` was passed
            # here, so string-serialized vectors produced a malformed query.
            search_query = self._construct_search_query(
                table_name,
                TableField.EMBEDDINGS.value,
                vector,
                limit,
                search_condition,
            )
            after_from_query = self._construct_full_after_from_query(
                where_query, limit_query, offset_query, search_query
            )

            query = f"SELECT SEARCH_SCORE() AS score, {targets} FROM {table_name} {after_from_query}"
            try:
                result = scope.query(query)
            except CouchbaseException as e:
                raise Exception(f"Error while executing query: '{e}'")

            # Process results
            ids, documents, distances = [], [], []
            for hit in result.rows():
                ids.append(hit.get("id", ""))
                documents.append(hit.get("content", ""))
                embeddings.append(hit.get("embeddings", []))
                metadatas.append(hit.get("metadata", {}))
                distances.append(hit.get("score", ""))
        else:
            after_from_query = self._construct_full_after_from_query(
                where_query, limit_query, offset_query, ""
            )

            query = f"SELECT {targets} FROM {table_name} {after_from_query}"
            try:
                result = scope.query(query)
            except CouchbaseException as e:
                raise Exception(f"Error while executing query: '{e}'")

            ids = []
            documents = []
            for hit in result.rows():
                ids.append(hit.get("id", ""))
                documents.append(hit.get("content", ""))
                embeddings.append(hit.get("embeddings", []))
                metadatas.append(hit.get("metadata", {}))

            # No score column for a plain SELECT.
            distances = None

        # Prepare the payload
        payload = {
            TableField.ID.value: ids,
            TableField.CONTENT.value: list(documents),
            TableField.METADATA.value: list(metadatas),
            TableField.EMBEDDINGS.value: list(embeddings),
        }
        if columns:
            payload = {
                column: payload[column]
                for column in columns
                if column in payload
            }
        if distances is not None:
            payload[TableField.DISTANCE.value] = distances
        return pd.DataFrame(payload)

    def insert(self, table_name: str, data: pd.DataFrame) -> Response:
        """
        Insert data into Couchbase.

        Each row becomes a document keyed `{table_name}::{id}`; rows without
        an id get a fresh UUID.
        """
        # NOTE(review): this drops all-NaN columns in the *caller's* DataFrame
        # (in-place mutation) — confirm callers do not reuse `data`.
        data.dropna(axis=1, inplace=True)
        # Convert DataFrame to list of dictionaries
        records = data.to_dict(orient="records")
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        collection = scope.collection(table_name)

        for record in records:
            doc_id = record.get(TableField.ID.value, str(uuid.uuid4()))
            document = {TableField.ID.value: doc_id}

            if TableField.CONTENT.value in record:
                document[TableField.CONTENT.value] = record[
                    TableField.CONTENT.value
                ]

            if TableField.EMBEDDINGS.value in record:
                document[TableField.EMBEDDINGS.value] = record[
                    TableField.EMBEDDINGS.value
                ]
                # Embeddings may arrive serialized as strings; parse them back.
                if not isinstance(document[TableField.EMBEDDINGS.value], list):
                    document[TableField.EMBEDDINGS.value] = ast.literal_eval(
                        document[TableField.EMBEDDINGS.value]
                    )

            if TableField.METADATA.value in record:
                document[TableField.METADATA.value] = record[
                    TableField.METADATA.value
                ]
            document_key = f"{table_name}::{doc_id}"

            collection.upsert(document_key, document)
        return Response(resp_type=RESPONSE_TYPE.OK)

    def upsert(self, table_name: str, data: pd.DataFrame):
        """Upsert is identical to insert: collection.upsert handles both."""
        return self.insert(table_name, data)

    def update(
        self,
        table_name: str,
        data: pd.DataFrame,
        key_columns: List[str] = None,
    ):
        """
        Update data in Couchbase.

        Rows with an id are merged into the existing document and replaced;
        rows without an id are skipped.
        """
        # Convert DataFrame to list of dictionaries
        records = data.to_dict(orient="records")
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        collection = scope.collection(table_name)
        try:
            for record in records:
                doc_id = record.get(TableField.ID.value)
                if doc_id:
                    # BUG FIX: previously called `self.collection.get(...)`,
                    # but `self.collection` is never assigned on this handler —
                    # the local `collection` above is the correct object.
                    existing_doc = collection.get(doc_id)
                    if existing_doc:
                        updated_doc = existing_doc.content
                        updated_doc.update(record)
                        collection.replace(doc_id, updated_doc)
        except CouchbaseException as e:
            raise Exception(f"Error while updating document: '{e}'")

    def delete(
        self, table_name: str, conditions: List[FilterCondition] = None
    ):
        """
        Delete documents in Couchbase based on conditions.

        NOTE(review): with no conditions the WHERE clause is empty and every
        document in the collection is deleted — confirm this is intended.
        """
        filter_conditions = self._translate_conditions(conditions)
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        where_query = self._construct_where_query(filter_conditions)

        query = f"DELETE FROM {table_name} {where_query}"
        try:
            _ = scope.query(query)
        except CouchbaseException as e:
            raise Exception(
                f"Error while performing delete query index: '{e}'"
            )

    def create_table(self, table_name: str, if_not_exists=True):
        """
        In Couchbase, tables are represented as collections within a bucket.
        This method creates a new collection if it doesn't exist.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        try:
            bucket.collections().create_collection(
                scope_name=self.scope, collection_name=table_name
            )
        except Exception as e:
            raise Exception(f"Error while creating table: '{e}'")

    def drop_table(self, table_name: str, if_exists=True) -> Response:
        """
        Drop a collection in Couchbase.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)
        _ = scope.collection(table_name)
        try:
            bucket.collections().drop_collection(table_name)
            return Response(resp_type=RESPONSE_TYPE.OK)
        except Exception as e:
            # FIX: error_message should be a string, not the exception object.
            return Response(
                resp_type=RESPONSE_TYPE.ERROR, error_message=str(e)
            )

    def get_tables(self) -> Response:
        """
        Get the list of collections in the Couchbase bucket.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        collections = bucket.collections().get_all_scopes()
        # Flatten every scope's collections into a single name list.
        collection_names = [
            coll.name for scope in collections for coll in scope.collections
        ]
        collections_df = pd.DataFrame(
            columns=["table_name"], data=collection_names
        )
        return Response(
            resp_type=RESPONSE_TYPE.TABLE, data_frame=collections_df
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Get the columns (fields) of a Couchbase collection.
        """
        try:
            cluster = self.connect()
            bucket = cluster.bucket(self.bucket_name)
            scope = bucket.scope(self.scope)
            _ = scope.collection(table_name)
        except Exception:
            return Response(
                resp_type=RESPONSE_TYPE.ERROR,
                error_message=f"Table {table_name} does not exist!",
            )
        return super().get_columns(table_name)