Coverage for mindsdb / api / mysql / mysql_proxy / utilities / dump.py: 48%

248 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import struct 

2import datetime 

3from typing import Any 

4from array import array 

5 

6import orjson 

7import numpy as np 

8from numpy import dtype as np_dtype 

9import pandas as pd 

10from pandas.api import types as pd_types 

11 

12from mindsdb.api.executor.sql_query.result_set import ResultSet, get_mysql_data_type_from_series, Column 

13from mindsdb.api.mysql.mysql_proxy.utilities.lightwood_dtype import dtype as lightwood_dtype 

14from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import ( 

15 MYSQL_DATA_TYPE, 

16 DATA_C_TYPE_MAP, 

17 CTypeProperties, 

18 CHARSET_NUMBERS, 

19 NULL_VALUE, 

20) 

21from mindsdb.utilities import log 

22from mindsdb.utilities.json_encoder import CustomJSONEncoder 

23from mindsdb.api.mysql.mysql_proxy.data_types.mysql_datum import Datum 

24 

25logger = log.getLogger(__name__) 

26 

27# Pre-bind default encoder for custom types so we can serialize JSON consistently 

28_default_json = CustomJSONEncoder().default 

29 

30 

def column_to_mysql_column_dict(column: Column, database_name: str | None = None) -> dict[str, str | int]:
    """Build the dict of MySQL column attributes for a Column object.

    Args:
        column (Column): Column object to convert.
        database_name (str | None): Name of the database.

    Returns:
        dict[str, str | int]: Dictionary with mysql column properties.
    """
    # region normalize column.type to a MYSQL_DATA_TYPE member.
    # Should not happen, but the type may arrive as a plain string (possibly a
    # lightwood dtype name) or as a numpy dtype.
    if isinstance(column.type, str):
        try:
            column.type = MYSQL_DATA_TYPE(column.type)
        except ValueError:
            lightwood_to_mysql = {
                lightwood_dtype.date: MYSQL_DATA_TYPE.DATE,
                lightwood_dtype.datetime: MYSQL_DATA_TYPE.DATETIME,
                lightwood_dtype.float: MYSQL_DATA_TYPE.FLOAT,
                lightwood_dtype.integer: MYSQL_DATA_TYPE.INT,
            }
            column.type = lightwood_to_mysql.get(column.type, MYSQL_DATA_TYPE.TEXT)
    elif isinstance(column.type, np_dtype):
        if pd_types.is_integer_dtype(column.type):
            column.type = MYSQL_DATA_TYPE.INT
        elif pd_types.is_numeric_dtype(column.type):
            column.type = MYSQL_DATA_TYPE.FLOAT
        elif pd_types.is_datetime64_any_dtype(column.type):
            column.type = MYSQL_DATA_TYPE.DATETIME
        else:
            column.type = MYSQL_DATA_TYPE.TEXT
    # endregion

    if not isinstance(column.type, MYSQL_DATA_TYPE):
        logger.warning(f"Unexpected column type: {column.type}. Use TEXT as fallback.")
        column.type = MYSQL_DATA_TYPE.TEXT

    # JSON and VECTOR values are sent as binary payloads; everything else as utf8 text
    if column.type in (MYSQL_DATA_TYPE.JSON, MYSQL_DATA_TYPE.VECTOR):
        charset = CHARSET_NUMBERS["binary"]
    else:
        charset = CHARSET_NUMBERS["utf8_unicode_ci"]

    type_properties: CTypeProperties = DATA_C_TYPE_MAP[column.type]

    return {
        "database": column.database or database_name,
        # TODO add 'original_table'
        "table_name": column.table_name,
        "name": column.name,
        "alias": column.alias or column.name,
        "size": type_properties.size,
        "flags": type_properties.flags,
        "type": type_properties.code,
        "type_enum": column.type,
        "charset": charset,
    }

90 

91 

92def _dump_bool(var: Any) -> int | None: 

93 """Dumps a boolean value to an integer, as in MySQL boolean type is tinyint with values 0 and 1. 

94 NOTE: None consider as True in dataframe with dtype=bool, we can't change it 

95 

96 Args: 

97 var (Any): The boolean value to dump 

98 

99 Returns: 

100 int | None: 1 or 0 or None 

101 """ 

102 if pd.isna(var): 

103 return None 

104 return "1" if var else "0" 

105 

106 

107def _dump_str(var: Any) -> str | None: 

108 """Dumps a value to a string. 

109 

110 Args: 

111 var (Any): The value to dump 

112 

113 Returns: 

114 str | None: The string representation of the value or None if the value is None 

115 """ 

