Coverage for mindsdb / integrations / handlers / trino_handler / tests / test_trino_handler.py: 0%
22 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import unittest
3from mindsdb.api.mysql.mysql_proxy.mysql_proxy import RESPONSE_TYPE
4from mindsdb.integrations.handlers.trino_handler.trino_handler import TrinoHandler
7class TrinoHandlerTest(unittest.TestCase):
9 @classmethod
10 def setUpClass(cls):
11 cls.kwargs = {
12 "connection_data": {
13 "host": "qa.analytics.quantum.site.gs.com",
14 "port": "8090",
15 "user": "dqsvcuat",
16 "password": "",
17 "catalog": "gsam_dev2imddata_elastic",
18 "schema": "default",
19 "service_name": "HTTP/qa.analytics.quantum.site.gs.com",
20 "config_file_name": "test_trino_config.ini"
21 }
22 }
23 cls.handler = TrinoHandler('test_trino_handler', **cls.kwargs)
25 def test_0_canary(self):
26 print('Running canary test')
27 assert True
28 print('Canary test ran successfully')
30 def test_1_check_connection(self):
31 conn_status = self.handler.check_connection()
32 print('Trino connection status: ', conn_status)
33 assert conn_status.get('success')
35 def test_2_get_tables(self):
36 tables = self.handler.get_tables()
37 assert tables
39 def test_3_describe_table(self):
40 described = self.handler.get_columns("axioma_att_2021-12")
41 assert described['type'] is not RESPONSE_TYPE.ERROR
43 # TODO: complete tests implementation
44 # def test_4_select_query(self):
45 # query = "SELECT * FROM data.test_mdb WHERE 'id'='1'"
46 # result = self.handler.query(query)
47 # assert result['type'] is RESPONSE_TYPE.TABLE
48 #