Coverage for mindsdb / integrations / handlers / sqlite_handler / sqlite_handler.py: 0%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2from typing import Optional 

3 

4import pandas as pd 

5import sqlite3 

6 

7from mindsdb_sql_parser import parse_sql 

8from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

9from mindsdb.integrations.libs.base import DatabaseHandler 

10 

11from mindsdb_sql_parser.ast.base import ASTNode 

12 

13from mindsdb.utilities import log 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17 RESPONSE_TYPE, 

18) 

19 

20 

21logger = log.getLogger(__name__) 

22 

23 

24class SQLiteHandler(DatabaseHandler): 

25 """ 

26 This handler handles connection and execution of the SQLite statements. 

27 """ 

28 

29 name = "sqlite" 

30 

31 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

32 """ 

33 Initialize the handler. 

34 Args: 

35 name (str): name of particular handler instance 

36 connection_data (dict): parameters for connecting to the database 

37 **kwargs: arbitrary keyword arguments. 

38 """ 

39 super().__init__(name) 

40 self.parser = parse_sql 

41 self.dialect = "sqlite" 

42 self.connection_data = connection_data 

43 self.kwargs = kwargs 

44 

45 # SQLite objects created in a thread can only be used in that same thread. 

46 self.thread_safe = False 

47 

48 self.connection = None 

49 self.is_connected = False 

50 

51 def __del__(self): 

52 if self.is_connected is True: 

53 self.disconnect() 

54 

55 def connect(self) -> StatusResponse: 

56 """ 

57 Set up the connection required by the handler. 

58 Returns: 

59 HandlerStatusResponse 

60 """ 

61 

62 if self.is_connected is True: 

63 return self.connection 

64 

65 self.connection = sqlite3.connect(self.connection_data["db_file"]) 

66 self.is_connected = True 

67 

68 return self.connection 

69 

70 def disconnect(self): 

71 """ 

72 Close any existing connections. 

73 """ 

74 

75 if self.is_connected is False: 

76 return 

77 

78 self.connection.close() 

79 self.is_connected = False 

80 return self.is_connected 

81 

82 def check_connection(self) -> StatusResponse: 

83 """ 

84 Check connection to the handler. 

85 Returns: 

86 HandlerStatusResponse 

87 """ 

88 

89 response = StatusResponse(False) 

90 need_to_close = self.is_connected is False 

91 

92 try: 

93 if not os.path.isfile(self.connection_data["db_file"]): 

94 raise FileNotFoundError( 

95 f"File '{self.connection_data['db_file']}' not found. Use ':memory:' to create an in-memory database if you don't have a file." 

96 ) 

97 self.connect() 

98 response.success = True 

99 except Exception as e: 

100 logger.error(f"Error connecting to SQLite {self.connection_data['db_file']}, {e}!") 

101 response.error_message = str(e) 

102 finally: 

103 if response.success is True and need_to_close: 

104 self.disconnect() 

105 if response.success is False and self.is_connected is True: 

106 self.is_connected = False 

107 

108 return response 

109 

110 def native_query(self, query: str) -> StatusResponse: 

111 """ 

112 Receive raw query and act upon it somehow. 

113 Args: 

114 query (str): query in native format 

115 Returns: 

116 HandlerResponse 

117 """ 

118 

119 need_to_close = self.is_connected is False 

120 

121 connection = self.connect() 

122 cursor = connection.cursor() 

123 

124 try: 

125 cursor.execute(query) 

126 result = cursor.fetchall() 

127 if result: 

128 response = Response( 

129 RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(result, columns=[x[0] for x in cursor.description]) 

130 ) 

131 else: 

132 connection.commit() 

133 response = Response(RESPONSE_TYPE.OK) 

134 except Exception as e: 

135 logger.error(f"Error running query: {query} on {self.connection_data['db_file']}!") 

136 response = Response(RESPONSE_TYPE.ERROR, error_message=str(e)) 

137 

138 cursor.close() 

139 if need_to_close is True: 

140 self.disconnect() 

141 

142 return response 

143 

144 def query(self, query: ASTNode) -> StatusResponse: 

145 """ 

146 Receive query as AST (abstract syntax tree) and act upon it somehow. 

147 Args: 

148 query (ASTNode): sql query represented as AST. May be any kind 

149 of query: SELECT, INTSERT, DELETE, etc 

150 Returns: 

151 HandlerResponse 

152 """ 

153 renderer = SqlalchemyRender("sqlite") 

154 query_str = renderer.get_string(query, with_failback=True) 

155 return self.native_query(query_str) 

156 

157 def get_tables(self) -> StatusResponse: 

158 """ 

159 Return list of entities that will be accessible as tables. 

160 Returns: 

161 HandlerResponse 

162 """ 

163 

164 query = "SELECT name from sqlite_master where type= 'table';" 

165 result = self.native_query(query) 

166 df = result.data_frame 

167 result.data_frame = df.rename(columns={df.columns[0]: "table_name"}) 

168 return result 

169 

170 def get_columns(self, table_name: str) -> StatusResponse: 

171 """ 

172 Returns a list of entity columns. 

173 Args: 

174 table_name (str): name of one of tables returned by self.get_tables() 

175 Returns: 

176 HandlerResponse 

177 """ 

178 

179 query = f"PRAGMA table_info([{table_name}]);" 

180 result = self.native_query(query) 

181 df = result.data_frame 

182 result.data_frame = df.rename(columns={"name": "column_name", "type": "data_type"}) 

183 return result