Coverage for mindsdb / integrations / handlers / surrealdb_handler / surrealdb_handler.py: 0%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2import pysurrealdb as surreal 

3import pandas as pd 

4 

5from mindsdb_sql_parser.ast.base import ASTNode 

6from mindsdb.integrations.libs.base import DatabaseHandler 

7from mindsdb.utilities import log 

8from mindsdb_sql_parser import parse_sql 

9from mindsdb.integrations.libs.response import ( 

10 HandlerStatusResponse as StatusResponse, 

11 HandlerResponse as Response, 

12 RESPONSE_TYPE 

13) 

14from .utils.surreal_get_info import table_names, column_info 

15 

16 

17logger = log.getLogger(__name__) 

18 

19 

20class SurrealDBHandler(DatabaseHandler): 

21 """ 

22 This handler handles connection and execution of the SurrealDB statements. 

23 """ 

24 name = 'surrealdb' 

25 

26 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

27 """ Initialize the handler 

28 Args: 

29 name (str): name of particular handler instance 

30 connection_data (dict): parameters for connecting to the database 

31 **kwargs: arbitrary keyword arguments. 

32 """ 

33 super().__init__(name) 

34 self.database = connection_data['database'] 

35 self.parser = parse_sql 

36 self.dialect = "surrealdb" 

37 self.kwargs = kwargs 

38 self.namespace = connection_data['namespace'] 

39 self.user = connection_data['user'] 

40 self.password = connection_data['password'] 

41 self.host = connection_data['host'] 

42 self.port = connection_data['port'] 

43 

44 self.connection = None 

45 self.is_connected = False 

46 

47 def connect(self): 

48 """ 

49 Establishes a connection to the MindsDB database. 

50 Returns: 

51 HandlerStatusResponse 

52 """ 

53 if self.is_connected is True: 

54 return self.connection 

55 try: 

56 self.connection = surreal.connect( 

57 database=self.database, 

58 host=self.host, 

59 port=self.port, 

60 user=self.user, 

61 password=self.password, 

62 namespace=self.namespace, 

63 ) 

64 self.is_connected = True 

65 except Exception as e: 

66 logger.error(f"Error while connecting to SurrealDB, {e}") 

67 

68 return self.connection 

69 

70 def check_connection(self) -> StatusResponse: 

71 """ 

72 Check connection to the handler. 

73 Returns: 

74 HandlerStatusResponse 

75 """ 

76 response_code = StatusResponse(False) 

77 need_to_close = self.is_connected is False 

78 try: 

79 self.connect() 

80 response_code.success = True 

81 except Exception as e: 

82 logger.error(f'Error connecting to SurrealDB, {e}!') 

83 response_code.error_message = str(e) 

84 finally: 

85 if response_code.success is True and need_to_close: 

86 self.disconnect() 

87 if response_code.success is False and self.is_connected is True: 

88 self.is_connected = False 

89 

90 return response_code 

91 

92 def disconnect(self): 

93 """ 

94 Close the existing connection to the SurrealDB database 

95 """ 

96 if self.is_connected is False: 

97 return 

98 try: 

99 self.connection.close() 

100 self.is_connected = False 

101 except Exception as e: 

102 logger.error(f"Error while disconnecting to SurrealDB, {e}") 

103 

104 return 

105 

106 def native_query(self, query: str) -> Response: 

107 """ 

108 Receive raw query and act upon it somehow. 

109 Args: 

110 query (Any): query in SurrealQL to execute 

111 Returns: 

112 HandlerResponse 

113 """ 

114 need_to_close = self.is_connected is False 

115 conn = self.connect() 

116 cur = conn.cursor() 

117 try: 

118 cur.execute(query) 

119 result = cur.fetchall() 

120 if result: 

121 response = Response( 

122 RESPONSE_TYPE.TABLE, 

123 data_frame=pd.DataFrame( 

124 result, 

125 columns=[x[0] for x in cur.description], 

126 ) 

127 ) 

128 else: 

129 response = Response(RESPONSE_TYPE.OK) 

130 except Exception as e: 

131 logger.error(f'Error running query: {query} on SurrealDB!') 

132 response = Response( 

133 RESPONSE_TYPE.ERROR, 

134 error_message=str(e) 

135 ) 

136 

137 cur.close() 

138 

139 if need_to_close is True: 

140 self.disconnect() 

141 

142 return response 

143 

144 def query(self, query: ASTNode) -> Response: 

145 """ 

146 Receive query as AST (abstract syntax tree) and act upon it somehow. 

147 Args: 

148 query (ASTNode): sql query represented as AST. It may be any kind 

149 of query: SELECT, INSERT, DELETE, etc 

150 Returns: 

151 HandlerResponse 

152 """ 

153 query_string = query.to_string() 

154 

155 # ensure the correct query is passed 

156 last_word = query_string.split()[-1] 

157 query_string = query_string.replace(last_word + '.', "") 

158 return self.native_query(query_string) 

159 

160 def get_tables(self) -> Response: 

161 """ 

162 Get list of tables from the database that will be accessible. 

163 Returns: 

164 HandlerResponse 

165 """ 

166 conn = self.connect() 

167 # get table names 

168 tables = table_names(conn) 

169 

170 # construct pandas dataframe 

171 df = pd.DataFrame(tables, columns=['table_name']) 

172 

173 response = Response( 

174 RESPONSE_TYPE.TABLE, df 

175 ) 

176 return response 

177 

178 def get_columns(self, table: str) -> Response: 

179 """ Return list of columns in table 

180 Args: 

181 table (str): name of the table to get column names and types from. 

182 Returns: 

183 HandlerResponse 

184 """ 

185 conn = self.connect() 

186 # get name and type of each column in the table 

187 columns, types = column_info(conn, table) 

188 

189 # construct pandas dataframe 

190 df = pd.DataFrame(columns, columns=['table_name']) 

191 df['data_type'] = types 

192 

193 response = Response( 

194 RESPONSE_TYPE.TABLE, df 

195 ) 

196 return response