Coverage for mindsdb / integrations / handlers / elasticsearch_handler / tests / test_elasticsearch_handler.py: 0%
22 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import unittest
2from mindsdb.integrations.handlers.elasticsearch_handler.elasticsearch_handler import (
3 ElasticsearchHandler,
4)
5from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
class ElasticsearchHandlerTest(unittest.TestCase):
    """Exercise ElasticsearchHandler through its public query/metadata methods.

    The handler is pointed at ``localhost:9200`` and the tests query the
    ``kibana_sample_data_ecommerce`` table, so they presumably need a live
    Elasticsearch node with that sample data loaded -- confirm against the
    project's test setup.

    Assertions use the ``unittest`` assert methods rather than bare ``assert``
    so they are not stripped under ``python -O`` and report the compared
    values on failure.
    """

    @classmethod
    def setUpClass(cls):
        """Build a single handler instance shared by every test in the class."""
        cls.kwargs = {"hosts": "localhost:9200"}
        cls.handler = ElasticsearchHandler("test_elasticsearch_handler", cls.kwargs)

    def test_0_check_connection(self):
        """check_connection() returns a truthy result."""
        # NOTE(review): if check_connection() returns a status object rather
        # than a bool, this check is always truthy -- confirm, and assert on
        # the object's success flag instead if so.
        self.assertTrue(self.handler.check_connection())

    def test_1_native_query_select(self):
        """A native SELECT yields a TABLE-typed response."""
        query = "SELECT customer_first_name, customer_full_name FROM kibana_sample_data_ecommerce"
        result = self.handler.native_query(query)
        self.assertIs(result.type, RESPONSE_TYPE.TABLE)

    def test_2_get_tables(self):
        """get_tables() does not return an ERROR-typed response."""
        tables = self.handler.get_tables()
        self.assertIsNot(tables.type, RESPONSE_TYPE.ERROR)

    def test_3_get_columns(self):
        """get_columns() on the sample table does not return an ERROR-typed response."""
        columns = self.handler.get_columns("kibana_sample_data_ecommerce")
        self.assertIsNot(columns.type, RESPONSE_TYPE.ERROR)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()