Coverage for mindsdb / api / mysql / mysql_proxy / utilities / dump.py: 48%
248 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import struct
2import datetime
3from typing import Any
4from array import array
6import orjson
7import numpy as np
8from numpy import dtype as np_dtype
9import pandas as pd
10from pandas.api import types as pd_types
12from mindsdb.api.executor.sql_query.result_set import ResultSet, get_mysql_data_type_from_series, Column
13from mindsdb.api.mysql.mysql_proxy.utilities.lightwood_dtype import dtype as lightwood_dtype
14from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import (
15 MYSQL_DATA_TYPE,
16 DATA_C_TYPE_MAP,
17 CTypeProperties,
18 CHARSET_NUMBERS,
19 NULL_VALUE,
20)
21from mindsdb.utilities import log
22from mindsdb.utilities.json_encoder import CustomJSONEncoder
23from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum
# Module-level logger for this file.
logger = log.getLogger(__name__)

# Pre-bound default() of the project JSON encoder; passed to orjson.dumps as the
# fallback serializer for types orjson cannot handle natively.
_default_json = CustomJSONEncoder().default
def column_to_mysql_column_dict(column: Column, database_name: str | None = None) -> dict[str, str | int]:
    """Convert Column object to dict with column properties.

    Args:
        column (Column): Column object to convert.
        database_name (str | None): Name of the database, used as a fallback
            when the column itself has no database set.

    Returns:
        dict[str, str | int]: Dictionary with mysql column properties.
    """
    # region infer type. Should not happen, but what if it is dtype of lightwood type?
    if isinstance(column.type, str):
        try:
            column.type = MYSQL_DATA_TYPE(column.type)
        except ValueError:
            # Not a MySQL type name: map known lightwood dtypes, fall back to TEXT.
            lightwood_to_mysql = {
                lightwood_dtype.date: MYSQL_DATA_TYPE.DATE,
                lightwood_dtype.datetime: MYSQL_DATA_TYPE.DATETIME,
                lightwood_dtype.float: MYSQL_DATA_TYPE.FLOAT,
                lightwood_dtype.integer: MYSQL_DATA_TYPE.INT,
            }
            column.type = lightwood_to_mysql.get(column.type, MYSQL_DATA_TYPE.TEXT)
    elif isinstance(column.type, np_dtype):
        # NOTE: the integer check must run before the generic numeric check,
        # otherwise ints would be classified as FLOAT.
        if pd_types.is_integer_dtype(column.type):
            column.type = MYSQL_DATA_TYPE.INT
        elif pd_types.is_numeric_dtype(column.type):
            column.type = MYSQL_DATA_TYPE.FLOAT
        elif pd_types.is_datetime64_any_dtype(column.type):
            column.type = MYSQL_DATA_TYPE.DATETIME
        else:
            column.type = MYSQL_DATA_TYPE.TEXT
    # endregion

    if not isinstance(column.type, MYSQL_DATA_TYPE):
        logger.warning(f"Unexpected column type: {column.type}. Use TEXT as fallback.")
        column.type = MYSQL_DATA_TYPE.TEXT

    # JSON and VECTOR values are sent with the binary charset, everything else as utf8.
    if column.type in (MYSQL_DATA_TYPE.JSON, MYSQL_DATA_TYPE.VECTOR):
        charset = CHARSET_NUMBERS["binary"]
    else:
        charset = CHARSET_NUMBERS["utf8_unicode_ci"]

    type_properties: CTypeProperties = DATA_C_TYPE_MAP[column.type]

    result = {
        "database": column.database or database_name,
        # TODO add 'original_table'
        "table_name": column.table_name,
        "name": column.name,
        "alias": column.alias or column.name,
        "size": type_properties.size,
        "flags": type_properties.flags,
        "type": type_properties.code,
        "type_enum": column.type,
        "charset": charset,
    }
    return result
