Coverage for mindsdb / integrations / handlers / libsql_handler / libsql_handler.py: 0%
82 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4import libsql_experimental as libsql
6from mindsdb_sql_parser import parse_sql
7from mindsdb.integrations.libs.base import DatabaseHandler
9from mindsdb_sql_parser.ast.base import ASTNode
11from mindsdb.utilities import log
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE,
16)
19logger = log.getLogger(__name__)
class LibSQLHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the LibSQL statements.
    """

    name = "libsql"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                Read keys: "database" (required), "sync_url" and
                "auth_token" (both optional).
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "libsql"
        # Normalise a missing config to an empty dict so later `.get(...)`
        # lookups do not fail with AttributeError on None.
        self.connection_data = connection_data if connection_data is not None else {}
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr: __init__ may have raised before `is_connected` was set,
        # and __del__ still runs on the partially constructed instance.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.
        Returns:
            The connection object returned by `libsql.connect` (the cached
            one when the handler is already connected).
        Raises:
            KeyError: if "database" is missing from the connection data.
        """
        if self.is_connected is True:
            return self.connection

        args = self.connection_data
        connect_kwargs = {"database": args["database"]}
        # sync_url and auth_token are optional
        # sync_url is used to sync the local database from the remote database
        # auth_token is used as the authentication token for the remote database
        if args.get("sync_url"):
            connect_kwargs["sync_url"] = args["sync_url"]
            # Only forward the token when one was supplied, instead of
            # raising KeyError on an optional parameter.
            if args.get("auth_token"):
                connect_kwargs["auth_token"] = args["auth_token"]

        self.connection = libsql.connect(**connect_kwargs)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        Returns:
            None when the handler was not connected, otherwise the new value
            of `is_connected` (False).
        """
        if self.is_connected is False:
            return

        # The connection reference is dropped rather than explicitly closed.
        self.connection = None
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            # `.get` so a missing "database" key cannot raise inside the
            # handler and mask the original connection error.
            logger.error(
                f'Error connecting to LibSQL {self.connection_data.get("database")}, {e}!'
            )
            response.error_message = str(e)
        finally:
            # Restore the pre-call connection state.
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE when the statement produced a result set
            (possibly with zero rows), OK otherwise, ERROR when execution
            raised.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()

        try:
            cursor = connection.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            description = cursor.description
            # Decide on the presence of result columns, not on the presence
            # of rows: a SELECT/PRAGMA matching nothing must still yield an
            # (empty) table so callers can rely on `data_frame`.
            if description:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        rows or [], columns=[col[0] for col in description]
                    ),
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(
                f'Error running query: {query} on {self.connection_data.get("database")}: {e}'
            )
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            # Only tear down a connection this call opened itself.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        # Already-rendered SQL is passed through unchanged.
        if isinstance(query, str):
            return self.native_query(query)

        # Local import keeps the module's import block untouched.
        from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender

        # The cursor executes SQL text, so the AST must be rendered first.
        # Rendered with the "sqlite" dialect on the assumption that LibSQL
        # accepts SQLite syntax.
        renderer = SqlalchemyRender("sqlite")
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse
        """
        query = "SELECT name from sqlite_master where type= 'table';"
        result = self.native_query(query)
        df = result.data_frame
        # No frame to rename (e.g. the query failed): return the response
        # as-is instead of crashing on None.
        if df is None:
            return result
        result.data_frame = df.rename(columns={df.columns[0]: "table_name"})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse
        """
        # Quote as an identifier and double any embedded quotes so a crafted
        # table name cannot break out of the PRAGMA argument.
        quoted_name = '"' + str(table_name).replace('"', '""') + '"'
        query = f"PRAGMA table_info({quoted_name});"
        result = self.native_query(query)
        df = result.data_frame
        if df is None:
            return result
        result.data_frame = df.rename(
            columns={"name": "column_name", "type": "data_type"}
        )
        return result