Coverage for mindsdb / integrations / libs / response.py: 76%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import sys 

2from typing import Callable 

3from dataclasses import dataclass, fields 

4 

5import numpy 

6import pandas 

7 

8from mindsdb.utilities import log 

9from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

10from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE 

11from mindsdb_sql_parser.ast import ASTNode 

12 

13 

# Module-level logger, named after this module and obtained through the
# project's `log` utility imported above.
logger = log.getLogger(__name__)

15 

16 

@dataclass(frozen=True)
class _INFORMATION_SCHEMA_COLUMNS_NAMES:
    """Names of the DataFrame columns expected from `handler.get_columns(...)`.

    Every field's default value is its own name, so an instance works as a
    read-only namespace of column-name constants. The names follow the
    INFORMATION_SCHEMA.COLUMNS layout that SQL databases use to describe
    table metadata, plus one extra `MYSQL_DATA_TYPE` column.
    """

    # Column identity and position.
    COLUMN_NAME: str = "COLUMN_NAME"
    DATA_TYPE: str = "DATA_TYPE"
    ORDINAL_POSITION: str = "ORDINAL_POSITION"

    # Default value and nullability.
    COLUMN_DEFAULT: str = "COLUMN_DEFAULT"
    IS_NULLABLE: str = "IS_NULLABLE"

    # Size and precision attributes.
    CHARACTER_MAXIMUM_LENGTH: str = "CHARACTER_MAXIMUM_LENGTH"
    CHARACTER_OCTET_LENGTH: str = "CHARACTER_OCTET_LENGTH"
    NUMERIC_PRECISION: str = "NUMERIC_PRECISION"
    NUMERIC_SCALE: str = "NUMERIC_SCALE"
    DATETIME_PRECISION: str = "DATETIME_PRECISION"

    # Character set and collation.
    CHARACTER_SET_NAME: str = "CHARACTER_SET_NAME"
    COLLATION_NAME: str = "COLLATION_NAME"

    # Not part of the standard layout: filled in from DATA_TYPE by
    # `HandlerResponse.to_columns_table_response`.
    MYSQL_DATA_TYPE: str = "MYSQL_DATA_TYPE"

37 

38 

# Shared instance exposing each expected column name as an attribute.
INF_SCHEMA_COLUMNS_NAMES = _INFORMATION_SCHEMA_COLUMNS_NAMES()

# The same names collected into a set, so a DataFrame's columns can be
# checked against the expected layout with a single comparison.
INF_SCHEMA_COLUMNS_NAMES_SET = {column.name for column in fields(INF_SCHEMA_COLUMNS_NAMES)}

41 

42 

