Coverage for mindsdb / integrations / handlers / tdengine_handler / tdengine_handler.py: 0%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2import pandas as pd 

3import taosrest as td 

4from taosrest import sqlalchemy as SA 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE 

16) 

17 

18 

# Module-level logger, obtained from the `log` helper imported from mindsdb.utilities.
logger = log.getLogger(__name__)

20 

21 

class TDEngineHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the TDEngine statements.

    Connections are opened with ``taosrest.connect``. The ``connection_data``
    dict may contain the keys ``url``, ``token``, ``user``, ``password`` and
    ``database``; all of them are optional (see :meth:`connect` for defaults).
    """

    name = 'tdengine'

    def __init__(self, name, connection_data: Optional[dict], **kwargs):
        """
        :param name: handler instance name, forwarded to the base class
        :param connection_data: connection settings (may be None)
        :param kwargs: extra keyword arguments, stored unchanged on the instance
        """
        super().__init__(name)

        self.parser = parse_sql
        self.dialect = 'tdengine'
        self.kwargs = kwargs
        self.connection_data = connection_data

        self.connection = None
        self.is_connected = False

    def _database_label(self):
        """
        Return the configured database name for log messages, or None.

        Used instead of ``self.connection_data["database"]`` so that building
        a log message can never raise when the key (or the whole dict) is absent.
        """
        return (self.connection_data or {}).get('database')

    def connect(self):
        """
        Open a connection if none is open yet and return it.

        :return: the cached connection when already connected, otherwise a
                 new connection created by ``td.connect``
        """
        if self.is_connected is True:
            return self.connection

        # connection_data is typed Optional; treat None as "no settings given".
        params = self.connection_data or {}
        config = {
            'url': params.get('url', "http://localhost:6041"),
            'token': params.get('token'),
            'user': params.get('user', 'root'),
            'password': params.get('password', 'taosdata'),
            'database': params.get('database')
        }

        self.connection = td.connect(**config)
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close the current connection; does nothing when not connected.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
        finally:
            # Even if close() fails the handler must not keep reporting
            # itself as connected.
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Try to connect and report the outcome.

        A connection opened only for this check is closed again afterwards.

        :return: StatusResponse with ``success`` set, and ``error_message``
                 filled in when connecting raised
        """
        result = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            result.success = connection is not None
        except Exception as e:
            logger.error(f'Error connecting to TDEngine {self._database_label()}, {e}!')
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False

        return result

    def native_query(self, query: str) -> Response:
        """
        Receive SQL query and runs it
        :param query: The SQL query to run in TDEngine
        :return: a TABLE response holding the fetched rows, an OK response
                 when there is nothing to fetch, or an ERROR response carrying
                 the exception text when execution fails
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            # Cursor creation stays outside the inner try so that its failure
            # still propagates to the caller, while the outer finally makes
            # sure a temporary connection is not leaked.
            cur = connection.cursor()
            try:
                cur.execute(query)

                # A cursor without a description has no result set to fetch,
                # so such statements are reported as OK.
                if cur.rowcount != 0 and cur.description is not None:
                    result = cur.fetchall()
                    response = Response(
                        RESPONSE_TYPE.TABLE,
                        pd.DataFrame(
                            result,
                            columns=[x[0] for x in cur.description]
                        )
                    )
                else:
                    response = Response(RESPONSE_TYPE.OK)
                connection.commit()
            except Exception as e:
                logger.error(f'Error running query: {query} on {self._database_label()}, {e}!')
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(e)
                )
            finally:
                cur.close()
        finally:
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        The AST is rendered to a SQL string with the taosrest SQLAlchemy
        dialect and then executed through :meth:`native_query`.
        """
        renderer = SqlalchemyRender(SA.TaosRestDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in TDEngine
        """
        q = 'SHOW TABLES;'

        return self.native_query(q)

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table

        :param table_name: name of the table to describe
        """
        # NOTE(review): table_name is interpolated into the statement verbatim;
        # callers must pass a trusted identifier.
        q = f'DESCRIBE {table_name};'

        return self.native_query(q)