116 if isinstance(var, bytes): 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 try: 

118 return var.decode("utf-8") 

119 except Exception: 

120 return str(var)[2:-1] 

121 if isinstance(var, (dict, list)): 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 try: 

123 return orjson.dumps( 

124 var, 

125 default=_default_json, 

126 option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_PASSTHROUGH_DATETIME, 

127 ).decode("utf-8") 

128 except Exception: 

129 return str(var) 

130 # pd.isna returns array of bools for list 

131 # and the truth value of a numpy array is ambiguous 

132 if isinstance(var, (list, np.ndarray)) is False and pd.isna(var): 

133 return None 

134 return str(var) 

135 

136 

137def _dump_int_or_str(var: Any) -> str | None: 

138 """Dumps a value to a string. 

139 If the value is numeric - then cast it to int to avoid float representation. 

140 

141 Args: 

142 var (Any): The value to dump. 

143 

144 Returns: 

145 str | None: The string representation of the value or None if the value is None 

146 """ 

147 if pd.isna(var): 

148 return None 

149 try: 

150 return str(int(var)) 

151 except ValueError: 

152 return str(var) 

153 

154 

155def _dump_date(var: datetime.date | str | None) -> str | None: 

156 """Dumps a date value to a string. 

157 

158 Args: 

159 var (datetime.date | str | None): The date value to dump 

160 

161 Returns: 

162 str | None: The string representation of the date value or None if the value is None 

163 """ 

164 if isinstance(var, (datetime.date, pd.Timestamp)): # it is also True for datetime.datetime 

165 return var.strftime("%Y-%m-%d") 

166 elif isinstance(var, str): 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true

167 return var 

168 elif pd.isna(var): 168 ↛ 170line 168 didn't jump to line 170 because the condition on line 168 was always true

169 return None 

170 logger.warning(f"Unexpected value type for DATE: {type(var)}, {var}") 

171 return _dump_str(var) 

172 

173 

174def _dump_datetime(var: datetime.datetime | str | None) -> str | None: 

175 """Dumps a datetime value to a string. 

176 # NOTE mysql may display only %Y-%m-%d %H:%M:%S format for datetime column 

177 

178 Args: 

179 var (datetime.datetime | str | None): The datetime value to dump 

180 

181 Returns: 

182 str | None: The string representation of the datetime value or None if the value is None 

183 """ 

184 if isinstance(var, datetime.date): # it is also datetime.datetime 

185 if hasattr(var, "tzinfo") and var.tzinfo is not None: 

