Coverage for mindsdb / integrations / handlers / sqreamdb_handler / sqreamdb_handler.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3from mindsdb_sql_parser.ast.base import ASTNode 

4 

5from mindsdb.integrations.libs.base import DatabaseHandler 

6from mindsdb.utilities import log 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb.integrations.libs.response import ( 

9 HandlerStatusResponse as StatusResponse, 

10 HandlerResponse as Response, 

11 RESPONSE_TYPE 

12) 

13 

14import pandas as pd 

15import pysqream as db 

16 

17from pysqream_sqlalchemy.dialect import SqreamDialect 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class SQreamDBHandler(DatabaseHandler): 

23 

24 name = 'sqreamdb' 

25 

26 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

27 """ Initialize the handler 

28 Args: 

29 name (str): name of particular handler instance 

30 connection_data (dict): parameters for connecting to the database 

31 **kwargs: arbitrary keyword arguments. 

32 """ 

33 super().__init__(name) 

34 

35 self.connection_data = connection_data 

36 

37 self.connection = None 

38 self.is_connected = False 

39 

40 def connect(self): 

41 """ 

42 Handles the connection to a YugabyteSQL database insance. 

43 """ 

44 if self.is_connected is True: 

45 return self.connection 

46 

47 args = { 

48 "database": self.connection_data.get('database'), 

49 "host": self.connection_data.get('host'), 

50 "port": self.connection_data.get('port'), 

51 "username": self.connection_data.get('user'), 

52 "password": self.connection_data.get('password'), 

53 "clustered": self.connection_data.get('clustered', False), 

54 "use_ssl": self.connection_data.get('use_ssl', False), 

55 "service": self.connection_data.get('service', 'sqream') 

56 } 

57 

58 connection = db.connect(**args) 

59 

60 self.is_connected = True 

61 self.connection = connection 

62 return self.connection 

63 

64 def check_connection(self) -> StatusResponse: 

65 """ 

66 Check the connection of the SQreamDB database 

67 :return: success status and error message if error occurs 

68 """ 

69 response = StatusResponse(False) 

70 need_to_close = self.is_connected is False 

71 

72 try: 

73 connection = self.connect() 

74 with connection.cursor() as cur: 

75 cur.execute('select 1;') 

76 response.success = True 

77 except db.Error as e: 

78 logger.error(f'Error connecting to SQreamDB {self.database}, {e}!') 

79 response.error_message = e 

80 

81 if response.success is True and need_to_close: 

82 self.disconnect() 

83 if response.success is False and self.is_connected is True: 

84 self.is_connected = False 

85 

86 return response 

87 

88 def native_query(self, query: str) -> StatusResponse: 

89 """Receive raw query and act upon it somehow. 

90 Args: 

91 query (Any): query in native format (str for sql databases, 

92 dict for mongo, etc) 

93 Returns: 

94 HandlerResponse 

95 """ 

96 need_to_close = self.is_connected is False 

97 conn = self.connect() 

98 with conn.cursor() as cur: 

99 try: 

100 cur.execute(query) 

101 

102 if cur.rowcount > 0 and query.upper().startswith('SELECT'): 

103 result = cur.fetchall() 

104 response = Response( 

105 RESPONSE_TYPE.TABLE, 

106 data_frame=pd.DataFrame( 

107 result, 

108 columns=[x[0] for x in cur.description] 

109 ) 

110 ) 

111 else: 

112 response = Response(RESPONSE_TYPE.OK) 

113 self.connection.commit() 

114 except Exception as e: 

115 logger.error(f'Error running query: {query} on {self.database}!') 

116 response = Response( 

117 RESPONSE_TYPE.ERROR, 

118 error_message=str(e) 

119 ) 

120 self.connection.rollback() 

121 

122 if need_to_close is True: 

123 self.disconnect() 

124 

125 return response 

126 

127 def query(self, query: ASTNode) -> Response: 

128 """ 

129 Retrieve the data from the SQL statement 

130 """ 

131 renderer = SqlalchemyRender(SqreamDialect) 

132 query_str = renderer.get_string(query, with_failback=True) 

133 return self.native_query(query_str) 

134 

135 def get_tables(self) -> Response: 

136 """ 

137 List all tables in SQreamDB stored in 'sqream_catalog' 

138 """ 

139 

140 query = "SELECT table_name FROM sqream_catalog.tables" 

141 

142 return self.query(query) 

143 

144 def get_columns(self, table_name): 

145 query = f"""SELECT column_name, type_name 

146 FROM sqream_catalog.columns 

147 WHERE table_name = '{table_name}'; 

148 """ 

149 return self.query(query)