Coverage for mindsdb / integrations / handlers / sqlite_handler / sqlite_handler.py: 0%
87 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2from typing import Optional
4import pandas as pd
5import sqlite3
7from mindsdb_sql_parser import parse_sql
8from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
9from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb_sql_parser.ast.base import ASTNode
13from mindsdb.utilities import log
14from mindsdb.integrations.libs.response import (
15 HandlerStatusResponse as StatusResponse,
16 HandlerResponse as Response,
17 RESPONSE_TYPE,
18)
21logger = log.getLogger(__name__)
class SQLiteHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the SQLite statements.

    The database location is read from ``connection_data["db_file"]`` and is
    passed straight to ``sqlite3.connect``; the special value ``":memory:"``
    is accepted as well as a path to an existing file.
    """

    name = "sqlite"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database;
                the handler reads the ``"db_file"`` key from it
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "sqlite"
        self.connection_data = connection_data
        self.kwargs = kwargs

        # SQLite objects created in a thread can only be used in that same thread.
        self.thread_safe = False

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr: __init__ may have failed before `is_connected` was assigned,
        # in which case there is nothing to close.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self) -> sqlite3.Connection:
        """
        Set up the connection required by the handler.

        An already open connection is reused; otherwise a new one is opened
        for ``connection_data["db_file"]``.

        Returns:
            sqlite3.Connection: the open connection.
        """
        if self.is_connected is True:
            return self.connection

        self.connection = sqlite3.connect(self.connection_data["db_file"])
        self.is_connected = True

        return self.connection

    def disconnect(self) -> bool:
        """
        Close any existing connections.

        Returns:
            bool: the connection state after the call (always False).
        """
        if self.is_connected is True:
            self.connection.close()
            self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        The check fails when ``db_file`` is not configured or when it names a
        file that does not exist. ``":memory:"`` is not a file path and is
        therefore exempt from the existence check.

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be established, otherwise ``error_message`` holds the reason.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False
        # Resolved up front (and defensively) so that the error path below can
        # always build its message, even when the connection data is missing.
        db_file = (self.connection_data or {}).get("db_file")

        try:
            if db_file is None:
                raise ValueError("Required parameter 'db_file' is missing from the connection data.")
            if db_file != ":memory:" and not os.path.isfile(db_file):
                raise FileNotFoundError(
                    f"File '{db_file}' not found. Use ':memory:' to create an in-memory database if you don't have a file."
                )
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to SQLite {db_file}, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True:
                # Only close a connection that this check opened itself.
                if need_to_close:
                    self.disconnect()
            elif self.is_connected is True:
                # The check failed while a connection was still open: close it
                # rather than just flipping the flag, which would leak it.
                try:
                    self.connection.close()
                except sqlite3.Error as close_error:
                    logger.warning(f"Error closing SQLite connection: {close_error}")
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        Statements that produce a result set (``cursor.description`` is set)
        yield a TABLE response, even when the result set has no rows; all other
        statements yield an OK response. Execution errors are reported as an
        ERROR response instead of being raised.

        Args:
            query (str): query in native format

        Returns:
            HandlerResponse
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = None

        try:
            cursor = connection.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            # Commit unconditionally: statements that both modify data and
            # return rows (e.g. INSERT ... RETURNING) must be persisted too,
            # and committing with no open transaction is harmless.
            connection.commit()
            if cursor.description is not None:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(rows, columns=[x[0] for x in cursor.description]),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error running query: {query} on {self.connection_data['db_file']}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            if cursor is not None:
                cursor.close()
            # Only close a connection that this call opened itself.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.

        The AST is rendered to a SQLite query string and executed through
        ``native_query``.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender("sqlite")
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        Returns:
            HandlerResponse: on success its data frame has the first column
            renamed to ``table_name``; a response without a data frame (e.g.
            an error) is returned unchanged.
        """
        query = "SELECT name from sqlite_master where type= 'table';"
        result = self.native_query(query)
        df = result.data_frame
        if df is not None:
            result.data_frame = df.rename(columns={df.columns[0]: "table_name"})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.

        Args:
            table_name (str): name of one of tables returned by self.get_tables()

        Returns:
            HandlerResponse: on success its data frame has ``name`` and ``type``
            renamed to ``column_name`` and ``data_type``; a response without a
            data frame (e.g. an error) is returned unchanged.
        """
        # Quote the identifier and double any embedded quote so that the table
        # name cannot terminate the identifier and alter the statement.
        quoted_name = '"' + str(table_name).replace('"', '""') + '"'
        query = f"PRAGMA table_info({quoted_name});"
        result = self.native_query(query)
        df = result.data_frame
        if df is not None:
            result.data_frame = df.rename(columns={"name": "column_name", "type": "data_type"})
        return result