Coverage for mindsdb / integrations / libs / response.py: 76%
85 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import sys
2from typing import Callable
3from dataclasses import dataclass, fields
5import numpy
6import pandas
8from mindsdb.utilities import log
9from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
10from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE
11from mindsdb_sql_parser.ast import ASTNode
14logger = log.getLogger(__name__)
17@dataclass(frozen=True)
18class _INFORMATION_SCHEMA_COLUMNS_NAMES:
19 """Set of DataFrame columns that must be returned when calling `handler.get_columns(...)`.
20 These column names match the standard INFORMATION_SCHEMA.COLUMNS structure
21 used in SQL databases to describe table metadata.
22 """
24 COLUMN_NAME: str = "COLUMN_NAME"
25 DATA_TYPE: str = "DATA_TYPE"
26 ORDINAL_POSITION: str = "ORDINAL_POSITION"
27 COLUMN_DEFAULT: str = "COLUMN_DEFAULT"
28 IS_NULLABLE: str = "IS_NULLABLE"
29 CHARACTER_MAXIMUM_LENGTH: str = "CHARACTER_MAXIMUM_LENGTH"
30 CHARACTER_OCTET_LENGTH: str = "CHARACTER_OCTET_LENGTH"
31 NUMERIC_PRECISION: str = "NUMERIC_PRECISION"
32 NUMERIC_SCALE: str = "NUMERIC_SCALE"
33 DATETIME_PRECISION: str = "DATETIME_PRECISION"
34 CHARACTER_SET_NAME: str = "CHARACTER_SET_NAME"
35 COLLATION_NAME: str = "COLLATION_NAME"
36 MYSQL_DATA_TYPE: str = "MYSQL_DATA_TYPE"
39INF_SCHEMA_COLUMNS_NAMES = _INFORMATION_SCHEMA_COLUMNS_NAMES()
40INF_SCHEMA_COLUMNS_NAMES_SET = set(f.name for f in fields(INF_SCHEMA_COLUMNS_NAMES))
43class HandlerResponse:
44 def __init__(
45 self,
46 resp_type: RESPONSE_TYPE,
47 data_frame: pandas.DataFrame = None,
48 query: ASTNode = 0,
49 error_code: int = 0,
50 error_message: str | None = None,
51 affected_rows: int | None = None,
52 mysql_types: list[MYSQL_DATA_TYPE] | None = None,
53 is_expected_error: bool = False,
54 ) -> None:
55 self.resp_type = resp_type
56 self.query = query
57 self.data_frame = data_frame
58 self.error_code = error_code
59 self.error_message = error_message
60 self.affected_rows = affected_rows
61 if isinstance(self.affected_rows, int) is False or self.affected_rows < 0:
62 self.affected_rows = 0
63 self.mysql_types = mysql_types
64 self.is_expected_error = is_expected_error
65 self.exception = None
66 current_exception = sys.exc_info()
67 if current_exception[0] is not None:
68 self.exception = current_exception[1]
70 @property
71 def type(self):
72 return self.resp_type
74 def to_columns_table_response(self, map_type_fn: Callable) -> None:
75 """Transform the response to a `columns table` response.
76 NOTE: original dataframe will be mutated
77 """
78 if self.resp_type == RESPONSE_TYPE.COLUMNS_TABLE: 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true
79 return
80 if self.resp_type != RESPONSE_TYPE.TABLE:
81 if self.resp_type == RESPONSE_TYPE.ERROR: 81 ↛ 86line 81 didn't jump to line 86 because the condition on line 81 was always true
82 raise ValueError(
83 f"Cannot convert {self.resp_type} to {RESPONSE_TYPE.COLUMNS_TABLE}, "
84 f"the error is: {self.error_message}"
85 )
86 raise ValueError(f"Cannot convert {self.resp_type} to {RESPONSE_TYPE.COLUMNS_TABLE}")
88 self.data_frame.columns = [name.upper() for name in self.data_frame.columns]
89 self.data_frame[INF_SCHEMA_COLUMNS_NAMES.MYSQL_DATA_TYPE] = self.data_frame[
90 INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE
91 ].apply(map_type_fn)
93 # region validate df
94 current_columns_set = set(self.data_frame.columns)
95 if INF_SCHEMA_COLUMNS_NAMES_SET != current_columns_set: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true
96 raise ValueError(f"Columns set for INFORMATION_SCHEMA.COLUMNS is wrong: {list(current_columns_set)}")
97 # endregion
99 self.data_frame = self.data_frame.astype(
100 {
101 INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME: "string",
102 INF_SCHEMA_COLUMNS_NAMES.DATA_TYPE: "string",
103 INF_SCHEMA_COLUMNS_NAMES.ORDINAL_POSITION: "Int32",
104 INF_SCHEMA_COLUMNS_NAMES.COLUMN_DEFAULT: "string",
105 INF_SCHEMA_COLUMNS_NAMES.IS_NULLABLE: "string",
106 INF_SCHEMA_COLUMNS_NAMES.CHARACTER_MAXIMUM_LENGTH: "Int32",
107 INF_SCHEMA_COLUMNS_NAMES.CHARACTER_OCTET_LENGTH: "Int32",
108 INF_SCHEMA_COLUMNS_NAMES.NUMERIC_PRECISION: "Int32",
109 INF_SCHEMA_COLUMNS_NAMES.NUMERIC_SCALE: "Int32",
110 INF_SCHEMA_COLUMNS_NAMES.DATETIME_PRECISION: "Int32",
111 INF_SCHEMA_COLUMNS_NAMES.CHARACTER_SET_NAME: "string",
112 INF_SCHEMA_COLUMNS_NAMES.COLLATION_NAME: "string",
113 }
114 )
115 self.data_frame.replace([numpy.nan, pandas.NA], None, inplace=True)
117 self.resp_type = RESPONSE_TYPE.COLUMNS_TABLE
119 def to_json(self):
120 try:
121 data = None
122 if self.data_frame is not None:
123 data = self.data_frame.to_json(orient="split", index=False, date_format="iso")
124 except Exception as e:
125 logger.error("%s.to_json: error - %s", self.__class__.__name__, e)
126 data = None
127 return {
128 "type": self.resp_type,
129 "query": self.query,
130 "data_frame": data,
131 "error_code": self.error_code,
132 "error": self.error_message,
133 }
135 def __repr__(self):
136 return "%s: resp_type=%s, query=%s, data_frame=\n%s\nerr_code=%s, error=%s, affected_rows=%s" % (
137 self.__class__.__name__,
138 self.resp_type,
139 self.query,
140 self.data_frame,
141 self.error_code,
142 self.error_message,
143 self.affected_rows,
144 )
147class HandlerStatusResponse:
148 def __init__(
149 self,
150 success: bool = True,
151 error_message: str = None,
152 redirect_url: str = None,
153 copy_storage: str = None,
154 ) -> None:
155 self.success = success
156 self.error_message = error_message
157 self.redirect_url = redirect_url
158 self.copy_storage = copy_storage
160 def to_json(self):
161 data = {"success": self.success, "error": self.error_message}
162 if self.redirect_url is not None:
163 data["redirect_url"] = self.redirect_url
164 return data
166 def __repr__(self):
167 return f"{self.__class__.__name__}: success={self.success},\
168 error={self.error_message},\
169 redirect_url={self.redirect_url}"