Coverage for mindsdb / utilities / log.py: 29%

332 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

import json
import logging
import logging.handlers
import os
import re
import threading
from logging.config import dictConfig
from typing import Any

from mindsdb.utilities.config import config as app_config

10 

11 

# Process-wide flag: flipped to True by initialize_logging() after configure_logging() has run.
logging_initialized: bool = False

13 

14 

class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    The ``message`` field is the text produced by the base ``logging.Formatter``
    for the record; the other fields are copied straight from the record.
    """

    def format(self, record):
        rendered_message = super().format(record)
        payload = dict(
            process_name=record.processName,
            name=record.name,
            message=rendered_message,
            level=record.levelname,
            time=record.created,
        )
        return json.dumps(payload)

26 

27 

class ColorFormatter(logging.Formatter):
    """Console formatter that wraps each record in an ANSI color chosen by its level.

    Records at one of the five standard levels use that level's color. Any other
    level (e.g. a custom TRACE=5 or NOTICE=25) uses the color of the highest
    standard level at or below it, and anything below DEBUG uses the DEBUG color.
    Previously such records made ``format`` fail with AttributeError on ``None``.
    """

    green = "\x1b[32;20m"
    default = "\x1b[39;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    # Named ``log_format`` rather than ``format`` so it is not shadowed by the method below.
    log_format = "%(asctime)s %(processName)15s %(levelname)-8s %(name)s: %(message)s"

    FORMATS = {
        logging.DEBUG: logging.Formatter(green + log_format + reset),
        logging.INFO: logging.Formatter(default + log_format + reset),
        logging.WARNING: logging.Formatter(yellow + log_format + reset),
        logging.ERROR: logging.Formatter(red + log_format + reset),
        logging.CRITICAL: logging.Formatter(bold_red + log_format + reset),
    }

    def format(self, record):
        formatter = self.FORMATS.get(record.levelno)
        if formatter is None:
            formatter = self._nearest_formatter(record.levelno)
        return formatter.format(record)

    @classmethod
    def _nearest_formatter(cls, levelno):
        """Return the formatter of the highest standard level <= ``levelno`` (DEBUG if none)."""
        lower_levels = [level for level in cls.FORMATS if level <= levelno]
        return cls.FORMATS[max(lower_levels, default=logging.DEBUG)]

48 

49 

# Formatter section of the dictConfig used by configure_logging():
#   "default" - ANSI-colored console output
#   "json"    - one JSON object per record
#   "file"    - plain text with the same layout as the console, without colors
FORMATTERS = dict(
    default={"()": ColorFormatter},
    json={"()": JsonFormatter},
    file={"format": "%(asctime)s %(processName)15s %(levelname)-8s %(name)s: %(message)s"},
)

55 

56 

57class LogSanitizer: 

58 """Log Sanitizer""" 

59 

60 SENSITIVE_KEYS = { 

61 "password", 

62 "passwd", 

63 "pwd", 

64 "token", 

65 "access_token", 

66 "refresh_token", 

67 "bearer_token", 

68 "api_key", 

69 "apikey", 

70 "api-key", 

71 "openai_api_key", 

72 "secret", 

73 "secret_key", 

74 "client_secret", 

75 "credentials", 

76 "auth", 

77 "authorization", 

78 "private_key", 

79 "private-key", 

80 "session_id", 

81 "sessionid", 

82 "credit_card", 

83 "card_number", 

84 "cvv", 

85 } 

86 

87 def __init__(self, mask: str | None = None): 

88 self.mask = mask or "********" 

89 self._compile_patterns() 

90 

91 def _compile_patterns(self): 

92 self.search_pattern = re.compile( 

93 r"\b(" + "|".join(re.escape(key) for key in self.SENSITIVE_KEYS) + r")\b", re.IGNORECASE 

94 ) 

95 self.patterns = [] 

96 for key in self.SENSITIVE_KEYS: 

97 # Patterns for: key=value, key: value, "key": "value", 'key': 'value' 

98 # Note: negative lookahead (?!%) excludes Python format placeholders like %s, %d, etc. 

99 patterns = [ 

100 re.compile(f'{key}["\s]*[:=]["\s]*(?!%)([^\s,}}\\]"\n]+)', re.IGNORECASE), 

101 re.compile(f'"{key}"["\s]*:["\s]*"([^"]+)"', re.IGNORECASE), 

102 re.compile(f"'{key}'['\s]*:['\s]*'([^']+)'", re.IGNORECASE), 

103 ] 

104 self.patterns.extend(patterns) 

105 

106 def _replace(self, m) -> str: 

107 return m.group(0).replace(m.group(1), self.mask) 

108 

109 def sanitize_text(self, text: str) -> str: 

110 if self.search_pattern.search(text): 

111 for pattern in self.patterns: 

112 text = pattern.sub(self._replace, text) 

113 return text 

114 

115 def sanitize_dict(self, data: dict) -> dict: 

116 if not isinstance(data, dict): 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 return data 

118 

119 sanitized = {} 

120 for key, value in data.items(): 

121 if any(sensitive in str(key).lower() for sensitive in self.SENSITIVE_KEYS): 

122 sanitized[key] = self.mask 

123 elif isinstance(value, dict): 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true

124 sanitized[key] = self.sanitize_dict(value) 

125 elif isinstance(value, list): 125 ↛ 126line 125 didn't jump to line 126 because the condition on line 125 was never true

126 sanitized[key] = [self.sanitize_dict(item) if isinstance(item, dict) else item for item in value] 

127 else: 

128 sanitized[key] = value 

129 return sanitized 

130 

131 def sanitize(self, data: Any) -> Any: 

132 if isinstance(data, dict): 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true

133 return self.sanitize_dict(data) 

134 elif isinstance(data, str): 

135 return self.sanitize_text(data) 

136 elif isinstance(data, (list, tuple)): 

137 return type(data)(self.sanitize(item) for item in data) 

138 return data 

139 

140 

class SanitizingMixin:
    """Mixin for logging handlers that masks sensitive data in records before emitting.

    List it before the concrete ``logging.Handler`` base so its ``__init__``
    forwards all handler arguments through ``super()``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sanitizer = LogSanitizer()

    def sanitize_record(self, record):
        """Sanitize ``record`` in place and return it.

        Positional ``%``-style args are merged into ``record.msg`` first, so a
        secret passed as an argument (``logger.info("token=%s", tok)``) is seen by
        the text patterns. If the merge fails (wrong number or type of args, or an
        argument whose ``__str__`` raises), the record is left unmerged. The
        handler's normal formatting then reports the problem through
        ``Handler.handleError``, instead of the exception escaping from the
        caller's logging call.
        """
        if (
            hasattr(record, "args")
            and isinstance(record.args, (list, tuple))
            and len(record.args) > 0
            and isinstance(record.msg, str)
        ):
            try:
                merged = record.msg % record.args
            except Exception:
                # Deliberately broad: arbitrary user objects may fail to format. Leave the
                # record untouched so standard logging error handling deals with it.
                pass
            else:
                record.msg = merged
                record.args = []

        if isinstance(record.msg, str):
            record.msg = self.sanitizer.sanitize_text(record.msg)
        elif isinstance(record.msg, dict):
            record.msg = self.sanitizer.sanitize_dict(record.msg)

        if hasattr(record, "args") and record.args:
            record.args = self.sanitizer.sanitize(record.args)

        return record