class HandlerResponse:
    """Result of a handler call: a response type plus optional data, query and error details.

    Besides the constructor arguments, the instance stores in `exception` the
    exception that was being handled (if any) when the response was created.
    """

    def __init__(
        self,
        resp_type: RESPONSE_TYPE,
        data_frame: pandas.DataFrame = None,
        query: ASTNode = 0,
        error_code: int = 0,
        error_message: str | None = None,
        affected_rows: int | None = None,
        mysql_types: list[MYSQL_DATA_TYPE] | None = None,
        is_expected_error: bool = False,
    ) -> None:
        """Store the response attributes.

        Args:
            resp_type: kind of the response.
            data_frame: result data, if any.
            query: query associated with the response.
            error_code: error code, 0 by default.
            error_message: error description, if any.
            affected_rows: number of affected rows; anything that is not a
                non-negative int is stored as 0.
            mysql_types: MySQL types associated with the response, if any.
            is_expected_error: stored as-is on the instance.
        """
        # NOTE(review): `query` defaults to 0 although it is annotated as ASTNode;
        # the default is kept unchanged for backward compatibility - confirm
        # whether None was intended.
        self.resp_type = resp_type
        self.query = query
        self.data_frame = data_frame
        self.error_code = error_code
        self.error_message = error_message

        # Normalise missing, non-int or negative values to 0.
        if not isinstance(affected_rows, int) or affected_rows < 0:
            affected_rows = 0
        self.affected_rows = affected_rows

        self.mysql_types = mysql_types
        self.is_expected_error = is_expected_error

        # When the response is built inside an `except` block, keep a reference
        # to the exception being handled; `sys.exc_info()[1]` is None otherwise.
        self.exception = sys.exc_info()[1]

    @property
    def type(self):
        """Alias for `resp_type`."""
        return self.resp_type

    def to_columns_table_response(self, map_type_fn: Callable) -> None:
        """Transform the response to a `columns table` response.

        NOTE: original dataframe will be mutated

        Column names are upper-cased, the MYSQL_DATA_TYPE column is derived
        from DATA_TYPE via `map_type_fn`, and the columns are cast to fixed
        dtypes with missing values replaced by None. A response that is
        already a columns table is left untouched.

        Args:
            map_type_fn: callable applied to every DATA_TYPE value to produce
                the matching MYSQL_DATA_TYPE value.

        Raises:
            ValueError: if the response is neither a table nor a columns table,
                if it has no data frame, or if its columns do not match the
                INFORMATION_SCHEMA.COLUMNS layout.
        """
        if self.resp_type == RESPONSE_TYPE.COLUMNS_TABLE:
            return
        if self.resp_type != RESPONSE_TYPE.TABLE:
            message = f"Cannot convert {self.resp_type} to {RESPONSE_TYPE.COLUMNS_TABLE}"
            if self.resp_type == RESPONSE_TYPE.ERROR:
                message += f", the error is: {self.error_message}"
            raise ValueError(message)
        if self.data_frame is None:
            raise ValueError(
                f"Cannot convert {self.resp_type} to {RESPONSE_TYPE.COLUMNS_TABLE}: the response has no data frame"
            )

        # str() keeps non-string column labels from breaking the upper-casing;
        # such labels are then rejected by the validation below.
        self.data_frame.columns = [str(name).upper() for name in self.data_frame.columns]

        # region validate df
        # Validate before touching DATA_TYPE so that a missing column is reported
        # with a descriptive ValueError rather than a bare KeyError.
        # MYSQL_DATA_TYPE is added to the set because it is (re)computed below.
        current_columns_set = set(self.data_frame.columns) | {INF_SCHEMA_COLUMNS_NAMES.MYSQL_DATA_TYPE}
        if INF_SCHEMA_COLUMNS_NAMES_SET != current_columns_set:
            raise ValueError(f"Columns set for INFORMATION_SCHEMA.COLUMNS is wrong: {list(current_columns_set)}")
        # endregion

        self.data_frame[INF_SCHEMA_COLUMNS_NAMES.MYSQL_DATA_TYPE] = self.data_frame[
            INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE
        ].apply(map_type_fn)

        self.data_frame = self.data_frame.astype(
            {
                INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME: "string",
                INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE: "string",
                INF_SCHEMA_COLUMNS_NAMES.ORDINAL_POSITION: "Int32",
                INF_SCHEMA_COLUMNS_NAMES.COLUMN_DEFAULT: "string",
                INF_SCHEMA_COLUMNS_NAMES.IS_NULLABLE: "string",
                INF_SCHEMA_COLUMNS_NAMES.CHARACTER_MAXIMUM_LENGTH: "Int32",
                INF_SCHEMA_COLUMNS_NAMES.CHARACTER_OCTET_LENGTH: "Int32",
                INF_SCHEMA_COLUMNS_NAMES.NUMERIC_PRECISION: "Int32",
                INF_SCHEMA_COLUMNS_NAMES.NUMERIC_SCALE: "Int32",
                INF_SCHEMA_COLUMNS_NAMES.DATETIME_PRECISION: "Int32",
                INF_SCHEMA_COLUMNS_NAMES.CHARACTER_SET_NAME: "string",
                INF_SCHEMA_COLUMNS_NAMES.COLLATION_NAME: "string",
            }
        )
        # Represent missing values uniformly as None.
        self.data_frame.replace([numpy.nan, pandas.NA], None, inplace=True)

        self.resp_type = RESPONSE_TYPE.COLUMNS_TABLE

    def to_json(self):
        """Return the response as a dict.

        The data frame, when present, is serialised to a JSON string in
        `split` orientation without the index and with ISO dates. If that
        serialisation fails, the error is logged and `data_frame` is None.
        `type` and `query` are passed through as stored.
        """
        data = None
        if self.data_frame is not None:
            try:
                data = self.data_frame.to_json(orient="split", index=False, date_format="iso")
            except Exception as e:
                # Best effort: a frame that cannot be serialised must not break the response.
                logger.error("%s.to_json: error - %s", self.__class__.__name__, e)
        return {
            "type": self.resp_type,
            "query": self.query,
            "data_frame": data,
            "error_code": self.error_code,
            "error": self.error_message,
        }

    def __repr__(self):
        """Return a multi-line debug representation including the data frame."""
        return (
            f"{self.__class__.__name__}: resp_type={self.resp_type}, query={self.query}, "
            f"data_frame=\n{self.data_frame}\n"
            f"err_code={self.error_code}, error={self.error_message}, affected_rows={self.affected_rows}"
        )

145 

146 

147class HandlerStatusResponse: 

148 def __init__( 

149 self, 

150 success: bool = True, 

151 error_message: str = None, 

152 redirect_url: str = None, 

153 copy_storage: str = None, 

154 ) -> None: 

155 self.success = success 

156 self.error_message = error_message 

157 self.redirect_url = redirect_url 

158 self.copy_storage = copy_storage 

159 

160 def to_json(self): 

161 data = {"success": self.success, "error": self.error_message} 

162 if self.redirect_url is not None: 

163 data["redirect_url"] = self.redirect_url 

164 return data 

165 

166 def __repr__(self): 

167 return f"{self.__class__.__name__}: success={self.success},\ 

168 error={self.error_message},\ 

169 redirect_url={self.redirect_url}"