Coverage for mindsdb / integrations / handlers / lancedb_handler / lancedb_handler.py: 0%
162 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import List, Optional
3import lancedb
4import pandas as pd
5import pyarrow as pa
6from lance.vector import vec_to_table
7import duckdb
8import json
10from mindsdb.integrations.libs.response import RESPONSE_TYPE
11from mindsdb.integrations.libs.response import HandlerResponse
12from mindsdb.integrations.libs.response import HandlerResponse as Response
13from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse
14from mindsdb.integrations.libs.vectordatabase_handler import (
15 FilterCondition,
16 FilterOperator,
17 TableField,
18 VectorStoreHandler,
19)
20from mindsdb.utilities import log
logger = log.getLogger(__name__)


class LanceDBHandler(VectorStoreHandler):
    """This handler handles connection and execution of the LanceDB statements."""

    name = "lancedb"

    # Columns of the base table created by ``create_table``; ``insert`` adds an
    # extra "vector" column derived from the embeddings column.
    _BASE_COLUMNS = (
        TableField.ID.value,
        TableField.CONTENT.value,
        TableField.METADATA.value,
        TableField.EMBEDDINGS.value,
    )

    # Columns that may appear in the projection built by ``select``.
    _SELECTABLE_COLUMNS = _BASE_COLUMNS + (TableField.DISTANCE.value,)

    def __init__(self, name: str, **kwargs):
        """
        Validate the connection parameters and connect to LanceDB.

        Args:
            name: name of the handler instance.
            **kwargs: expects ``connection_data``, a dict with
                ``persist_directory`` (required) and the optional ``api_key``,
                ``region`` and ``host_override`` keys.

        Raises:
            Exception: if ``persist_directory`` is missing, or if ``api_key``
                is set without ``region``.
        """
        super().__init__(name)
        # Initialise connection state first so that __del__ is safe even when
        # the validation below raises.
        self._client = None
        self.is_connected = False

        # A missing connection_data must produce the explicit
        # "persist_directory is required" error below, not an AttributeError.
        self._connection_data = kwargs.get("connection_data") or {}

        self._client_config = {
            "uri": self._connection_data.get("persist_directory"),
            "api_key": self._connection_data.get("api_key", None),
            "region": self._connection_data.get("region"),
            "host_override": self._connection_data.get("host_override"),
        }

        # uri is required either for LanceDB Cloud or local
        if not self._client_config["uri"]:
            raise Exception(
                "persist_directory is required for LanceDB connection!"
            )
        # an api_key means LanceDB Cloud, which additionally requires a region
        if self._client_config["api_key"] and not self._client_config["region"]:
            raise Exception(
                "region is required for LanceDB Cloud connection!"
            )

        self.connect()

    def _get_client(self):
        """Create a new LanceDB client from the stored client config."""
        client_config = self._client_config
        if client_config is None:
            raise Exception("Client config is not set!")
        return lancedb.connect(**client_config)

    def __del__(self):
        # getattr: __init__ may have failed before is_connected was assigned.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """Connect to a LanceDB database; errors are logged, not raised."""
        if self.is_connected is True:
            return
        try:
            self._client = self._get_client()
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error connecting to LanceDB client, {e}!")
            self.is_connected = False

    def disconnect(self):
        """Close the database connection by dropping the client reference."""
        if self.is_connected is False:
            return
        self._client = None
        self.is_connected = False

    def check_connection(self):
        """
        Check the connection to the LanceDB database by listing its tables.

        If the handler was not connected when called, a client is created for
        the check and released again after a successful check.

        Returns:
            StatusResponse: ``success`` is True when listing tables works;
            otherwise ``error_message`` holds the error text.
        """
        response_code = StatusResponse(False)
        need_to_close = self.is_connected is False
        try:
            if self._client is None:
                # No live client (initial connect failed or handler was
                # disconnected): create one so the real connection error is
                # reported instead of an AttributeError on None.
                self._client = self._get_client()
                self.is_connected = True
            self._client.table_names()
            response_code.success = True
        except Exception as e:
            logger.error(f"Error connecting to LanceDB , {e}!")
            response_code.error_message = str(e)
        finally:
            if response_code.success is True and need_to_close:
                self.disconnect()
            if response_code.success is False and self.is_connected is True:
                self.is_connected = False

        return response_code

    def _get_lancedb_operator(self, operator: FilterOperator) -> str:
        """
        Map a FilterOperator to its SQL operator text.

        Raises:
            Exception: if the operator has no LanceDB equivalent.
        """
        mapping = {
            FilterOperator.EQUAL: "=",
            FilterOperator.NOT_EQUAL: "!=",
            FilterOperator.LESS_THAN: "<",
            FilterOperator.LESS_THAN_OR_EQUAL: "<=",
            FilterOperator.GREATER_THAN: ">",
            FilterOperator.GREATER_THAN_OR_EQUAL: ">=",
            FilterOperator.IN: "in",
            FilterOperator.NOT_IN: "not in",
            FilterOperator.LIKE: "like",
            FilterOperator.NOT_LIKE: "not like",
            FilterOperator.IS_NULL: "is null",
            FilterOperator.IS_NOT_NULL: "is not null",
        }

        if operator not in mapping:
            raise Exception(f"Operator {operator} is not supported by LanceDB!")

        return mapping[operator]

    @staticmethod
    def _to_sql_literal(value) -> str:
        """
        Render a Python value as an SQL literal.

        Strings are wrapped in single quotes with embedded single quotes
        doubled (standard SQL escaping); other values are rendered with str().
        """
        if isinstance(value, str):
            return "'{}'".format(value.replace("'", "''"))
        return str(value)

    def _translate_condition(
        self, conditions: List[FilterCondition]
    ) -> Optional[str]:
        """
        Translate a list of FilterCondition objects to a filter string usable
        by LanceDB / DuckDB. Only id and content conditions are translated;
        all others (e.g. the search vector) are ignored. The conditions passed
        in are not modified.

        E.g.,
        [
            FilterCondition(
                column="content",
                op=FilterOperator.NOT_EQUAL,
                value="a",
            ),
            FilterCondition(
                column="id",
                op=FilterOperator.EQUAL,
                value="6",
            )
        ]
        -->
        "content != 'a' and id = '6'"

        Returns:
            The conditions joined with "and", or None if nothing applies.
        """
        if not conditions:
            return None
        filtered_conditions = [
            condition
            for condition in conditions
            if condition.column.startswith(
                (TableField.ID.value, TableField.CONTENT.value)
            )
        ]

        lancedb_conditions = []
        for condition in filtered_conditions:
            condition_key = condition.column.split(".")[-1]
            operator = self._get_lancedb_operator(condition.op)

            if condition.op in (FilterOperator.IS_NULL, FilterOperator.IS_NOT_NULL):
                # unary operators take no right-hand operand
                lancedb_conditions.append(f"{condition_key} {operator}")
                continue

            if condition.op in (FilterOperator.IN, FilterOperator.NOT_IN):
                values = (
                    condition.value
                    if isinstance(condition.value, (list, tuple))
                    else [condition.value]
                )
                value = "({})".format(
                    ", ".join(self._to_sql_literal(item) for item in values)
                )
            else:
                value = self._to_sql_literal(condition.value)

            lancedb_conditions.append(f"{condition_key} {operator} {value}")

        # Combine all conditions into a single string and return
        return " and ".join(lancedb_conditions) if lancedb_conditions else None

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[FilterCondition] = None,
        offset: int = None,
        limit: int = None,
    ) -> pd.DataFrame:
        """
        Read rows from a LanceDB table, optionally as a vector similarity search.

        If ``conditions`` contains a ``TableField.SEARCH_VECTOR`` condition, its
        value (a list, or a JSON-encoded list) is used as the query vector and
        LanceDB's ``_distance`` column is returned as the distance column. The
        id/content conditions, ``limit`` and ``offset`` are then applied with
        DuckDB over the fetched pandas DataFrame.

        Args:
            table_name: table to read.
            columns: columns to return; None or empty selects the base columns.
            conditions: filter conditions (see ``_translate_condition``).
            offset: number of rows to skip.
            limit: maximum number of rows to return.

        Returns:
            pd.DataFrame with the selected columns.
        """
        collection = self._client.open_table(table_name)
        filters = self._translate_condition(conditions)

        if not columns:
            columns = list(self._BASE_COLUMNS)

        # the first search-vector condition, if any, turns this into a vector search
        vector_condition = next(
            (
                condition
                for condition in (conditions or [])
                if condition.column == TableField.SEARCH_VECTOR.value
            ),
            None,
        )

        if vector_condition is not None:
            raw_vector = vector_condition.value
            vec = json.loads(raw_vector) if isinstance(raw_vector, str) else raw_vector
            query = collection.search(vec).select(columns)
            if limit and not filters:
                # search() otherwise caps the number of neighbours at its own
                # default; without post-filters the requested page can be
                # fetched directly so larger limits are honoured.
                query = query.limit(limit + (offset or 0))
            result = query.to_pandas()
            result = result.rename(columns={"_distance": TableField.DISTANCE.value})
        else:
            result = collection.to_pandas()

        projection = list(columns)
        if (
            TableField.DISTANCE.value in result.columns
            and TableField.DISTANCE.value not in projection
        ):
            projection.append(TableField.DISTANCE.value)
        selected = [col for col in projection if col in self._SELECTABLE_COLUMNS]
        if not selected:
            raise Exception(
                f"None of the requested columns {columns} can be selected from LanceDB table {table_name}!"
            )
        col_str = ", ".join(selected)

        # Filtering, limit and offset are applied with DuckDB. DuckDB resolves
        # the table name "result" to the local DataFrame of the same name (its
        # replacement scan), so that variable must keep this name.
        sql = f"select {col_str} from result"
        if filters:
            sql += f" where {filters}"
        if limit is not None:
            sql += f" limit {int(limit)}"
        if offset:
            sql += f" offset {int(offset)}"

        return duckdb.query(sql).to_df()

    @staticmethod
    def _to_arrow_with_vector(df: pd.DataFrame) -> pa.Table:
        """Convert ``df`` to an Arrow table with a "vector" column built from its embeddings column."""
        pa_data = pa.Table.from_pandas(df, preserve_index=False)
        vec_data = vec_to_table(df[TableField.EMBEDDINGS.value].values.tolist())
        return pa_data.append_column("vector", vec_data["vector"])

    def insert(
        self, table_name: str, data: pd.DataFrame, columns: List[str] = None
    ):
        """
        Insert data into the LanceDB database.

        ``data`` must contain the id, content and embeddings columns; a missing
        metadata column is filled with None. The caller's DataFrame is not
        modified.

        In case of create table statements there is a mismatch between the
        column types of the ``data`` dataframe and the empty base table, which
        raises ``pa.lib.ArrowNotImplementedError``. In that case the existing
        rows are merged with the new ones, and the table is dropped and
        recreated with the resulting data types.
        """
        if TableField.METADATA.value not in data.columns:
            # assign() returns a copy, leaving the caller's DataFrame untouched
            data = data.assign(**{TableField.METADATA.value: None})
        df = data[list(self._BASE_COLUMNS)]

        collection = self._client.open_table(table_name)
        try:
            collection.add(self._to_arrow_with_vector(df))
        except pa.lib.ArrowNotImplementedError:
            # any stored "vector" column is rebuilt from the embeddings below
            collection_df = collection.to_pandas().drop(columns=["vector"], errors="ignore")
            df = df.astype(collection_df.dtypes.to_dict())
            new_df = pd.concat([collection_df, df])
            new_df[TableField.ID.value] = new_df[TableField.ID.value].apply(str)
            # vectors come from new_df (existing + new rows) so the column
            # length matches the merged table
            new_pa_data = self._to_arrow_with_vector(new_df)
            self.drop_table(table_name)
            self._client.create_table(table_name, new_pa_data)

    def update(
        self, table_name: str, data: pd.DataFrame, columns: List[str] = None
    ):
        """
        Update data in the LanceDB database.
        TODO: not implemented yet
        """
        return super().update(table_name, data, columns)

    def delete(
        self, table_name: str, conditions: List[FilterCondition] = None
    ):
        """
        Delete the rows matching the id/content ``conditions``.

        Raises:
            Exception: if no translatable condition is given, to avoid
                deleting the whole table.
        """
        filters = self._translate_condition(conditions)
        if filters is None:
            raise Exception("Delete query must have at least one condition!")
        collection = self._client.open_table(table_name)
        collection.delete(filters)

    def _table_exists(self, table_name: str) -> bool:
        """Return True if ``table_name`` can be opened with the current client."""
        try:
            self._client.open_table(table_name)
        except Exception:
            # best-effort probe: failing to open is treated as "missing"
            return False
        return True

    def create_table(self, table_name: str, if_not_exists=True):
        """
        Create an empty table with the base columns (id, content, metadata,
        embeddings) in the LanceDB database.

        Args:
            table_name: name of the table to create.
            if_not_exists: when True, creating a table that already exists is
                a no-op instead of an error.
        """
        data = {
            TableField.ID.value: str,
            TableField.CONTENT.value: str,
            TableField.METADATA.value: object,
            TableField.EMBEDDINGS.value: object,
        }
        df = pd.DataFrame(columns=list(data)).astype(data)
        try:
            self._client.create_table(table_name, df)
        except Exception:
            # Only swallow the failure when it is because the table exists.
            if if_not_exists and self._table_exists(table_name):
                return
            raise

    def drop_table(self, table_name: str, if_exists=True):
        """
        Delete a collection from the LanceDB database.

        Args:
            if_exists: when True, a missing table is ignored.
        """
        try:
            self._client.drop_table(table_name)
        except (ValueError, FileNotFoundError) as e:
            # depending on the lancedb version a missing table raises either type
            if not if_exists:
                raise e

    def get_tables(self) -> HandlerResponse:
        """
        Get the list of collections in the LanceDB database.
        """
        collections = self._client.table_names()
        collections_name = pd.DataFrame(
            columns=["table_name"],
            data=collections,
        )
        return Response(resp_type=RESPONSE_TYPE.TABLE, data_frame=collections_name)

    def get_columns(self, table_name: str) -> HandlerResponse:
        """
        Return the column names and pandas dtypes of ``table_name``, or an
        error response if the table cannot be found.
        """
        try:
            collection = self._client.open_table(table_name)
        except (ValueError, FileNotFoundError):
            return Response(
                resp_type=RESPONSE_TYPE.ERROR,
                error_message=f"Table {table_name} does not exist!",
            )
        # An empty table built from the stored schema yields the column dtypes
        # without loading every row into memory.
        df = collection.schema.empty_table().to_pandas()
        column_df = pd.DataFrame(df.dtypes).reset_index()
        column_df.columns = ["column_name", "data_type"]
        return Response(resp_type=RESPONSE_TYPE.TABLE, data_frame=column_df)