Coverage for mindsdb / integrations / handlers / tdengine_handler / tdengine_handler.py: 0%
77 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2import pandas as pd
3import taosrest as td
4from taosrest import sqlalchemy as SA
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE
16)
19logger = log.getLogger(__name__)
class TDEngineHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the TDEngine statements.

    The connection is opened lazily through ``taosrest.connect`` and cached on
    the instance; ``is_connected`` tracks whether the cached connection is
    considered usable.
    """

    name = 'tdengine'

    def __init__(self, name, connection_data: Optional[dict], **kwargs):
        """
        Store the connection settings; no connection is opened here.

        :param name: handler instance name, forwarded to ``DatabaseHandler``
        :param connection_data: connection settings; the keys read by
            ``connect`` are ``url``, ``token``, ``user``, ``password`` and
            ``database``. ``None`` is treated as an empty dict.
        :param kwargs: extra arguments, kept as-is on ``self.kwargs``
        """
        super().__init__(name)

        self.parser = parse_sql
        self.dialect = 'tdengine'
        self.kwargs = kwargs
        # The annotation allows None, but every later access calls `.get` on
        # this attribute, so normalise None to an empty dict once, here.
        self.connection_data = connection_data if connection_data is not None else {}

        self.connection = None
        self.is_connected = False

    def connect(self):
        """
        Open a connection to TDEngine, or reuse the cached one.

        Missing settings fall back to ``url="http://localhost:6041"``,
        ``user="root"`` and ``password="taosdata"``; ``token`` and
        ``database`` default to ``None``.

        :return: the connection object returned by ``taosrest.connect``
        """
        if self.is_connected:
            return self.connection

        config = {
            'url': self.connection_data.get('url', "http://localhost:6041"),
            'token': self.connection_data.get('token'),
            'user': self.connection_data.get('user', 'root'),
            'password': self.connection_data.get('password', 'taosdata'),
            'database': self.connection_data.get('database'),
        }

        self.connection = td.connect(**config)
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close the cached connection if one is open; otherwise do nothing.
        """
        if not self.is_connected:
            return
        self.connection.close()
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Check that a connection to TDEngine can be established.

        A connection opened only for this check is closed again before
        returning; a connection that was already open is left open.

        :return: status response whose ``success`` flag tells whether the
            connection attempt worked, with ``error_message`` set on failure
        """
        result = StatusResponse(False)
        need_to_close = not self.is_connected

        try:
            connection = self.connect()
            result.success = connection is not None
        except Exception as e:
            # `database` is optional in connect(), so it must be read with
            # .get here too: indexing would raise KeyError inside this handler
            # and the failure would escape instead of being reported.
            database = self.connection_data.get('database')
            logger.error(f'Error connecting to TDEngine {database}, {e}!')
            result.error_message = str(e)

        if result.success and need_to_close:
            self.disconnect()
        if not result.success and self.is_connected:
            # Do not keep treating a connection that failed the check as live.
            self.is_connected = False

        return result

    def native_query(self, query: str) -> Response:
        """
        Receive SQL query and runs it

        :param query: The SQL query to run in TDEngine
        :return: a TABLE response holding the fetched rows when the cursor
            reports a non-zero row count, an OK response when it reports
            zero, or an ERROR response carrying the exception text when
            execution fails
        """
        need_to_close = not self.is_connected

        connection = self.connect()
        try:
            cur = connection.cursor()
            try:
                cur.execute(query)

                if cur.rowcount != 0:
                    result = cur.fetchall()
                    response = Response(
                        RESPONSE_TYPE.TABLE,
                        pd.DataFrame(
                            result,
                            columns=[x[0] for x in cur.description]
                        )
                    )
                else:
                    response = Response(RESPONSE_TYPE.OK)
                connection.commit()
            except Exception as e:
                # Read `database` with .get: it is optional, and a KeyError
                # raised here would replace the error response and skip the
                # cleanup below.
                database = self.connection_data.get('database')
                logger.error(f'Error running query: {query} on {database}!')
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(e)
                )
                # NOTE(review): the original had a disabled
                # `connection.rollback()` here; presumably there is nothing to
                # roll back on this connection type -- confirm before enabling.
            finally:
                cur.close()
        finally:
            # Only close a connection that this call opened itself, and do so
            # even when creating or closing the cursor fails.
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        The AST is rendered to a SQL string with the TaosRest SQLAlchemy
        dialect and then executed through ``native_query``.

        :param query: parsed SQL statement
        :return: the response produced by ``native_query``
        """
        renderer = SqlalchemyRender(SA.TaosRestDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in TDEngine

        :return: the response of running ``SHOW TABLES;``
        """
        q = 'SHOW TABLES;'

        return self.native_query(q)

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table

        :param table_name: name of the table to describe; it is interpolated
            into the statement verbatim, without quoting or escaping
        :return: the response of running ``DESCRIBE <table_name>;``
        """
        # NOTE(review): table_name is not escaped; this assumes callers pass a
        # trusted identifier -- confirm against the callers.
        q = f'DESCRIBE {table_name};'

        return self.native_query(q)