Coverage for mindsdb / integrations / handlers / libsql_handler / libsql_handler.py: 0%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4import libsql_experimental as libsql 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.integrations.libs.base import DatabaseHandler 

8 

9from mindsdb_sql_parser.ast.base import ASTNode 

10 

11from mindsdb.utilities import log 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE, 

16) 

17 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class LibSQLHandler(DatabaseHandler): 

23 """ 

24 This handler handles connection and execution of the LibSQL statements. 

25 """ 

26 

27 name = "libsql" 

28 

29 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

30 """ 

31 Initialize the handler. 

32 Args: 

33 name (str): name of particular handler instance 

34 connection_data (dict): parameters for connecting to the database 

35 **kwargs: arbitrary keyword arguments. 

36 """ 

37 super().__init__(name) 

38 self.parser = parse_sql 

39 self.dialect = "libsql" 

40 self.connection_data = connection_data 

41 self.kwargs = kwargs 

42 

43 self.connection = None 

44 self.is_connected = False 

45 

46 def __del__(self): 

47 if self.is_connected is True: 

48 self.disconnect() 

49 

50 def connect(self) -> StatusResponse: 

51 """ 

52 Set up the connection required by the handler. 

53 Returns: 

54 HandlerStatusResponse 

55 """ 

56 

57 if self.is_connected is True: 

58 return self.connection 

59 

60 args = self.connection_data 

61 # sync_url and auth_token are optional 

62 # sync_url is used to sync the local database from the remote database 

63 # auth_token is used as the authentication token for the remote database 

64 if args.get("sync_url"): 

65 self.connection = libsql.connect( 

66 database=args["database"], 

67 sync_url=args["sync_url"], 

68 auth_token=args["auth_token"], 

69 ) 

70 else: 

71 self.connection = libsql.connect(database=args["database"]) 

72 

73 self.is_connected = True 

74 

75 return self.connection 

76 

77 def disconnect(self): 

78 """ 

79 Close any existing connections. 

80 """ 

81 

82 if self.is_connected is False: 

83 return 

84 

85 self.connection = None 

86 self.is_connected = False 

87 return self.is_connected 

88 

89 def check_connection(self) -> StatusResponse: 

90 """ 

91 Check connection to the handler. 

92 Returns: 

93 HandlerStatusResponse 

94 """ 

95 

96 response = StatusResponse(False) 

97 need_to_close = self.is_connected is False 

98 

99 try: 

100 self.connect() 

101 response.success = True 

102 except Exception as e: 

103 logger.error( 

104 f'Error connecting to SQLite {self.connection_data["database"]}, {e}!' 

105 ) 

106 response.error_message = str(e) 

107 finally: 

108 if response.success is True and need_to_close: 

109 self.disconnect() 

110 if response.success is False and self.is_connected is True: 

111 self.is_connected = False 

112 

113 return response 

114 

115 def native_query(self, query: str) -> StatusResponse: 

116 """ 

117 Receive raw query and act upon it somehow. 

118 Args: 

119 query (str): query in native format 

120 Returns: 

121 HandlerResponse 

122 """ 

123 

124 need_to_close = self.is_connected is False 

125 

126 connection = self.connect() 

127 cursor = connection.cursor() 

128 

129 try: 

130 cursor.execute(query) 

131 result = cursor.fetchall() 

132 if result: 

133 response = Response( 

134 RESPONSE_TYPE.TABLE, 

135 data_frame=pd.DataFrame( 

136 result, columns=[x[0] for x in cursor.description] 

137 ), 

138 ) 

139 else: 

140 connection.commit() 

141 response = Response(RESPONSE_TYPE.OK) 

142 except Exception as e: 

143 logger.error( 

144 f'Error running query: {query} on {self.connection_data["database"]}!' 

145 ) 

146 response = Response(RESPONSE_TYPE.ERROR, error_message=str(e)) 

147 

148 if need_to_close is True: 

149 self.disconnect() 

150 

151 return response 

152 

153 def query(self, query: ASTNode) -> StatusResponse: 

154 """ 

155 Receive query as AST (abstract syntax tree) and act upon it somehow. 

156 Args: 

157 query (ASTNode): sql query represented as AST. May be any kind 

158 of query: SELECT, INTSERT, DELETE, etc 

159 Returns: 

160 HandlerResponse 

161 """ 

162 return self.native_query(query) 

163 

164 def get_tables(self) -> StatusResponse: 

165 """ 

166 Return list of entities that will be accessible as tables. 

167 Returns: 

168 HandlerResponse 

169 """ 

170 

171 query = "SELECT name from sqlite_master where type= 'table';" 

172 result = self.native_query(query) 

173 df = result.data_frame 

174 result.data_frame = df.rename(columns={df.columns[0]: "table_name"}) 

175 return result 

176 

177 def get_columns(self, table_name: str) -> StatusResponse: 

178 """ 

179 Returns a list of entity columns. 

180 Args: 

181 table_name (str): name of one of tables returned by self.get_tables() 

182 Returns: 

183 HandlerResponse 

184 """ 

185 

186 query = f"PRAGMA table_info([{table_name}]);" 

187 result = self.native_query(query) 

188 df = result.data_frame 

189 result.data_frame = df.rename( 

190 columns={"name": "column_name", "type": "data_type"} 

191 ) 

192 return result