92def _dump_bool(var: Any) -> int | None:
93 """Dumps a boolean value to an integer, as in MySQL boolean type is tinyint with values 0 and 1.
94 NOTE: None consider as True in dataframe with dtype=bool, we can't change it
96 Args:
97 var (Any): The boolean value to dump
99 Returns:
100 int | None: 1 or 0 or None
101 """
102 if pd.isna(var):
103 return None
104 return "1" if var else "0"
107def _dump_str(var: Any) -> str | None:
108 """Dumps a value to a string.
110 Args:
111 var (Any): The value to dump
113 Returns:
114 str | None: The string representation of the value or None if the value is None
115 """
116 if isinstance(var, bytes): 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 try:
118 return var.decode("utf-8")
119 except Exception:
120 return str(var)[2:-1]
121 if isinstance(var, (dict, list)): 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 try:
123 return orjson.dumps(
124 var,
125 default=_default_json,
126 option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_PASSTHROUGH_DATETIME,
127 ).decode("utf-8")
128 except Exception:
129 return str(var)
130 # pd.isna returns array of bools for list
131 # and the truth value of a numpy array is ambiguous
132 if isinstance(var, (list, np.ndarray)) is False and pd.isna(var):
133 return None
134 return str(var)
137def _dump_int_or_str(var: Any) -> str | None:
138 """Dumps a value to a string.
139 If the value is numeric - then cast it to int to avoid float representation.
141 Args:
142 var (Any): The value to dump.
144 Returns:
145 str | None: The string representation of the value or None if the value is None
146 """
147 if pd.isna(var):
148 return None
149 try:
150 return str(int(var))
151 except ValueError:
152 return str(var)
155def _dump_date(var: datetime.date | str | None) -> str | None:
156 """Dumps a date value to a string.
158 Args:
159 var (datetime.date | str | None): The date value to dump
161 Returns:
162 str | None: The string representation of the date value or None if the value is None
163 """
164 if isinstance(var, (datetime.date, pd.Timestamp)): # it is also True for datetime.datetime
165 return var.strftime("%Y-%m-%d")
166 elif isinstance(var, str): 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true
167 return var
168 elif pd.isna(var): 168 ↛ 170line 168 didn't jump to line 170 because the condition on line 168 was always true
169 return None
170 logger.warning(f"Unexpected value type for DATE: {type(var)}, {var}")
171 return _dump_str(var)
174def _dump_datetime(var: datetime.datetime | str | None) -> str | None:
175 """Dumps a datetime value to a string.
176 # NOTE mysql may display only %Y-%m-%d %H:%M:%S format for datetime column
178 Args:
179 var (datetime.datetime | str | None): The datetime value to dump
181 Returns:
182 str | None: The string representation of the datetime value or None if the value is None
183 """
184 if isinstance(var, datetime.date): # it is also datetime.datetime
185 if hasattr(var, "tzinfo") and var.tzinfo is not None:
186 return var.astimezone(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
187 return var.strftime("%Y-%m-%d %H:%M:%S")
188 elif isinstance(var, pd.Timestamp): 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true
189 if var.tzinfo is not None:
190 return var.tz_convert("UTC").strftime("%Y-%m-%d %H:%M:%S")
191 return var.strftime("%Y-%m-%d %H:%M:%S")
192 elif isinstance(var, str): 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 return var
194 elif pd.isna(var): 194 ↛ 196line 194 didn't jump to line 196 because the condition on line 194 was always true
195 return None
196 logger.warning(f"Unexpected value type for DATETIME: {type(var)}, {var}")
197 return _dump_str(var)
200def _dump_time(var: datetime.time | str | None) -> str | None:
201 """Dumps a time value to a string.
203 Args:
204 var (datetime.time | str | None): The time value to dump
206 Returns:
207 str | None: The string representation of the time value or None if the value is None
208 """
209 if isinstance(var, datetime.time):
210 if var.tzinfo is not None:
211 # NOTE strftime does not support timezone, so we need to convert to UTC
212 offset_seconds = var.tzinfo.utcoffset(None).total_seconds()
213 time_seconds = var.hour * 3600 + var.minute * 60 + var.second
214 utc_seconds = (time_seconds - offset_seconds) % (24 * 3600)
215 hours = int(utc_seconds // 3600)
216 minutes = int((utc_seconds % 3600) // 60)
217 seconds = int(utc_seconds % 60)
218 var = datetime.time(hours, minutes, seconds, var.microsecond)
219 return var.strftime("%H:%M:%S")
220 elif isinstance(var, datetime.datetime):
221 if var.tzinfo is not None:
222 return var.astimezone(datetime.timezone.utc).strftime("%H:%M:%S")
223 return var.strftime("%H:%M:%S")
224 elif isinstance(var, pd.Timestamp): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 if var.tzinfo is not None:
226 return var.tz_convert("UTC").strftime("%H:%M:%S")
227 return var.strftime("%H:%M:%S")
228 elif isinstance(var, str): 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true
229 return var
230 elif pd.isna(var): 230 ↛ 232line 230 didn't jump to line 232 because the condition on line 230 was always true
231 return None
232 logger.warning(f"Unexpected value type for TIME: {type(var)}, {var}")
233 return _dump_str(var)
236def _dump_vector(value: Any) -> bytes | None:
237 """Convert array or list of floats to a bytes.
239 Args:
240 value (Any): The value to dump
242 Returns:
243 bytes | None: The bytes representation of the vector value or None if the value is None
244 """
245 if isinstance(value, (array, list, np.ndarray)):
246 return b"".join([struct.pack("<f", el) for el in value])
247 elif pd.isna(value):
248 return None
249 err_msg = f"Unexpected value type for VECTOR: {type(value)}, {value}"
250 logger.error(err_msg)
251 raise ValueError(err_msg)
254def _handle_series_as_date(series: pd.Series) -> pd.Series:
255 """Convert values in a series to a string representation of a date.
256 NOTE: MySQL require exactly %Y-%m-%d for DATE type.
258 Args:
259 series (pd.Series): The series to handle
261 Returns:
262 pd.Series: The series with the date values as strings
263 """
264 if pd_types.is_datetime64_any_dtype(series.dtype):
265 return series.dt.strftime("%Y-%m-%d")
266 elif pd_types.is_object_dtype(series.dtype): 266 ↛ 268line 266 didn't jump to line 268 because the condition on line 266 was always true
267 return series.apply(_dump_date)
268 logger.info(f"Unexpected dtype: {series.dtype} for column with type DATE")
269 return series.apply(_dump_str)
272def _handle_series_as_datetime(series: pd.Series) -> pd.Series:
273 """Convert values in a series to a string representation of a datetime.
274 NOTE: MySQL's DATETIME type require exactly %Y-%m-%d %H:%M:%S format.
276 Args:
277 series (pd.Series): The series to handle
279 Returns:
280 pd.Series: The series with the datetime values as strings
281 """
282 if pd_types.is_datetime64_any_dtype(series.dtype):
283 return series.dt.strftime("%Y-%m-%d %H:%M:%S")
284 elif pd_types.is_object_dtype(series.dtype): 284 ↛ 286line 284 didn't jump to line 286 because the condition on line 284 was always true
285 return series.apply(_dump_datetime)
286 logger.info(f"Unexpected dtype: {series.dtype} for column with type DATETIME")
287 return series.apply(_dump_str)
290def _handle_series_as_time(series: pd.Series) -> pd.Series:
291 """Convert values in a series to a string representation of a time.
292 NOTE: MySQL's TIME type require exactly %H:%M:%S format.
294 Args:
295 series (pd.Series): The series to handle
297 Returns:
298 pd.Series: The series with the time values as strings
299 """
300 if pd_types.is_timedelta64_ns_dtype(series.dtype): 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 base_time = pd.Timestamp("2000-01-01")
302 series = (base_time + series).dt.strftime("%H:%M:%S")
303 elif pd_types.is_datetime64_dtype(series.dtype):
304 series = series.dt.strftime("%H:%M:%S")
305 elif pd_types.is_object_dtype(series.dtype): 305 ↛ 308line 305 didn't jump to line 308 because the condition on line 305 was always true
306 series = series.apply(_dump_time)
307 else:
308 logger.info(f"Unexpected dtype: {series.dtype} for column with type TIME")
309 series = series.apply(_dump_str)
310 return series
def _handle_series_as_int(series: pd.Series) -> pd.Series:
    """Dump a series to str(int) (or plain str when the int cast fails).

    This is needed because a DataFrame with object dtype stores input ints as
    floats: pd.DataFrame([None, 1], dtype='object') -> [NaN, 1.0]

    Args:
        series (pd.Series): The series to handle

    Returns:
        pd.Series: The series with the int values as strings
    """
    if not pd_types.is_integer_dtype(series.dtype):
        return series.apply(_dump_int_or_str)
    if series.dtype == "Int64":
        # NOTE: 'apply' converts values to python floats
        return series.astype(object).apply(_dump_str)
    return series.apply(_dump_str)
def _handle_series_as_vector(series: pd.Series) -> pd.Series:
    """Convert each value in a series to the bytes representation of a vector.

    NOTE: MySQL's VECTOR type requires exactly 4 bytes per float.

    Args:
        series (pd.Series): The series to handle

    Returns:
        pd.Series: The series with the vector values as bytes
    """
    return series.apply(_dump_vector)
344def dump_result_set_to_mysql(
345 result_set: ResultSet, infer_column_size: bool = False
346) -> tuple[pd.DataFrame, list[dict[str, str | int]]]:
347 """
348 Dumps the ResultSet to a format that can be used to send as MySQL response packet.
349 NOTE: This method modifies the original DataFrame and columns.
351 Args:
352 result_set (ResultSet): result set to dump
353 infer_column_size (bool): If True, infer the 'size' attribute of the column from the data.
354 Exact size is not necessary, approximate is enough.
356 Returns:
357 tuple[pd.DataFrame, list[dict[str, str | int]]]: A tuple containing the modified DataFrame and a list
358 of MySQL column dictionaries. The dataframe values are
359 str or None, dtype=object
360 """
361 df = result_set.get_raw_df()
363 for i, column in enumerate(result_set.columns):
364 series = df[i]
365 if isinstance(column.type, MYSQL_DATA_TYPE) is False:
366 column.type = get_mysql_data_type_from_series(series)
368 column_type: MYSQL_DATA_TYPE = column.type
370 match column_type:
371 case MYSQL_DATA_TYPE.BOOL | MYSQL_DATA_TYPE.BOOLEAN:
372 series = series.apply(_dump_bool)
373 case MYSQL_DATA_TYPE.DATE:
374 series = _handle_series_as_date(series)
375 case MYSQL_DATA_TYPE.DATETIME:
376 series = _handle_series_as_datetime(series)
377 case MYSQL_DATA_TYPE.TIME:
378 series = _handle_series_as_time(series)
379 case (
380 MYSQL_DATA_TYPE.INT
381 | MYSQL_DATA_TYPE.TINYINT
382 | MYSQL_DATA_TYPE.SMALLINT
383 | MYSQL_DATA_TYPE.MEDIUMINT
384 | MYSQL_DATA_TYPE.BIGINT
385 | MYSQL_DATA_TYPE.YEAR
386 ):
387 series = _handle_series_as_int(series)
388 case MYSQL_DATA_TYPE.VECTOR: 388 ↛ 389line 388 didn't jump to line 389 because the pattern on line 388 never matched
389 series = _handle_series_as_vector(series)
390 case _:
391 series = series.apply(_dump_str)
393 # inplace modification of dt types raise SettingWithCopyWarning, so do regular replace
394 # we may split this operation for dt and other types for optimisation
395 df[i] = series.replace([np.nan, pd.NA, pd.NaT], None)
397 columns_dicts = [column_to_mysql_column_dict(column) for column in result_set.columns]
399 if infer_column_size and any(column_info.get("size") is None for column_info in columns_dicts): 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true
400 if len(df) == 0:
401 for column_info in columns_dicts:
402 if column_info["size"] is None:
403 column_info["size"] = 1
404 else:
405 sample = df.head(100)
406 for i, column_info in enumerate(columns_dicts):
407 try:
408 column_info["size"] = sample[sample.columns[i]].astype(str).str.len().max()
409 except Exception:
410 column_info["size"] = 1
412 return df, columns_dicts
def dump_columns_info(result_set: ResultSet, infer_column_size: bool = False) -> list[dict[str, str | int]]:
    """Prepare the list of column attrs that are required for a dump to the mysql protocol.

    Args:
        result_set (ResultSet): result set
        infer_column_size (bool): If True, infer the 'size' attribute of the column from the data.
            Exact size is not necessary, approximate is enough.

    Returns:
        list[dict[str, str | int]]: list of MySQL column dictionaries.
    """
    df = result_set.get_raw_df()

    # Make sure every column has a MYSQL_DATA_TYPE before converting to dicts.
    for i, column in enumerate(result_set.columns):
        if not isinstance(column.type, MYSQL_DATA_TYPE):
            column.type = get_mysql_data_type_from_series(df[i])

    columns_dicts = [column_to_mysql_column_dict(column) for column in result_set.columns]

    if infer_column_size and any(info.get("size") is None for info in columns_dicts):
        if len(df) == 0:
            # No data to sample from: use a minimal placeholder size.
            for info in columns_dicts:
                if info["size"] is None:
                    info["size"] = 1
        else:
            # Approximate the size from the longest rendered value in a sample.
            sample = df.head(100)
            for i, info in enumerate(columns_dicts):
                try:
                    info["size"] = sample[sample.columns[i]].astype(str).str.len().max()
                except Exception:
                    info["size"] = 1

    return columns_dicts
def serialize_bytes(data: bytes) -> bytes:
    """Serialize bytes to the mysql protocol.

    Args:
        data (bytes): the bytes to serialize

    Returns:
        bytes: the serialized bytes
    """
    # The NULL marker is passed through unchanged; all other values are
    # serialized via Datum.
    if data == NULL_VALUE:
        return data
    return Datum.serialize_bytes(data)
466def dump_chunks(df: pd.DataFrame, columns_info: list[dict], chunk_size: int):
467 """Serialize dataframe values to mysql TEXT protocol
469 Args:
470 df (pd.DataFrame): the dataframe to serialize
471 columns_info (list[dict]): the columns info
472 chunk_size (int): the chunk size
474 Yields:
475 list[bytes]: the serialized dataframe values
476 """
477 start = 0
478 while start < len(df):
479 serieces = []
480 for i, column in enumerate(columns_info):
481 series = df[i][start : start + chunk_size]
482 match column["type_enum"]:
483 case MYSQL_DATA_TYPE.BOOL | MYSQL_DATA_TYPE.BOOLEAN:
484 series = series.apply(_dump_bool)
485 case MYSQL_DATA_TYPE.DATE:
486 series = _handle_series_as_date(series)
487 case MYSQL_DATA_TYPE.DATETIME:
488 series = _handle_series_as_datetime(series)
489 case MYSQL_DATA_TYPE.TIME:
490 series = _handle_series_as_time(series)
491 case MYSQL_DATA_TYPE.VECTOR:
492 series = _handle_series_as_vector(series)
493 case (
494 MYSQL_DATA_TYPE.INT
495 | MYSQL_DATA_TYPE.TINYINT
496 | MYSQL_DATA_TYPE.SMALLINT
497 | MYSQL_DATA_TYPE.MEDIUMINT
498 | MYSQL_DATA_TYPE.BIGINT
499 | MYSQL_DATA_TYPE.YEAR
500 | MYSQL_DATA_TYPE.FLOAT
501 | MYSQL_DATA_TYPE.DOUBLE
502 | MYSQL_DATA_TYPE.DECIMAL
503 | MYSQL_DATA_TYPE.TIMESTAMP
504 ):
505 pass
506 case MYSQL_DATA_TYPE.TEXT:
507 # NOTE: it would be good do nothing for TEXT column as for INT.
508 # However, while we use TEXT as a fallback for undetected types, we need to handle it carefully.
509 series = series.apply(_dump_str)
510 case _:
511 series = series.apply(_dump_str)
512 serieces.append(series.astype(bytes).mask(series.isnull(), NULL_VALUE).apply(serialize_bytes))
514 yield pd.concat(serieces, axis=1).sum(axis=1).tolist()
516 start += chunk_size