Coverage for mindsdb / integrations / handlers / crate_handler / crate_handler.py: 0%
93 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from collections import OrderedDict
2from typing import Optional
3from mindsdb_sql_parser.ast.base import ASTNode
4from mindsdb.integrations.libs.base import DatabaseHandler
5from mindsdb.utilities import log
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.response import (
9 HandlerStatusResponse as StatusResponse,
10 HandlerResponse as Response,
11 RESPONSE_TYPE,
12)
13from mindsdb.integrations.libs.const import (
14 HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE,
15)
18import pandas as pd
19from crate import client as db
20from sqlalchemy_cratedb import dialect
22logger = log.getLogger(__name__)
25class CrateHandler(DatabaseHandler):
26 name = "crate"
28 def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
29 """Initialize the handler
30 Args:
31 name (str): name of particular handler instance
32 connection_data (dict): parameters for connecting to the database
33 **kwargs: arbitrary keyword arguments.
34 """
35 super().__init__(name)
37 self.kwargs = kwargs
38 self.parser = parse_sql
39 self.dialect = "crate"
40 self.user = connection_data["user"]
41 self.password = connection_data["password"]
42 self.schemaName = connection_data.get("schema_name", "doc")
43 self.host = connection_data["host"]
44 self.port = connection_data["port"]
46 self.connection = None
47 self.is_connected = False
49 def connect(self):
50 """Set up any connections required by the handler
51 Should return output of check_connection() method after attempting
52 connection. Should switch self.is_connected.
53 Returns:
54 Connection Object
55 """
56 if self.is_connected:
57 return self.connection
59 is_local = self.host.startswith("localhost") or self.host == "127.0.0.1"
61 try:
62 # Build URL based on connection type
63 protocol = "http" if is_local else "https"
64 url = f"{protocol}://{self.user}:{self.password}@{self.host}:{self.port}"
66 # Connect with appropriate settings based on connection type
67 self.connection = db.connect(
68 url,
69 timeout=30,
70 # Only verify SSL for cloud connections
71 verify_ssl_cert=not is_local,
72 )
74 self.is_connected = True
75 except Exception as e:
76 logger.error(f"Error while connecting to CrateDB: {e}")
78 return self.connection
80 def disconnect(self):
81 """Close any existing connections
82 Should switch self.is_connected.
83 """
85 if self.is_connected is False:
86 return
87 try:
88 self.connection.close()
89 self.is_connected = False
90 except Exception as e:
91 logger.error(f"Error while disconnecting to CrateDB, {e}")
93 return
95 def check_connection(self) -> StatusResponse:
96 """Check connection to the handler
97 Returns:
98 HandlerStatusResponse
99 """
101 responseCode = StatusResponse(False)
102 need_to_close = self.is_connected is False
104 try:
105 self.connect()
106 responseCode.success = True
107 except Exception as e:
108 logger.error(f"Error connecting to CrateDB, {e}!")
109 responseCode.error_message = str(e)
110 finally:
111 if responseCode.success is True and need_to_close:
112 self.disconnect()
113 if responseCode.success is False and self.is_connected is True:
114 self.is_connected = False
116 return responseCode
118 def native_query(self, query: str) -> StatusResponse:
119 """Receive raw query and act upon it somehow.
120 Args:
121 query (Any): query in native format (str for sql databases,
122 etc)
123 Returns:
124 HandlerResponse
125 """
127 need_to_close = self.is_connected is False
129 conn = self.connect()
130 cur = conn.cursor()
131 try:
132 cur.execute(query)
133 if cur.rowcount:
134 result = cur.fetchall()
135 response = Response(
136 RESPONSE_TYPE.TABLE,
137 data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]),
138 )
139 else:
140 response = Response(RESPONSE_TYPE.OK)
141 except Exception as e:
142 logger.error(f"Error running query: {query} on CrateDB!")
143 response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
144 cur.close()
146 if need_to_close is True:
147 self.disconnect()
149 return response
151 def query(self, query: ASTNode) -> StatusResponse:
152 """Receive query as AST (abstract syntax tree) and act upon it somehow.
153 Args:
154 query (ASTNode): sql query represented as AST. May be any kind
155 of query: SELECT, INTSERT, DELETE, etc
156 Returns:
157 HandlerResponse
158 """
160 renderer = SqlalchemyRender(dialect)
161 query_str = renderer.get_string(query, with_failback=True)
162 return self.native_query(query_str)
164 def get_tables(self) -> StatusResponse:
165 """Return list of entities
166 Return list of entities that will be accesible as tables.
167 Returns:
168 HandlerResponse: shoud have same columns as information_schema.tables
169 (https://dev.mysql.com/doc/refman/8.0/en/information-schema-tables-table.html)
170 Column 'TABLE_NAME' is mandatory, other is optional.
171 """
173 q = f"SHOW TABLES FROM {self.schemaName};"
174 result = self.native_query(q)
175 return result
177 def get_columns(self, table_name: str) -> StatusResponse:
178 """Returns a list of entity columns
179 Args:
180 table_name (str): name of one of tables returned by self.get_tables()
181 Returns:
182 HandlerResponse: shoud have same columns as information_schema.columns
183 (https://dev.mysql.com/doc/refman/8.0/en/information-schema-columns-table.html)
184 Column 'COLUMN_NAME' is mandatory, other is optional. Hightly
185 recomended to define also 'DATA_TYPE': it should be one of
186 python data types (by default it str).
187 """
189 q = f"SHOW COLUMNS FROM {table_name};"
190 result = self.native_query(q)
191 return result
194connection_args = OrderedDict(
195 host={
196 "type": ARG_TYPE.STR,
197 "description": "The host name or IP address of the CrateDB server/database.",
198 },
199 user={
200 "type": ARG_TYPE.STR,
201 "description": "The user name used to authenticate with the CrateDB server.",
202 },
203 password={
204 "type": ARG_TYPE.STR,
205 "description": "The password to authenticate the user with the CrateDB server.",
206 },
207 port={
208 "type": ARG_TYPE.INT,
209 "description": "Specify port to connect CrateDB server",
210 },
211 schemaName={
212 "type": ARG_TYPE.STR,
213 "description": 'Specify the schema name. Note: It is optional DEFAULT is "doc"',
214 },
215)
217connection_args_example = OrderedDict(
218 host="127.0.0.1",
219 port="4200",
220 password="",
221 user="crate",
222)