Coverage for mindsdb / integrations / handlers / couchbase_handler / couchbase_handler.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from datetime import timedelta 

2 

3import pandas as pd 

4from couchbase.auth import PasswordAuthenticator 

5from couchbase.cluster import Cluster 

6from couchbase.exceptions import UnAmbiguousTimeoutException 

7from couchbase.options import ClusterOptions 

8from couchbase.exceptions import KeyspaceNotFoundException, CouchbaseException 

9 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.utilities import log 

12from mindsdb_sql_parser.ast.base import ASTNode 

13from mindsdb.integrations.libs.response import ( 

14 HandlerStatusResponse as StatusResponse, 

15 HandlerResponse as Response, 

16 RESPONSE_TYPE, 

17) 

18 

19 

# Module-level logger obtained from MindsDB's logging utility, named after this module.
logger = log.getLogger(__name__)

21 

22 

class CouchbaseHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Couchbase statements.
    """

    name = "couchbase"
    # Seconds to wait for the cluster to become ready in connect().
    DEFAULT_TIMEOUT_SECONDS = 5

    def __init__(self, name, **kwargs):
        """Store the connection settings; no connection is opened here.

        Args:
            name (str): handler instance name, forwarded to DatabaseHandler.
            **kwargs: ``connection_data`` (dict) is read from here. The keys
                used by this handler are ``connection_string``, ``user``,
                ``password``, ``bucket`` and ``scope``.
        """
        super().__init__(name)
        # Fall back to an empty dict so the .get() lookups below (and in
        # connect()) do not fail when connection_data is missing or None.
        self.connection_data = kwargs.get("connection_data") or {}

        # An empty or missing scope falls back to "_default".
        self.scope = self.connection_data.get("scope") or "_default"

        self.bucket_name = self.connection_data.get("bucket")
        self.cluster = None

        self.is_connected = False

    @staticmethod
    def _error_message(error) -> str:
        """Return the most specific message available for a Couchbase error.

        Prefers ``error.error_context.first_error_message`` and falls back to
        ``str(error)`` when the context or that attribute is absent or None.
        """
        context = getattr(error, "error_context", None)
        message = getattr(context, "first_error_message", None)
        return str(message) if message is not None else str(error)

    def connect(self):
        """
        Set up connections required by the handler.

        Returns:
            The connected cluster.

        Raises:
            UnAmbiguousTimeoutException: if the cluster is not ready within
                DEFAULT_TIMEOUT_SECONDS.
        """
        if self.is_connected:
            return self.cluster

        auth = PasswordAuthenticator(
            self.connection_data.get("user"),
            self.connection_data.get("password"),
            # NOTE: If using SSL/TLS, add the certificate path.
            # We strongly recommend this for production use.
            # cert_path=cert_path
        )
        options = ClusterOptions(auth)
        # wan_development is used to avoid latency issues while connecting to Couchbase over the internet
        options.apply_profile("wan_development")

        conn_str = self.connection_data.get("connection_string")
        # connect to the cluster
        cluster = Cluster(conn_str, options)

        try:
            # wait until the cluster is ready for use
            cluster.wait_until_ready(timedelta(seconds=self.DEFAULT_TIMEOUT_SECONDS))
        except UnAmbiguousTimeoutException:
            self.is_connected = False
            raise

        self.is_connected = cluster.connected
        self.cluster = cluster
        return self.cluster

    def disconnect(self):
        """Close the existing connection, if any, and reset the handler state.

        After this call ``self.is_connected`` is False and ``self.cluster`` is
        None, so the next connect() opens a fresh connection.
        """
        if not self.is_connected:
            return
        try:
            if self.cluster is not None:
                self.cluster.close()
        except CouchbaseException as e:
            # Best effort: a failed close must not leave the handler marked as connected.
            logger.warning(f"Error closing Couchbase connection: {e}")
        finally:
            self.cluster = None
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Couchbase bucket
        :return: success status and error message if error occurs
        """
        result = StatusResponse(False)
        # Only close afterwards if this check is what opened the connection.
        need_to_close = not self.is_connected

        try:
            cluster = self.connect()
            result.success = cluster.connected
        except CouchbaseException as e:
            # CouchbaseException is caught (rather than only the timeout) so
            # other SDK failures are reported in the status, not raised.
            logger.error(f"Error connecting to Couchbase {self.bucket_name}, {e}!")
            result.error_message = str(e)

        if result.success and need_to_close:
            self.disconnect()
        if not result.success and self.is_connected:
            self.is_connected = False
        return result

    def native_query(self, query: str) -> Response:
        """Execute a raw query against Couchbase.

        Each result row is flattened one level: a value that is a dict
        contributes its own keys as columns, any other value is stored under
        its key.

        Args:
            query (str): Raw Couchbase query.

        Returns:
            HandlerResponse: TABLE with the rows as a DataFrame, OK when the
            query produced no data, or ERROR on a CouchbaseException.
        """
        self.connect()
        bucket = self.cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)

        rows = []
        try:
            # Iterate inside the try: the loop over the query result is what
            # the original guarded with `except CouchbaseException`.
            for result in scope.query(query):
                flat_row = {}
                for key, value in result.items():
                    if isinstance(value, dict):
                        flat_row.update(value)
                    else:
                        # Store each scalar once; re-adding the whole row per
                        # scalar column duplicated every row.
                        flat_row[key] = value
                rows.append(flat_row)
        except CouchbaseException as e:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=self._error_message(e),
            )

        if not any(rows):
            # No rows, or only empty rows: nothing tabular to return.
            return Response(RESPONSE_TYPE.OK)
        return Response(RESPONSE_TYPE.TABLE, pd.DataFrame(rows))

    def query(self, query: ASTNode) -> Response:
        """Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Get a list of collections in database

        Collection names are gathered from every scope of the bucket,
        de-duplicated and returned sorted in a ``TABLE_NAME`` column.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        unique_collections = {
            collection.name
            for scope in bucket.collections().get_all_scopes()
            for collection in scope.collections
        }
        # Sorted so the output order is deterministic (set order is arbitrary).
        df = pd.DataFrame(sorted(unique_collections), columns=["TABLE_NAME"])
        return Response(RESPONSE_TYPE.TABLE, df)

    def get_columns(self, table_name) -> Response:
        """Returns a list of entity columns

        The columns are inferred from a single document fetched with
        ``SELECT * ... limit 1``; the type is the Python type name of each
        value in that document.

        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: should have same columns as information_schema.columns
                (https://dev.mysql.com/doc/refman/8.0/en/information-schema-columns-table.html)
                Column 'COLUMN_NAME' is mandatory, other is optional. Highly
                recommended to define also 'DATA_TYPE': it should be one of
                python data types (by default it str).
                NOTE(review): this implementation emits 'Field' and 'Type'
                columns instead; confirm what callers expect.
        """
        cluster = self.connect()
        bucket = cluster.bucket(self.bucket_name)
        scope = bucket.scope(self.scope)

        try:
            q = f"SELECT * FROM `{table_name}` limit 1"
            data = [
                [field, type(value).__name__]
                for row in scope.query(q)
                for field, value in row[table_name].items()
            ]
        except KeyspaceNotFoundException as e:
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=f"Error: {self._error_message(e)}",
            )

        df = pd.DataFrame(data, columns=["Field", "Type"])
        return Response(RESPONSE_TYPE.TABLE, df)