168 

169 

class StreamSanitizingHandler(SanitizingMixin, logging.StreamHandler):
    """``logging.StreamHandler`` that masks sensitive data in each record before writing it."""

    def emit(self, record):
        super().emit(self.sanitize_record(record))

174 

175 

# NOTE: ``logging.handlers`` is a submodule and is not loaded by ``import logging``.
# It must be imported explicitly at the top of this module; otherwise this class
# works only as a side effect of ``logging.config`` importing it.
class FileSanitizingHandler(SanitizingMixin, logging.handlers.RotatingFileHandler):
    """``RotatingFileHandler`` that masks sensitive data in each record before writing it."""

    def emit(self, record):
        record = self.sanitize_record(record)
        super().emit(record)

180 

181 

def get_console_handler_config_level() -> int:
    """Return the numeric level of the console handler, resolved from its configured name (e.g. "INFO")."""
    level_name = app_config["logging"]["handlers"]["console"]["level"]
    return getattr(logging, level_name)

185 

186 

def get_file_handler_config_level() -> int:
    """Return the numeric level of the file handler, resolved from its configured name (e.g. "INFO")."""
    level_name = app_config["logging"]["handlers"]["file"]["level"]
    return getattr(logging, level_name)

190 

191 

def get_mindsdb_log_level() -> int:
    """Return the most verbose (numerically lowest) of the console and file handler levels."""
    return min(get_console_handler_config_level(), get_file_handler_config_level())

