Coverage for mindsdb / integrations / handlers / couchbase_handler / couchbase_handler.py: 0%
104 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from datetime import timedelta
3import pandas as pd
4from couchbase.auth import PasswordAuthenticator
5from couchbase.cluster import Cluster
6from couchbase.exceptions import UnAmbiguousTimeoutException
7from couchbase.options import ClusterOptions
8from couchbase.exceptions import KeyspaceNotFoundException, CouchbaseException
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.utilities import log
12from mindsdb_sql_parser.ast.base import ASTNode
13from mindsdb.integrations.libs.response import (
14 HandlerStatusResponse as StatusResponse,
15 HandlerResponse as Response,
16 RESPONSE_TYPE,
17)
# Module-level logger, named after this module and obtained through
# mindsdb's logging utility.
logger = log.getLogger(__name__)
class CouchbaseHandler(DatabaseHandler):
    """Handle connection to Couchbase and execution of Couchbase statements.

    Statements are executed against one bucket/scope pair taken from
    ``connection_data`` (keys ``bucket`` and ``scope``; the scope falls back
    to ``"_default"`` when it is missing or empty).
    """

    name = "couchbase"
    # Seconds connect() waits for the cluster to report ready.
    DEFAULT_TIMEOUT_SECONDS = 5

    def __init__(self, name, **kwargs):
        """Store the connection settings; no network access happens here.

        Args:
            name: handler instance name, forwarded to ``DatabaseHandler``.
            **kwargs: must contain ``connection_data``, a mapping read for
                the keys ``user``, ``password``, ``connection_string``,
                ``bucket`` and ``scope``.
        """
        super().__init__(name)
        self.connection_data = kwargs.get("connection_data")

        self.scope = self.connection_data.get("scope") or "_default"
        self.bucket_name = self.connection_data.get("bucket")
        self.cluster = None
        self.is_connected = False

    def connect(self):
        """Set up the connection required by the handler.

        Returns:
            The connected cluster. An already established connection is
            reused.

        Raises:
            UnAmbiguousTimeoutException: the cluster did not become ready
                within ``DEFAULT_TIMEOUT_SECONDS``.
        """
        if self.is_connected:
            return self.cluster

        auth = PasswordAuthenticator(
            self.connection_data.get("user"),
            self.connection_data.get("password"),
            # NOTE: If using SSL/TLS, add the certificate path.
            # We strongly recommend this for production use.
            # cert_path=cert_path
        )
        options = ClusterOptions(auth)
        # wan_development is used to avoid latency issues while connecting
        # to Couchbase over the internet.
        options.apply_profile("wan_development")

        conn_str = self.connection_data.get("connection_string")
        cluster = Cluster(conn_str, options)

        try:
            # Wait until the cluster is ready for use.
            cluster.wait_until_ready(timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS))
        except UnAmbiguousTimeoutException:
            self.is_connected = False
            raise

        self.is_connected = cluster.connected
        self.cluster = cluster
        return self.cluster

    def disconnect(self):
        """Close the existing connection, if any, and reset handler state.

        After this call ``is_connected`` is False and ``cluster`` is None,
        so the next ``connect()`` builds a fresh cluster object.
        """
        if not self.is_connected:
            return

        try:
            if self.cluster is not None:
                self.cluster.close()
        except CouchbaseException as e:
            # Closing is best-effort: the handler is marked disconnected
            # regardless, so a failed close must not break the caller.
            logger.warning(f"Error closing Couchbase connection: {e}")
        finally:
            self.cluster = None
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """Check the connection to the Couchbase bucket.

        Returns:
            StatusResponse with ``success`` set and, on failure,
            ``error_message`` describing the error.
        """
        result = StatusResponse(False)
        # Only tear the connection down afterwards if this call opened it.
        need_to_close = self.is_connected is False

        try:
            cluster = self.connect()
            result.success = cluster.connected
        except CouchbaseException as e:
            # CouchbaseException also covers the readiness timeout raised by
            # connect(); any SDK failure becomes a failed status, not a crash.
            logger.error(f"Error connecting to Couchbase {self.bucket_name}, {e}!")
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False
        return result

    @staticmethod
    def _error_message(error) -> str:
        """Return the most specific message available for a Couchbase error."""
        context = getattr(error, "error_context", None)
        message = getattr(context, "first_error_message", None)
        # Not every exception carries a query error context; fall back to
        # the exception text instead of failing inside an except block.
        return str(message) if message else str(error)

    @staticmethod
    def _flatten_row(row: dict) -> dict:
        """Flatten one result row into a single column -> value mapping.

        Values that are dicts (e.g. the ``{"collection": {...}}`` wrapper a
        ``SELECT *`` produces) contribute their own keys as columns; every
        other value is kept under its own key.
        """
        flat = {}
        for key, value in row.items():
            if isinstance(value, dict):
                flat.update(value)
            else:
                flat[key] = value
        return flat

    def native_query(self, query: str) -> Response:
        """Execute a raw query against Couchbase.

        Args:
            query (str): Raw Couchbase query.

        Returns:
            HandlerResponse: a TABLE response holding the rows, an OK
            response when the statement produced no data, or an ERROR
            response when Couchbase rejected the statement.
        """
        self.connect()

        try:
            cb = self.cluster.bucket(self.bucket_name).scope(self.scope)

            records = []
            for row in cb.query(query):
                flat = self._flatten_row(row)
                if flat:
                    records.append(flat)
        except CouchbaseException as e:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=self._error_message(e),
            )

        if not records:
            return Response(RESPONSE_TYPE.OK)
        # Building from per-row records lets pandas fill keys missing from
        # some documents instead of failing on unequal column lengths.
        return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(records))

    def query(self, query: ASTNode) -> Response:
        """Receive a query as an AST (abstract syntax tree) and execute it.

        Args:
            query (ASTNode): SQL query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc.

        Returns:
            HandlerResponse
        """
        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """Get the list of collections in the bucket.

        Returns:
            HandlerResponse: TABLE response with one ``TABLE_NAME`` column.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        # NOTE(review): names are gathered from every scope of the bucket,
        # while queries only run in self.scope - confirm this is intended.
        unique_collections = {
            collection.name
            for scope in bucket.collections().get_all_scopes()
            for collection in scope.collections
        }
        # Sorted so the listing does not depend on set iteration order.
        df = pd.DataFrame(sorted(unique_collections), columns=["TABLE_NAME"])
        return Response(RESPONSE_TYPE.TABLE, df)

    def get_columns(self, table_name) -> Response:
        """Return the columns of a collection, inferred from one document.

        Args:
            table_name (str): name of one of the tables returned by
                self.get_tables().

        Returns:
            HandlerResponse: TABLE response with the columns ``Field`` (key
            name) and ``Type`` (Python type name of the sampled value), or
            an ERROR response when Couchbase rejects the lookup.
        """
        cluster = self.connect()

        try:
            cb = cluster.bucket(self.bucket_name).scope(self.scope)
            # table_name is interpolated into the statement as-is.
            q = f"SELECT * FROM `{table_name}` limit 1"

            data = []
            for row in cb.query(q):
                document = row.get(table_name)
                # Skip rows whose payload is not a JSON object; they have
                # no field names to report.
                if isinstance(document, dict):
                    data.extend([k, type(v).__name__] for k, v in document.items())
        except CouchbaseException as e:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=f"Error: {self._error_message(e)}",
            )

        df = pd.DataFrame(data, columns=["Field", "Type"])
        return Response(RESPONSE_TYPE.TABLE, df)