Coverage for mindsdb / integrations / handlers / sqreamdb_handler / sqreamdb_handler.py: 0%
69 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3from mindsdb_sql_parser.ast.base import ASTNode
5from mindsdb.integrations.libs.base import DatabaseHandler
6from mindsdb.utilities import log
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.response import (
9 HandlerStatusResponse as StatusResponse,
10 HandlerResponse as Response,
11 RESPONSE_TYPE
12)
14import pandas as pd
15import pysqream as db
17from pysqream_sqlalchemy.dialect import SqreamDialect
19logger = log.getLogger(__name__)
class SQreamDBHandler(DatabaseHandler):
    """Database handler that executes SQL statements against SQreamDB via ``pysqream``."""

    name = 'sqreamdb'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)

        # Fall back to an empty dict so the ``.get`` lookups in ``connect``
        # cannot fail when no connection data was supplied.
        self.connection_data = connection_data if connection_data is not None else {}
        # Used by the error log messages in check_connection / native_query.
        self.database = self.connection_data.get('database')

        self.connection = None
        self.is_connected = False

    def connect(self):
        """Open a connection to the SQreamDB instance, reusing an existing one.

        Returns:
            The connection object returned by ``pysqream.connect``.
        """
        if self.is_connected is True:
            return self.connection

        args = {
            "database": self.connection_data.get('database'),
            "host": self.connection_data.get('host'),
            "port": self.connection_data.get('port'),
            # The handler's 'user' parameter maps to the 'username' keyword.
            "username": self.connection_data.get('user'),
            "password": self.connection_data.get('password'),
            "clustered": self.connection_data.get('clustered', False),
            "use_ssl": self.connection_data.get('use_ssl', False),
            "service": self.connection_data.get('service', 'sqream'),
        }

        connection = db.connect(**args)

        self.is_connected = True
        self.connection = connection
        return self.connection

    def disconnect(self):
        """Close the open connection, if any, and mark the handler as disconnected."""
        if self.is_connected is False:
            return

        try:
            self.connection.close()
        finally:
            # Reset state even if close() raised, so a later connect() starts fresh.
            self.is_connected = False
            self.connection = None

    def check_connection(self) -> StatusResponse:
        """Check the connection to the SQreamDB database.

        Returns:
            StatusResponse: ``success`` is True when a trivial query ran;
            otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)
        # Only close afterwards if this call is the one that opened the connection.
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            with connection.cursor() as cur:
                cur.execute('select 1;')
            response.success = True
        except db.Error as e:
            logger.error(f'Error connecting to SQreamDB {self.database}, {e}!')
            response.error_message = str(e)

        if response.success is True and need_to_close:
            self.disconnect()
        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Execute a raw SQL query against SQreamDB.

        Args:
            query (str): SQL statement in the database's native dialect.

        Returns:
            Response: a TABLE response with a DataFrame for SELECTs that
            returned rows, OK for other successful statements, or ERROR
            with the error text if execution failed.
        """
        need_to_close = self.is_connected is False
        conn = self.connect()

        with conn.cursor() as cur:
            try:
                cur.execute(query)

                # lstrip() so a query with leading whitespace/newlines is
                # still recognised as a SELECT.
                is_select = query.lstrip().upper().startswith('SELECT')
                if cur.rowcount > 0 and is_select:
                    result = cur.fetchall()
                    response = Response(
                        RESPONSE_TYPE.TABLE,
                        data_frame=pd.DataFrame(
                            result,
                            columns=[x[0] for x in cur.description]
                        )
                    )
                else:
                    response = Response(RESPONSE_TYPE.OK)
                self.connection.commit()
            except Exception as e:
                logger.error(f'Error running query: {query} on {self.database}!')
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(e)
                )
                self.connection.rollback()

        # Close only after the cursor context has exited.
        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render an AST query to SQL with the SQream dialect and execute it.

        Args:
            query (ASTNode): parsed SQL statement.

        Returns:
            Response: result of ``native_query`` on the rendered SQL string.
        """
        renderer = SqlalchemyRender(SqreamDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """List all tables in SQreamDB stored in 'sqream_catalog'.

        Returns:
            Response: result of selecting ``table_name`` from ``sqream_catalog.tables``.
        """
        query = "SELECT table_name FROM sqream_catalog.tables"

        # The query is already a SQL string, so it goes to native_query();
        # query() expects an ASTNode to render.
        return self.native_query(query)

    def get_columns(self, table_name) -> Response:
        """List column names and type names of a table from 'sqream_catalog'.

        Args:
            table_name (str): name of the table to describe.

        Returns:
            Response: result of selecting ``column_name`` and ``type_name``
            from ``sqream_catalog.columns`` for the given table.
        """
        # Double embedded single quotes so the name cannot terminate the
        # SQL string literal it is interpolated into.
        safe_table_name = str(table_name).replace("'", "''")
        query = f"""SELECT column_name, type_name
            FROM sqream_catalog.columns
            WHERE table_name = '{safe_table_name}';
        """
        return self.native_query(query)