197 

198 

def get_handlers_config(process_name: str) -> dict:
    """Build the ``handlers`` section of the logging dictConfig from the app config.

    Args:
        process_name: when not None, inserted into the log file name (before the
            last extension, or appended if there is none) so each process writes
            to its own file.

    Returns:
        dict: handler name ("console" and/or "file") -> handler config. A handler
        is included only when its ``enabled`` setting is exactly ``True``.
    """
    handlers_settings = app_config["logging"]["handlers"]
    result = {}

    console_settings = handlers_settings["console"]
    console_level = getattr(logging, console_settings["level"])
    if console_settings["enabled"] is True:
        result["console"] = {
            "class": "mindsdb.utilities.log.StreamSanitizingHandler",
            "formatter": console_settings.get("formatter", "default"),
            "level": console_level,
        }

    file_settings = handlers_settings["file"]
    file_level = getattr(logging, file_settings["level"])
    if file_settings["enabled"] is True:
        file_name = file_settings["filename"]
        if process_name is not None:
            stem, dot, extension = file_name.rpartition(".")
            file_name = f"{stem}_{process_name}.{extension}" if dot else f"{file_name}_{process_name}"
        result["file"] = {
            "class": "mindsdb.utilities.log.FileSanitizingHandler",
            "formatter": "file",
            "level": file_level,
            "filename": app_config.paths["log"] / file_name,
            "maxBytes": file_settings["maxBytes"],  # rotation size, taken from config
            "backupCount": file_settings["backupCount"],
        }
    return result

229 

230 

def configure_logging(process_name: str = None):
    """Apply MindsDB's logging setup via ``logging.config.dictConfig``.

    The root logger gets every enabled handler. The root, ``__main__``,
    ``mindsdb`` and ``alembic`` loggers are all set to the most verbose
    configured handler level.
    """
    handlers = get_handlers_config(process_name)
    level = get_mindsdb_log_level()

    loggers = {"": {"handlers": list(handlers), "level": level}}  # "" is the root logger
    for logger_name in ("__main__", "mindsdb", "alembic"):
        loggers[logger_name] = {"level": level}

    dictConfig(
        {
            "version": 1,
            "formatters": FORMATTERS,
            "handlers": handlers,
            "loggers": loggers,
        }
    )

257 

258 

def initialize_logging(process_name: str = None) -> None:
    """Configure logging once per process; subsequent calls do nothing."""
    global logging_initialized
    if logging_initialized:
        return
    configure_logging(process_name)
    logging_initialized = True

265 

266 

# Use this instead of logging.getLogger(). MindsDB spawns processes in many ways,
# and those processes often need logging configured again. Routing logger
# creation through this helper keeps that "configure before first use" logic in
# one place.
def getLogger(name=None):
    """Return ``logging.getLogger(name)``, configuring logging first if that has not happened yet."""
    initialize_logging()
    return logging.getLogger(name)

