Coverage for mindsdb / integrations / handlers / cassandra_handler / tests / test_cassandra_handler.py: 0%

23 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import unittest 

2from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

3from mindsdb.integrations.handlers.cassandra_handler.cassandra_handler import CassandraHandler 

4 

5 

class CassandraHandlerTest(unittest.TestCase):
    """Exercise ``CassandraHandler`` through its public query methods.

    A single handler is built once per class from the connection settings
    in ``setUpClass`` (host 127.0.0.1, port 9043, keyspace ``test_data``).
    The numeric prefixes in the test names keep the methods in this order
    under unittest's default alphabetical sorting, so the connection check
    runs first.

    NOTE(review): these tests presumably need a reachable Cassandra
    instance containing a ``home_rentals`` table -- confirm against the
    project's test fixtures.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared handler used by every test in this class."""
        cls.kwargs = {
            "connection_data": {
                "host": "127.0.0.1",
                "port": "9043",
                "user": "cassandra",
                "password": "",
                "keyspace": "test_data",
                "protocol_version": 4,
            }
        }
        cls.handler = CassandraHandler('test_cassandra_handler', **cls.kwargs)

    def test_0_connect(self):
        """Call ``check_connection`` on the shared handler."""
        # NOTE(review): the return value is not inspected, so this test only
        # fails if the call raises -- confirm whether a success flag on the
        # response should be asserted here.
        self.handler.check_connection()

    def test_1_native_query_show_keyspaces(self):
        """A native ``DESC KEYSPACES`` query must not yield an error response."""
        dbs = self.handler.native_query("DESC KEYSPACES;")
        # unittest assertions are used instead of bare ``assert`` so the
        # check still runs under ``python -O`` and reports both operands.
        self.assertIsNot(dbs.type, RESPONSE_TYPE.ERROR)

    def test_2_get_tables(self):
        """Listing tables must not yield an error response."""
        tbls = self.handler.get_tables()
        self.assertIsNot(tbls.type, RESPONSE_TYPE.ERROR)

    def test_3_describe_table(self):
        """Describing ``home_rentals`` must yield a table response."""
        described = self.handler.get_columns("home_rentals")
        self.assertIs(described.type, RESPONSE_TYPE.TABLE)

    def test_4_select_query(self):
        """A filtered SELECT on ``home_rentals`` must yield a table response."""
        # NOTE(review): 'id' is single-quoted, which CQL reads as a string
        # literal rather than a column name -- confirm this predicate is
        # intended before relying on the rows it returns.
        query = "SELECT * FROM home_rentals WHERE 'id'='3712'"
        result = self.handler.query(query)
        self.assertIs(result.type, RESPONSE_TYPE.TABLE)