Coverage for mindsdb / integrations / handlers / lancedb_handler / lancedb_handler.py: 0%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import List, Optional 

2 

3import lancedb 

4import pandas as pd 

5import pyarrow as pa 

6from lance.vector import vec_to_table 

7import duckdb 

8import json 

9 

10from mindsdb.integrations.libs.response import RESPONSE_TYPE 

11from mindsdb.integrations.libs.response import HandlerResponse 

12from mindsdb.integrations.libs.response import HandlerResponse as Response 

13from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

14from mindsdb.integrations.libs.vectordatabase_handler import ( 

15 FilterCondition, 

16 FilterOperator, 

17 TableField, 

18 VectorStoreHandler, 

19) 

20from mindsdb.utilities import log 

21 

22logger = log.getLogger(__name__) 

23 

24 

class LanceDBHandler(VectorStoreHandler):
    """Handle the connection to LanceDB and the execution of statements against it.

    The handler keeps a single LanceDB client in ``self._client`` and tracks
    its state in ``self.is_connected``. Filtering, projection and paging of
    SELECT results are performed with DuckDB on top of a pandas DataFrame.
    """

    name = "lancedb"

    def __init__(self, name: str, **kwargs):
        """Validate the connection arguments and open the connection.

        Args:
            name: Handler instance name, forwarded to the base class.
            **kwargs: Must contain ``connection_data``, a mapping with
                ``persist_directory`` (required) and optionally ``api_key``,
                ``region`` and ``host_override``.

        Raises:
            Exception: If ``persist_directory`` is missing, or if ``api_key``
                is given without ``region``.
        """
        super().__init__(name)

        # Initialise the connection state before validating, so that __del__
        # never runs against a half-built instance when validation raises.
        self._client = None
        self.is_connected = False

        # A missing/None connection_data is treated as empty so the user gets
        # the explicit "persist_directory is required" error below.
        self._connection_data = kwargs.get("connection_data") or {}

        self._client_config = {
            "uri": self._connection_data.get("persist_directory"),
            "api_key": self._connection_data.get("api_key", None),
            "region": self._connection_data.get("region"),
            "host_override": self._connection_data.get("host_override"),
        }

        # uri is required for both LanceDB Cloud and local connections
        if not self._client_config["uri"]:
            raise Exception(
                "persist_directory is required for LanceDB connection!"
            )
        # an api_key marks a LanceDB Cloud connection, which also needs a region
        if self._client_config["api_key"] and not self._client_config["region"]:
            raise Exception(
                "region is required for LanceDB Cloud connection!"
            )

        self.connect()

    def _get_client(self):
        """Create a LanceDB connection from the stored client configuration.

        Raises:
            Exception: If the client configuration is not set.
        """
        client_config = self._client_config
        if client_config is None:
            raise Exception("Client config is not set!")
        return lancedb.connect(**client_config)

    def __del__(self):
        # getattr guards against instances whose __init__ failed before the
        # attribute was assigned.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """Connect to a LanceDB database.

        Failures are logged and leave ``is_connected`` set to False; they are
        not re-raised.
        """
        if self.is_connected is True:
            return
        try:
            self._client = self._get_client()
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error connecting to LanceDB client, {e}!")
            self.is_connected = False

    def disconnect(self):
        """Close the database connection by dropping the client reference."""
        if self.is_connected is False:
            return
        self._client = None
        self.is_connected = False

    def check_connection(self):
        """Check the connection to the LanceDB database.

        Connects first when the handler is currently disconnected, lists the
        tables as a liveness probe, and restores the disconnected state
        afterwards if the probe had to open the connection itself.

        Returns:
            StatusResponse whose ``success`` flag reflects the probe result and
            whose ``error_message`` carries the failure text, if any.
        """
        response_code = StatusResponse(False)
        need_to_close = self.is_connected is False
        try:
            # connect() is a no-op when already connected; without it a
            # disconnected handler would always fail on a None client.
            self.connect()
            if self._client is None:
                raise Exception("LanceDB client is not connected")
            self._client.table_names()
            response_code.success = True
        except Exception as e:
            logger.error(f"Error connecting to LanceDB , {e}!")
            response_code.error_message = str(e)

        if response_code.success is True and need_to_close:
            self.disconnect()
        if response_code.success is False and self.is_connected is True:
            self.is_connected = False

        return response_code

    def _get_lancedb_operator(self, operator: FilterOperator) -> str:
        """Map a FilterOperator to the SQL operator text used in filter strings.

        Raises:
            Exception: If the operator has no LanceDB equivalent.
        """
        mapping = {
            FilterOperator.EQUAL: "=",
            FilterOperator.NOT_EQUAL: "!=",
            FilterOperator.LESS_THAN: "<",
            FilterOperator.LESS_THAN_OR_EQUAL: "<=",
            FilterOperator.GREATER_THAN: ">",
            FilterOperator.GREATER_THAN_OR_EQUAL: ">=",
            FilterOperator.IN: "in",
            FilterOperator.NOT_IN: "not in",
            FilterOperator.LIKE: "like",
            FilterOperator.NOT_LIKE: "not like",
            FilterOperator.IS_NULL: "is null",
            FilterOperator.IS_NOT_NULL: "is not null",
        }

        if operator not in mapping:
            raise Exception(f"Operator {operator} is not supported by LanceDB!")

        return mapping[operator]

    @staticmethod
    def _format_filter_value(value) -> str:
        """Render a single Python value as a literal for a filter string.

        Strings are wrapped in single quotes with embedded single quotes
        doubled, so a value cannot terminate the literal early. Every other
        value is rendered with ``str``.
        """
        if isinstance(value, str):
            return "'{}'".format(value.replace("'", "''"))
        return str(value)

    def _translate_condition(
        self, conditions: List[FilterCondition]
    ) -> Optional[str]:
        """
        Translate a list of FilterCondition objects to a filter string that can be used by LanceDB.
        E.g.,
        [
            FilterCondition(
                column="content",
                op=FilterOperator.NOT_EQUAL,
                value="a",
            ),
            FilterCondition(
                column="id",
                op=FilterOperator.EQUAL,
                value="6",
            )
        ]
        -->
        "content != 'a' and id = '6'"

        Only conditions whose column starts with the id or content field name
        are translated; all others are ignored. The given conditions are not
        modified.

        Returns:
            The combined filter string, or None when nothing is translatable.
        """
        if not conditions:
            return None

        supported_prefixes = (TableField.ID.value, TableField.CONTENT.value)
        unary_operators = (FilterOperator.IS_NULL, FilterOperator.IS_NOT_NULL)
        list_operators = (FilterOperator.IN, FilterOperator.NOT_IN)

        lancedb_conditions = []
        for condition in conditions:
            if not condition.column.startswith(supported_prefixes):
                continue

            # keep only the last path segment, e.g. "table.id" -> "id"
            condition_key = condition.column.split(".")[-1]
            operator = self._get_lancedb_operator(condition.op)

            if condition.op in unary_operators:
                # "is null" / "is not null" take no right-hand operand
                lancedb_conditions.append(f"{condition_key} {operator}")
                continue

            if condition.op in list_operators:
                # IN values may arrive as a single bare element rather than
                # a list; wrap it so it is still rendered inside parentheses.
                values = condition.value
                if not isinstance(values, (list, tuple)):
                    values = [values]
                value = "({})".format(
                    ", ".join(self._format_filter_value(item) for item in values)
                )
            else:
                value = self._format_filter_value(condition.value)

            lancedb_conditions.append(" ".join([condition_key, operator, value]))

        # Combine all conditions into a single string and return
        return " and ".join(lancedb_conditions) if lancedb_conditions else None

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[FilterCondition] = None,
        offset: int = None,
        limit: int = None,
    ) -> pd.DataFrame:
        """Read rows from a LanceDB table, optionally via vector search.

        Args:
            table_name: Name of the table to read.
            columns: Columns to return. When omitted or empty, the id,
                content, metadata and embeddings columns are returned.
            conditions: Filter conditions. A condition on the search-vector
                field triggers a vector search; id/content conditions become
                a WHERE clause applied to the fetched rows.
            offset: Number of rows to skip, or None.
            limit: Maximum number of rows to return, or None.

        Returns:
            DataFrame with the requested columns, plus the distance column
            when a vector search was performed.

        Raises:
            Exception: If none of the requested columns is a known field.
        """
        collection = self._client.open_table(table_name)
        filters = self._translate_condition(conditions)

        if not columns:
            columns = [
                TableField.ID.value,
                TableField.CONTENT.value,
                TableField.METADATA.value,
                TableField.EMBEDDINGS.value,
            ]

        # check if embedding vector filter is present (first one wins)
        vector_filter = next(
            (
                condition
                for condition in conditions or []
                if condition.column == TableField.SEARCH_VECTOR.value
            ),
            None,
        )

        # NOTE: the frame must be bound to the local name `result`, because
        # the SQL below refers to it as `from result` and DuckDB resolves
        # that name from the variables of this function.
        if vector_filter is not None:
            vec = json.loads(vector_filter.value) if isinstance(vector_filter.value, str) else vector_filter.value
            search = collection.search(vec)
            # the distance is computed by the search (returned as
            # `_distance`), it is not a stored column that can be projected
            stored_columns = [col for col in columns if col != TableField.DISTANCE.value]
            if stored_columns:
                search = search.select(stored_columns)
            if limit:
                # LanceDB caps vector-search results with its own default
                # limit; request enough neighbours to serve LIMIT + OFFSET.
                search = search.limit(int(limit) + int(offset or 0))
            result = search.to_pandas()
            result = result.rename(columns={"_distance": TableField.DISTANCE.value})
        else:
            result = collection.to_pandas()

        requested = list(columns)
        if TableField.DISTANCE.value in result.columns:
            requested.append(TableField.DISTANCE.value)

        known_fields = (
            TableField.ID.value,
            TableField.CONTENT.value,
            TableField.METADATA.value,
            TableField.EMBEDDINGS.value,
            TableField.DISTANCE.value,
        )
        selected = []
        for col in requested:
            # keep known fields only, without repeating a column twice
            if col in known_fields and col not in selected:
                selected.append(col)
        if not selected:
            raise Exception("No valid columns requested for LanceDB select!")

        # implementing limit and offset. Not supported natively in lancedb
        sql_parts = [f"select {', '.join(selected)} from result"]
        if filters:
            sql_parts.append(f"where {filters}")
        # `is not None` so that LIMIT 0 yields no rows instead of all rows;
        # int() keeps anything but a number out of the SQL text.
        if limit is not None:
            sql_parts.append(f"limit {int(limit)}")
        if offset is not None:
            sql_parts.append(f"offset {int(offset)}")
        sql = " ".join(sql_parts)

        data_df = duckdb.query(sql).to_df()
        return data_df

    @staticmethod
    def _to_arrow_with_vector(frame: pd.DataFrame) -> pa.Table:
        """Convert a frame to an Arrow table with an added ``vector`` column.

        The ``vector`` column is built from the frame's embeddings column, so
        it always has the same number of rows as the table it is appended to.
        """
        pa_data = pa.Table.from_pandas(frame, preserve_index=False)
        vec_data = vec_to_table(frame[TableField.EMBEDDINGS.value].values.tolist())
        return pa_data.append_column("vector", vec_data["vector"])

    def insert(
        self, table_name: str, data: pd.DataFrame, columns: List[str] = None
    ):
        """
        Insert data into the LanceDB database.

        In case of create table statements there is a mismatch between the column types of the `data`
        pandas dataframe filled with data and the empty base table column types, which raises a
        pa.lib.ArrowNotImplementedError. In that case the base table is dropped (it is expected to be
        empty) and recreated with the right datatypes.

        The `data` frame passed by the caller is not modified.
        """
        if TableField.METADATA.value not in data.columns:
            # assign() returns a new frame, leaving the caller's frame intact
            data = data.assign(**{TableField.METADATA.value: None})

        df = data[
            [TableField.ID.value, TableField.CONTENT.value, TableField.METADATA.value, TableField.EMBEDDINGS.value]
        ]

        # opened outside the try so the fallback below can always use it
        collection = self._client.open_table(table_name)

        try:
            collection.add(self._to_arrow_with_vector(df))
        except pa.lib.ArrowNotImplementedError:
            collection_df = collection.to_pandas()
            column_dtypes = collection_df.dtypes
            df = df.astype(column_dtypes)
            new_df = pd.concat([collection_df, df])
            new_df[TableField.ID.value] = new_df[TableField.ID.value].apply(str)
            # build the replacement table before dropping the old one
            new_pa_data = self._to_arrow_with_vector(new_df)
            self.drop_table(table_name)
            self._client.create_table(table_name, new_pa_data)

    def update(
        self, table_name: str, data: pd.DataFrame, columns: List[str] = None
    ):
        """
        Update data in the LanceDB database.
        TODO: not implemented yet; delegates to the base class implementation.
        """
        return super().update(table_name, data, columns)

    def delete(
        self, table_name: str, conditions: List[FilterCondition] = None
    ):
        """Delete the rows of a table that match the given conditions.

        Raises:
            Exception: If the conditions do not translate to any filter, so
                that a delete can never silently target the whole table.
        """
        filters = self._translate_condition(conditions)
        if filters is None:
            raise Exception("Delete query must have at least one condition!")
        collection = self._client.open_table(table_name)
        collection.delete(filters)

    def _table_exists(self, table_name: str) -> bool:
        """Return True when the table can be opened with the current client."""
        try:
            self._client.open_table(table_name)
        except Exception:
            return False
        return True

    def create_table(self, table_name: str, if_not_exists=True):
        """
        Create a collection with the given name in the LanceDB database.

        The table is created empty with the id, content, metadata and
        embeddings columns. When `if_not_exists` is true, a failure caused by
        the table already existing is ignored; any other failure is re-raised.
        """
        data = {
            TableField.ID.value: str,
            TableField.CONTENT.value: str,
            TableField.METADATA.value: object,
            TableField.EMBEDDINGS.value: object,
        }
        df = pd.DataFrame(columns=data.keys()).astype(data)
        try:
            self._client.create_table(table_name, df)
        except Exception:
            # swallow the error only when it is explained by an existing table
            if not if_not_exists or not self._table_exists(table_name):
                raise

    def drop_table(self, table_name: str, if_exists=True):
        """
        Delete a collection from the LanceDB database.

        A ValueError from the client is ignored when `if_exists` is true and
        re-raised otherwise.
        """
        try:
            self._client.drop_table(table_name)
        except ValueError:
            if not if_exists:
                raise

    def get_tables(self) -> HandlerResponse:
        """
        Get the list of collections in the LanceDB database.

        Returns a TABLE response with a single `table_name` column.
        """
        collections = self._client.table_names()
        collections_name = pd.DataFrame(
            columns=["table_name"],
            data=collections,
        )
        return Response(resp_type=RESPONSE_TYPE.TABLE, data_frame=collections_name)

    def get_columns(self, table_name: str) -> HandlerResponse:
        """Describe the columns of a table.

        Returns a TABLE response with `column_name` and `data_type` columns
        (the pandas dtypes of the loaded table), or an ERROR response when
        opening the table raises ValueError.
        """
        # check if collection exists
        try:
            df = self._client.open_table(table_name).to_pandas()
            column_df = pd.DataFrame(df.dtypes).reset_index()
            column_df.columns = ['column_name', 'data_type']
        except ValueError:
            return Response(
                resp_type=RESPONSE_TYPE.ERROR,
                error_message=f"Table {table_name} does not exist!",
            )
        return Response(resp_type=RESPONSE_TYPE.TABLE, data_frame=column_df)

329 return Response(resp_type=RESPONSE_TYPE.TABLE, data_frame=column_df)