276 

277 

def log_ram_info(logger: logging.Logger) -> None:
    """Write one DEBUG line with system memory totals to ``logger``.

    The line reports total, available and used memory in GB plus the usage
    percentage, as reported by ``psutil.virtual_memory()``. Nothing is done
    unless ``logger`` is enabled for DEBUG. Any failure (including psutil not
    being installed) is logged at DEBUG instead of raised.

    Args:
        logger (logging.Logger): The logger that receives the memory information.
    """
    if logger.isEnabledFor(logging.DEBUG) is False:
        return

    gib = 1024**3
    try:
        import psutil

        vm = psutil.virtual_memory()
        logger.debug(
            f"Memory: {vm.total / gib:.1f}GB total, {vm.available / gib:.1f}GB available, "
            f"{vm.used / gib:.1f}GB used ({vm.percent:.1f}%)"
        )
    except Exception as e:
        logger.debug(f"Failed to get memory information: {e}")

303 

304 

def log_system_info(logger: logging.Logger) -> None:
    """Log detailed system information for debugging purposes.

    The function only logs system information (if the logger is configured for DEBUG level):
    - Operating system details (OS type, version, distribution, architecture)
    - CPU information (processor type, physical and logical core counts)
    - Memory information (via log_ram_info)
    - GPU information (NVIDIA, AMD, and other cards detected by fallback methods)

    Args:
        logger (logging.Logger): The logger instance to use for outputting system information.
            Must be configured for DEBUG level to see the output.

    Returns:
        None

    Note:
        - For Linux systems, attempts to detect distribution via /etc/os-release, /etc/issue, or lsb_release
        - For Windows systems, uses wmic commands to get OS and GPU information
        - For macOS systems, uses sw_vers and system_profiler commands
        - GPU detection supports NVIDIA (via nvidia-smi), AMD (via rocm-smi / wmic), and fallback methods
        - All subprocess calls have timeout protection to prevent hanging
        - Individual probes fail silently; an unexpected error aborts the rest and is logged at DEBUG
    """
    if logger.isEnabledFor(logging.DEBUG) is False:
        return

    try:
        import platform

        import psutil

        logger.debug(f"Operating System: {_describe_operating_system()}")

        cpu_info = platform.processor() or platform.machine()
        cpu_count = psutil.cpu_count(logical=False)
        cpu_count_logical = psutil.cpu_count(logical=True)
        logger.debug(f"CPU: {cpu_info} ({cpu_count} physical cores, {cpu_count_logical} logical cores)")

        log_ram_info(logger)

        gpu_info = _detect_gpus()
        if gpu_info:
            logger.debug(f"GPU: {', '.join(gpu_info)}")
        else:
            logger.debug("GPU: Not detected or not supported")
    except Exception as e:
        logger.debug(f"Failed to get system information: {e}")


def _describe_operating_system() -> str:
    """Return a one-line OS description: "<name/version details> (<machine architecture>)"."""
    import platform
    import subprocess

    os_system = platform.system()
    os_release = platform.release()
    os_machine = platform.machine()

    if os_system == "Linux":
        description = f"{_detect_linux_distribution()} (kernel {os_release})"
    elif os_system == "Windows":
        os_name, os_version = "Windows", "unknown"
        try:
            result = subprocess.run(
                ["wmic", "os", "get", "Caption,Version", "/format:list"], capture_output=True, text=True, timeout=3
            )
            if result.returncode == 0:
                windows_info = {}
                for line in result.stdout.strip().split("\n"):
                    if "=" in line:
                        key, value = line.strip().split("=", 1)
                        windows_info[key] = value.strip()
                if "Caption" in windows_info and "Version" in windows_info:
                    os_name = windows_info["Caption"]
                    os_version = windows_info["Version"]
        except Exception:
            pass  # best effort: keep the defaults
        description = f"{os_name} {os_release} (version {os_version})"
    elif os_system == "Darwin":  # macOS
        os_name, os_version = "macOS", "unknown"
        try:
            result = subprocess.run(
                ["sw_vers", "-productName", "-productVersion"], capture_output=True, text=True, timeout=3
            )
            if result.returncode == 0:
                lines = result.stdout.strip().split("\n")
                if len(lines) >= 2:
                    os_name = lines[0].strip()
                    os_version = lines[1].strip()
        except Exception:
            pass  # best effort: keep the defaults
        description = f"{os_name} {os_release} (version {os_version})"
    else:
        description = f"{os_system} {os_release}"

    return f"{description} ({os_machine})"


