Coverage for mindsdb / integrations / handlers / crate_handler / crate_handler.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from collections import OrderedDict 

2from typing import Optional 

3from mindsdb_sql_parser.ast.base import ASTNode 

4from mindsdb.integrations.libs.base import DatabaseHandler 

5from mindsdb.utilities import log 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb.integrations.libs.response import ( 

9 HandlerStatusResponse as StatusResponse, 

10 HandlerResponse as Response, 

11 RESPONSE_TYPE, 

12) 

13from mindsdb.integrations.libs.const import ( 

14 HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE, 

15) 

16 

17 

18import pandas as pd 

19from crate import client as db 

20from sqlalchemy_cratedb import dialect 

21 

22logger = log.getLogger(__name__) 

23 

24 

class CrateHandler(DatabaseHandler):
    """MindsDB database handler that executes SQL against a CrateDB server.

    The handler keeps at most one client connection (``self.connection``) and
    tracks its state in ``self.is_connected``. Methods that find the handler
    disconnected open a connection for the duration of the call and close it
    again before returning.
    """

    name = "crate"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                ``user``, ``password``, ``host`` and ``port`` are required;
                ``schema_name`` is optional and defaults to ``"doc"``.
            **kwargs: arbitrary keyword arguments, stored on ``self.kwargs``.

        Raises:
            KeyError: if one of the required connection parameters is missing.
        """
        super().__init__(name)

        self.kwargs = kwargs
        self.parser = parse_sql
        self.dialect = "crate"
        self.user = connection_data["user"]
        self.password = connection_data["password"]
        # Accept both the snake_case key and the camelCase ``schemaName``
        # spelling so that either form of the argument is honoured instead of
        # silently falling back to the default schema.
        self.schemaName = connection_data.get(
            "schema_name",
            connection_data.get("schemaName", "doc"),
        )
        self.host = connection_data["host"]
        self.port = connection_data["port"]

        self.connection = None
        self.is_connected = False

    def connect(self):
        """Open (or reuse) the connection to CrateDB.

        Hosts starting with ``localhost`` and the literal ``127.0.0.1`` are
        reached over plain HTTP without certificate verification; every other
        host is reached over HTTPS with certificate verification enabled.

        Returns:
            The client connection object, also stored in ``self.connection``.

        Raises:
            Exception: whatever the client raises while creating the
                connection. The error is logged and re-raised so that callers
                can report the failure instead of receiving ``None``.
        """
        if self.is_connected:
            return self.connection

        is_local = self.host.startswith("localhost") or self.host == "127.0.0.1"
        protocol = "http" if is_local else "https"
        # NOTE(review): credentials are interpolated verbatim; a user name or
        # password containing URL-reserved characters may need escaping.
        url = f"{protocol}://{self.user}:{self.password}@{self.host}:{self.port}"

        try:
            self.connection = db.connect(
                url,
                timeout=30,
                # Only verify SSL for non-local (cloud) connections.
                verify_ssl_cert=not is_local,
            )
        except Exception as e:
            logger.error(f"Error while connecting to CrateDB: {e}")
            raise

        self.is_connected = True
        return self.connection

    def disconnect(self):
        """Close the current connection, if any, and clear ``is_connected``.

        Errors raised while closing are logged and not propagated; in that
        case ``is_connected`` is left unchanged.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
            self.is_connected = False
        except Exception as e:
            logger.error(f"Error while disconnecting to CrateDB, {e}")

    def check_connection(self) -> StatusResponse:
        """Check that the CrateDB server can be reached and queried.

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be opened and a trivial query executed; otherwise
            ``error_message`` carries the text of the exception.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            # Creating the client object may not contact the server by itself
            # (presumably lazy - TODO confirm), so run a trivial statement to
            # force a round trip and surface bad hosts/credentials here.
            cursor = connection.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                cursor.close()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to CrateDB, {e}!")
            response.error_message = str(e)
        finally:
            # Restore the connection state the handler had before the check.
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Execute a raw SQL string on CrateDB.

        Args:
            query (str): SQL text in CrateDB's native dialect.

        Returns:
            HandlerResponse: ``TABLE`` with a DataFrame when the statement
            produces a result set (including an empty one), ``OK`` when it
            does not, and ``ERROR`` with the exception text when connecting or
            executing fails.
        """
        need_to_close = self.is_connected is False

        cur = None
        try:
            # Connecting happens inside the try block so that a connection
            # failure is reported as an ERROR response rather than raised.
            conn = self.connect()
            cur = conn.cursor()
            cur.execute(query)
            # A non-empty description means the statement returned columns.
            # Using it instead of rowcount keeps the column header for
            # SELECTs that match zero rows.
            description = cur.description
            if description:
                rows = cur.fetchall()
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        rows, columns=[column[0] for column in description]
                    ),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error running query: {query} on CrateDB!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            if cur is not None:
                cur.close()

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render an AST query to SQL text and execute it.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse: the result of ``native_query`` for the rendered
            statement.
        """
        renderer = SqlalchemyRender(dialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Return the list of tables in the configured schema.

        Returns:
            HandlerResponse: the result of ``SHOW TABLES`` restricted to
            ``self.schemaName``.
        """
        q = f"SHOW TABLES FROM {self.schemaName};"
        return self.native_query(q)

    def get_columns(self, table_name: str) -> Response:
        """Return the list of columns of a table.

        Args:
            table_name (str): name of one of the tables returned by
                ``self.get_tables()``. A name that already contains a dot is
                treated as schema-qualified and used unchanged.

        Returns:
            HandlerResponse: the result of ``SHOW COLUMNS`` for the table.
        """
        q = f"SHOW COLUMNS FROM {table_name}"
        # Look the table up in the same schema get_tables() lists from, unless
        # the caller already qualified the name.
        if "." not in table_name:
            q += f" FROM {self.schemaName}"
        q += ";"
        return self.native_query(q)

192 

193 

# Declares the connection parameters of the CrateDB handler: each key is the
# parameter name and each value gives its type and a human-readable description.
connection_args = OrderedDict(
    host={
        "type": ARG_TYPE.STR,
        "description": "The host name or IP address of the CrateDB server/database.",
    },
    user={
        "type": ARG_TYPE.STR,
        "description": "The user name used to authenticate with the CrateDB server.",
    },
    password={
        "type": ARG_TYPE.STR,
        "description": "The password to authenticate the user with the CrateDB server.",
    },
    port={
        "type": ARG_TYPE.INT,
        "description": "Specify port to connect CrateDB server",
    },
    # Named ``schema_name`` because that is the key the handler looks up in
    # its connection data.
    schema_name={
        "type": ARG_TYPE.STR,
        "description": 'Specify the schema name. Note: It is optional DEFAULT is "doc"',
    },
)

216 

# Example connection parameters for a local CrateDB instance. The port is an
# integer, matching the INT type declared for the ``port`` argument.
connection_args_example = OrderedDict(
    host="127.0.0.1",
    port=4200,
    password="",
    user="crate",
)