186 return var.astimezone(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S") 

187 return var.strftime("%Y-%m-%d %H:%M:%S") 

188 elif isinstance(var, pd.Timestamp): 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true

189 if var.tzinfo is not None: 

190 return var.tz_convert("UTC").strftime("%Y-%m-%d %H:%M:%S") 

191 return var.strftime("%Y-%m-%d %H:%M:%S") 

192 elif isinstance(var, str): 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true

193 return var 

194 elif pd.isna(var): 194 ↛ 196line 194 didn't jump to line 196 because the condition on line 194 was always true

195 return None 

196 logger.warning(f"Unexpected value type for DATETIME: {type(var)}, {var}") 

197 return _dump_str(var) 

198 

199 

200def _dump_time(var: datetime.time | str | None) -> str | None: 

201 """Dumps a time value to a string. 

202 

203 Args: 

204 var (datetime.time | str | None): The time value to dump 

205 

206 Returns: 

207 str | None: The string representation of the time value or None if the value is None 

208 """ 

209 if isinstance(var, datetime.time): 

210 if var.tzinfo is not None: 

211 # NOTE strftime does not support timezone, so we need to convert to UTC 

212 offset_seconds = var.tzinfo.utcoffset(None).total_seconds() 

213 time_seconds = var.hour * 3600 + var.minute * 60 + var.second 

214 utc_seconds = (time_seconds - offset_seconds) % (24 * 3600) 

215 hours = int(utc_seconds // 3600) 

216 minutes = int((utc_seconds % 3600) // 60) 

217 seconds = int(utc_seconds % 60) 

218 var = datetime.time(hours, minutes, seconds, var.microsecond) 

219 return var.strftime("%H:%M:%S") 

220 elif isinstance(var, datetime.datetime): 

221 if var.tzinfo is not None: 

222 return var.astimezone(datetime.timezone.utc).strftime("%H:%M:%S") 

223 return var.strftime("%H:%M:%S") 

224 elif isinstance(var, pd.Timestamp): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 if var.tzinfo is not None: 

226 return var.tz_convert("UTC").strftime("%H:%M:%S") 

227 return var.strftime("%H:%M:%S") 

228 elif isinstance(var, str): 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true

229 return var 

230 elif pd.isna(var): 230 ↛ 232line 230 didn't jump to line 232 because the condition on line 230 was always true

231 return None 

232 logger.warning(f"Unexpected value type for TIME: {type(var)}, {var}") 

233 return _dump_str(var) 

234 

235 

236def _dump_vector(value: Any) -> bytes | None: 

237 """Convert array or list of floats to a bytes. 

238 

239 Args: 

240 value (Any): The value to dump 

241 

242 Returns: 

243 bytes | None: The bytes representation of the vector value or None if the value is None 

244 """ 

245 if isinstance(value, (array, list, np.ndarray)): 

246 return b"".join([struct.pack("<f", el) for el in value]) 

247 elif pd.isna(value): 

248 return None 

249 err_msg = f"Unexpected value type for VECTOR: {type(value)}, {value}" 

250 logger.error(err_msg) 

251 raise ValueError(err_msg) 

252 

253 

254def _handle_series_as_date(series: pd.Series) -> pd.Series: 

255 """Convert values in a series to a string representation of a date. 

256 NOTE: MySQL require exactly %Y-%m-%d for DATE type. 

257 

258 Args: 

259 series (pd.Series): The series to handle 

260 

261 Returns: 

262 pd.Series: The series with the date values as strings 

263 """ 

264 if pd_types.is_datetime64_any_dtype(series.dtype): 

265 return series.dt.strftime("%Y-%m-%d") 

266 elif pd_types.is_object_dtype(series.dtype): 266 ↛ 268line 266 didn't jump to line 268 because the condition on line 266 was always true

267 return series.apply(_dump_date) 

268 logger.info(f"Unexpected dtype: {series.dtype} for column with type DATE") 

269 return series.apply(_dump_str) 

270 

271 

272def _handle_series_as_datetime(series: pd.Series) -> pd.Series: 

273 """Convert values in a series to a string representation of a datetime. 

274 NOTE: MySQL's DATETIME type require exactly %Y-%m-%d %H:%M:%S format. 

275 

276 Args: 

277 series (pd.Series): The series to handle 

278 

279 Returns: 

280 pd.Series: The series with the datetime values as strings 

281 """ 

282 if pd_types.is_datetime64_any_dtype(series.dtype): 

283 return series.dt.strftime("%Y-%m-%d %H:%M:%S") 

284 elif pd_types.is_object_dtype(series.dtype): 284 ↛ 286line 284 didn't jump to line 286 because the condition on line 284 was always true

285 return series.apply(_dump_datetime) 

286 logger.info(f"Unexpected dtype: {series.dtype} for column with type DATETIME") 

287 return series.apply(_dump_str) 

288 

289 

290def _handle_series_as_time(series: pd.Series) -> pd.Series: 

291 """Convert values in a series to a string representation of a time. 

292 NOTE: MySQL's TIME type require exactly %H:%M:%S format. 

293 

294 Args: 

295 series (pd.Series): The series to handle 

296 

297 Returns: 

298 pd.Series: The series with the time values as strings 

299 """ 

300 if pd_types.is_timedelta64_ns_dtype(series.dtype): 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true

301 base_time = pd.Timestamp("2000-01-01") 

302 series = (base_time + series).dt.strftime("%H:%M:%S") 

303 elif pd_types.is_datetime64_dtype(series.dtype): 

304 series = series.dt.strftime("%H:%M:%S") 

305 elif pd_types.is_object_dtype(series.dtype): 305 ↛ 308line 305 didn't jump to line 308 because the condition on line 305 was always true

306 series = series.apply(_dump_time) 

307 else: 

308 logger.info(f"Unexpected dtype: {series.dtype} for column with type TIME") 

309 series = series.apply(_dump_str) 

310 return series 

311 

312 

def _handle_series_as_int(series: pd.Series) -> pd.Series:
    """Dump series values to str(int) (or plain str when the cast fails). This is needed because a
    DataFrame with dtype=object stores input ints as floats: pd.DataFrame([None, 1], dtype='object') -> [NaN, 1.0]

    Args:
        series (pd.Series): The series to handle

    Returns:
        pd.Series: The series with the int values as strings
    """
    if not pd_types.is_integer_dtype(series.dtype):
        # non-integer dtype: values may be floats that represent ints
        return series.apply(_dump_int_or_str)
    if series.dtype == "Int64":
        # NOTE: 'apply' converts values to python floats
        series = series.astype(object)
    return series.apply(_dump_str)

329 

330 

def _handle_series_as_vector(series: pd.Series) -> pd.Series:
    """Convert each value of a series to the bytes representation of a vector.
    NOTE: MySQL's VECTOR type require exactly 4 bytes per float.

    Args:
        series (pd.Series): The series to handle

    Returns:
        pd.Series: The series with the vector values as bytes
    """
    return series.map(_dump_vector)

342 

343 

def dump_result_set_to_mysql(
    result_set: ResultSet, infer_column_size: bool = False
) -> tuple[pd.DataFrame, list[dict[str, str | int]]]:
    """
    Dumps the ResultSet to a format that can be used to send as MySQL response packet.
    NOTE: This method modifies the original DataFrame and columns.

    Args:
        result_set (ResultSet): result set to dump
        infer_column_size (bool): If True, infer the 'size' attribute of the column from the data.
                                  Exact size is not necessary, approximate is enough.

    Returns:
        tuple[pd.DataFrame, list[dict[str, str | int]]]: A tuple containing the modified DataFrame and a list
                                                         of MySQL column dictionaries. The dataframe values are
                                                         str or None, dtype=object
    """
    df = result_set.get_raw_df()

    for i, column in enumerate(result_set.columns):
        # NOTE(review): columns are addressed positionally via df[i] — assumes get_raw_df()
        # labels columns 0..n-1 in the same order as result_set.columns; verify against ResultSet.
        series = df[i]
        if isinstance(column.type, MYSQL_DATA_TYPE) is False:
            # type was not resolved upstream: infer it from the data (mutates the Column)
            column.type = get_mysql_data_type_from_series(series)

        column_type: MYSQL_DATA_TYPE = column.type

        # convert every column to the text representation expected by the MySQL protocol;
        # VECTOR is the only type rendered as raw bytes (see _dump_vector)
        match column_type:
            case MYSQL_DATA_TYPE.BOOL | MYSQL_DATA_TYPE.BOOLEAN:
                series = series.apply(_dump_bool)
            case MYSQL_DATA_TYPE.DATE:
                series = _handle_series_as_date(series)
            case MYSQL_DATA_TYPE.DATETIME:
                series = _handle_series_as_datetime(series)
            case MYSQL_DATA_TYPE.TIME:
                series = _handle_series_as_time(series)
            case (
                MYSQL_DATA_TYPE.INT
                | MYSQL_DATA_TYPE.TINYINT
                | MYSQL_DATA_TYPE.SMALLINT
                | MYSQL_DATA_TYPE.MEDIUMINT
                | MYSQL_DATA_TYPE.BIGINT
                | MYSQL_DATA_TYPE.YEAR
            ):
                series = _handle_series_as_int(series)
            case MYSQL_DATA_TYPE.VECTOR:
                series = _handle_series_as_vector(series)
            case _:
                # fallback for every other type (TEXT, FLOAT, JSON, ...): plain string dump
                series = series.apply(_dump_str)

        # inplace modification of dt types raise SettingWithCopyWarning, so do regular replace
        # we may split this operation for dt and other types for optimisation
        df[i] = series.replace([np.nan, pd.NA, pd.NaT], None)

    columns_dicts = [column_to_mysql_column_dict(column) for column in result_set.columns]

    # region fill in missing 'size' attributes from (a sample of) the data
    if infer_column_size and any(column_info.get("size") is None for column_info in columns_dicts):
        if len(df) == 0:
            # no rows to measure: use a minimal placeholder size
            for column_info in columns_dicts:
                if column_info["size"] is None:
                    column_info["size"] = 1
        else:
            # approximate size = max rendered length over the first 100 rows
            sample = df.head(100)
            for i, column_info in enumerate(columns_dicts):
                try:
                    column_info["size"] = sample[sample.columns[i]].astype(str).str.len().max()
                except Exception:
                    column_info["size"] = 1
    # endregion

    return df, columns_dicts

413 

414 

def dump_columns_info(result_set: ResultSet, infer_column_size: bool = False) -> list[dict[str, str | int]]:
    """Prepare the list of columns attrs that are required for dump to mysql protocol

    Args:
        result_set (ResultSet): result set
        infer_column_size (bool): If True, infer the 'size' attribute of the column from the data.
                                  Exact size is not necessary, approximate is enough.

    Returns:
        list[dict[str, str | int]]: list of MySQL column dictionaries.
    """
    df = result_set.get_raw_df()

    # resolve missing column types from the data (mutates the Column objects);
    # NOTE(review): assumes get_raw_df() labels columns positionally 0..n-1 — verify against ResultSet
    for i, column in enumerate(result_set.columns):
        series = df[i]
        if isinstance(column.type, MYSQL_DATA_TYPE) is False:
            column.type = get_mysql_data_type_from_series(series)

    columns_dicts = [column_to_mysql_column_dict(column) for column in result_set.columns]

    # region fill in missing 'size' attributes from (a sample of) the data
    if infer_column_size and any(column_info.get("size") is None for column_info in columns_dicts):
        if len(df) == 0:
            # no rows to measure: use a minimal placeholder size
            for column_info in columns_dicts:
                if column_info["size"] is None:
                    column_info["size"] = 1
        else:
            # approximate size = max rendered length over the first 100 rows
            sample = df.head(100)
            for i, column_info in enumerate(columns_dicts):
                try:
                    column_info["size"] = sample[sample.columns[i]].astype(str).str.len().max()
                except Exception:
                    column_info["size"] = 1
    # endregion

    return columns_dicts

449 

450 

def serialize_bytes(data: bytes) -> bytes:
    """Serialize bytes to the mysql protocol.

    Args:
        data (bytes): the bytes to serialize

    Returns:
        bytes: the serialized bytes
    """
    # NULL_VALUE is a protocol marker and must pass through unchanged
    return data if data == NULL_VALUE else Datum.serialize_bytes(data)

464 

465 

def dump_chunks(df: pd.DataFrame, columns_info: list[dict], chunk_size: int):
    """Serialize dataframe values to mysql TEXT protocol.

    Iterates the dataframe in row chunks; for each chunk, renders every column
    to its text/bytes form, length-prefix-encodes each cell via serialize_bytes,
    and yields one list of fully serialized rows.

    Args:
        df (pd.DataFrame): the dataframe to serialize
        columns_info (list[dict]): the columns info (as produced by column_to_mysql_column_dict)
        chunk_size (int): the chunk size (number of rows per yielded batch)

    Yields:
        list[bytes]: the serialized dataframe values, one bytes object per row
    """
    start = 0
    while start < len(df):
        serieces = []
        for i, column in enumerate(columns_info):
            # NOTE(review): columns are addressed positionally via df[i] — assumes the
            # dataframe columns are labelled 0..n-1 in the same order as columns_info
            series = df[i][start : start + chunk_size]
            match column["type_enum"]:
                case MYSQL_DATA_TYPE.BOOL | MYSQL_DATA_TYPE.BOOLEAN:
                    series = series.apply(_dump_bool)
                case MYSQL_DATA_TYPE.DATE:
                    series = _handle_series_as_date(series)
                case MYSQL_DATA_TYPE.DATETIME:
                    series = _handle_series_as_datetime(series)
                case MYSQL_DATA_TYPE.TIME:
                    series = _handle_series_as_time(series)
                case MYSQL_DATA_TYPE.VECTOR:
                    series = _handle_series_as_vector(series)
                case (
                    MYSQL_DATA_TYPE.INT
                    | MYSQL_DATA_TYPE.TINYINT
                    | MYSQL_DATA_TYPE.SMALLINT
                    | MYSQL_DATA_TYPE.MEDIUMINT
                    | MYSQL_DATA_TYPE.BIGINT
                    | MYSQL_DATA_TYPE.YEAR
                    | MYSQL_DATA_TYPE.FLOAT
                    | MYSQL_DATA_TYPE.DOUBLE
                    | MYSQL_DATA_TYPE.DECIMAL
                    | MYSQL_DATA_TYPE.TIMESTAMP
                ):
                    # numeric types: the astype(bytes) below renders them directly
                    pass
                case MYSQL_DATA_TYPE.TEXT:
                    # NOTE: it would be good do nothing for TEXT column as for INT.
                    # However, while we use TEXT as a fallback for undetected types, we need to handle it carefully.
                    series = series.apply(_dump_str)
                case _:
                    series = series.apply(_dump_str)
            # render each cell to bytes, substitute the protocol NULL marker for
            # missing values, then length-prefix-encode every cell.
            # NOTE(review): astype(bytes) appears to stringify-and-encode each value —
            # confirm it leaves the VECTOR branch's ready-made bytes intact.
            serieces.append(series.astype(bytes).mask(series.isnull(), NULL_VALUE).apply(serialize_bytes))

        # concatenate the per-column cells of each row into one bytes payload per row
        yield pd.concat(serieces, axis=1).sum(axis=1).tolist()

        start += chunk_size