def _detect_linux_distribution() -> str:
    """Best-effort Linux distribution name; "Unknown Linux" if it cannot be determined."""
    import subprocess

    distro_info = "Unknown Linux"
    try:
        # /etc/os-release exists on most modern distributions
        if os.path.exists("/etc/os-release"):
            os_release_data = {}
            with open("/etc/os-release", "r") as f:
                for line in f:
                    if "=" in line:
                        key, value = line.strip().split("=", 1)
                        os_release_data[key] = value.strip('"')

            if "PRETTY_NAME" in os_release_data:
                distro_info = os_release_data["PRETTY_NAME"]
            elif "NAME" in os_release_data and "VERSION" in os_release_data:
                distro_info = f"{os_release_data['NAME']} {os_release_data['VERSION']}"
            elif "ID" in os_release_data:
                distro_info = os_release_data["ID"].title()
        # Fallback to /etc/issue
        elif os.path.exists("/etc/issue"):
            with open("/etc/issue", "r") as f:
                issue_content = f.read().strip()
            if issue_content:
                distro_info = issue_content.split("\n")[0]
        # Fallback to lsb_release
        else:
            try:
                result = subprocess.run(["lsb_release", "-d"], capture_output=True, text=True, timeout=2)
                if result.returncode == 0:
                    distro_info = result.stdout.split(":")[-1].strip()
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass
    except Exception:
        pass  # best effort: report "Unknown Linux"
    return distro_info


def _windows_video_controllers() -> list[str]:
    """Return the video controller names reported by wmic ([] if the query fails)."""
    import subprocess

    try:
        result = subprocess.run(
            ["wmic", "path", "win32_VideoController", "get", "name"],
            capture_output=True,
            text=True,
            timeout=3,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return []
    if result.returncode != 0:
        return []
    stripped = (line.strip() for line in result.stdout.strip().split("\n"))
    # "Name" is the wmic column header
    return [line for line in stripped if line and line != "Name"]


def _detect_gpus() -> list[str]:
    """Best-effort list of GPU descriptions; empty if none could be detected."""
    import platform
    import shutil
    import subprocess

    gpu_info = []
    try:
        # NVIDIA (nvidia-smi works on Linux, Windows and macOS)
        nvidia_smi_path = shutil.which("nvidia-smi")
        if nvidia_smi_path:
            try:
                result = subprocess.run(
                    [nvidia_smi_path, "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
                    capture_output=True,
                    text=True,
                    timeout=3,
                )
                if result.returncode == 0:
                    for line in result.stdout.strip().split("\n"):
                        if line.strip():
                            parts = line.split(", ")
                            if len(parts) >= 2:
                                gpu_info.append(f"{parts[0].strip()} ({parts[1].strip()}MB)")
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass
        if gpu_info:
            return gpu_info

        os_system = platform.system()
        if os_system == "Windows":
            # Query the controllers once; prefer AMD entries, otherwise any known GPU vendor.
            controllers = _windows_video_controllers()
            gpu_info = [name for name in controllers if "AMD" in name.upper()]
            if not gpu_info:
                keywords = ["NVIDIA", "AMD", "INTEL", "RADEON", "GEFORCE"]
                gpu_info = [name for name in controllers if any(keyword in name.upper() for keyword in keywords)]
            return gpu_info

        # AMD via rocm-smi on Linux/macOS
        rocm_smi_path = shutil.which("rocm-smi")
        if rocm_smi_path:
            try:
                result = subprocess.run(
                    [rocm_smi_path, "--showproductname"], capture_output=True, text=True, timeout=3
                )
                if result.returncode == 0:
                    for line in result.stdout.strip().split("\n"):
                        if "Product Name" in line:
                            gpu_info.append(line.split(":")[-1].strip())
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass

        # Fallback on macOS: system_profiler lists all display adapters
        if not gpu_info and os_system == "Darwin":
            try:
                result = subprocess.run(
                    ["system_profiler", "SPDisplaysDataType"], capture_output=True, text=True, timeout=3
                )
                if result.returncode == 0:
                    for line in result.stdout.strip().split("\n"):
                        if "Chipset Model:" in line:
                            gpu_info.append(line.split(":")[-1].strip())
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass
    except Exception:
        pass  # best effort: report whatever was found before the failure
    return gpu_info

548 

549 

def resources_log_thread(stop_event: threading.Event, interval: int = 60):
    """Log resources information to the logger until ``stop_event`` is set

    Every ``interval`` seconds one record is written to this module's logger, at
    the level named by ``app_config["logging"]["resources_log"]["level"]``.

    Args:
        stop_event (Event): Event to stop the thread; the loop ends once it is set
        interval (int): Interval in seconds to log resources information

    Returns:
        None

    Note:
        Output shows:
        - RAM: total, available, used memory in GB and memory usage percentage
        - Consumed RAM: sum of rss of this process and all its descendants, and that sum
          as a percentage of total physical memory
        - CPU usage: average CPU usage for last period
        - Active queries: number of active HTTP and MySQL queries
        Any failure while sampling is logged at DEBUG and the loop continues.
    """
    from mindsdb.utilities.fs import get_tmp_dir

    logger = getLogger(__name__)

    def count_active_queries(kind: str) -> int:
        # Each entry in <tmp>/processes/<kind>/ is counted as one active query.
        path = get_tmp_dir().joinpath(f"processes/{kind}/")
        if not path.is_dir():
            return 0
        return sum(1 for _ in path.iterdir())

    while stop_event.wait(timeout=interval) is False:
        try:
            import psutil

            main_process = psutil.Process(os.getpid())
            # One memory_info() call per process. Earlier code queried each process
            # several times (mixing readings from different instants) and collected
            # names/vms that were never logged.
            consumed_rss = main_process.memory_info().rss
            for child in main_process.children(recursive=True):
                try:
                    consumed_rss += child.memory_info().rss
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue  # child exited or cannot be inspected

            memory = psutil.virtual_memory()
            # Same formula as psutil.Process.memory_percent(): rss relative to total physical memory.
            consumed_percent = consumed_rss / memory.total * 100
            cpu_usage = psutil.cpu_percent()

            active_http_queries = count_active_queries("http_query")
            active_mysql_queries = count_active_queries("mysql_query")

            gib = 1024**3
            level = app_config["logging"]["resources_log"]["level"]
            logger.log(
                logging.getLevelName(level),
                f"RAM: {memory.total / gib:.1f}GB total, {memory.available / gib:.1f}GB available, {memory.used / gib:.1f}GB used ({memory.percent:.1f}%)\n"
                f"Consumed RAM: {consumed_rss / (1024**2):.1f}Mb, {consumed_percent:.2f}%\n"
                f"CPU usage: {cpu_usage}% {interval}s\n"
                f"Active queries: {active_http_queries}/HTTP {active_mysql_queries}/MySQL",
            )
        except Exception as e:
            logger.debug(f"Failed to get memory information: {e}")