Coverage for mindsdb / utilities / log.py: 29%
332 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
import re
import os
import json
import logging
import logging.handlers
import threading
from typing import Any
from logging.config import dictConfig

from mindsdb.utilities.config import config as app_config
# Flipped to True by initialize_logging() once configure_logging() has run, so later
# getLogger() calls skip reconfiguration.
logging_initialized: bool = False
class JsonFormatter(logging.Formatter):
    """Formatter that renders each record as a one-line JSON object.

    The emitted keys, in order, are: process_name, name, message (the fully formatted
    text produced by ``logging.Formatter.format``), level and time (``record.created``).
    """

    def format(self, record):
        formatted_text = super().format(record)
        payload = dict(
            process_name=record.processName,
            name=record.name,
            message=formatted_text,
            level=record.levelname,
            time=record.created,
        )
        return json.dumps(payload)
class ColorFormatter(logging.Formatter):
    """Console formatter that wraps each line in an ANSI colour chosen by the record's level.

    Records whose level is not one of the five standard levels (e.g. custom levels such as
    5 or 25) use the colour of the closest standard level at or below them; anything below
    DEBUG is rendered like DEBUG.
    """

    green = "\x1b[32;20m"
    default = "\x1b[39;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    # Named FORMAT rather than ``format``: a class attribute called ``format`` was silently
    # shadowed by the ``format`` method defined below.
    FORMAT = "%(asctime)s %(processName)15s %(levelname)-8s %(name)s: %(message)s"

    FORMATS = {
        logging.DEBUG: logging.Formatter(green + FORMAT + reset),
        logging.INFO: logging.Formatter(default + FORMAT + reset),
        logging.WARNING: logging.Formatter(yellow + FORMAT + reset),
        logging.ERROR: logging.Formatter(red + FORMAT + reset),
        logging.CRITICAL: logging.Formatter(bold_red + FORMAT + reset),
    }

    def format(self, record):
        """Format ``record`` with the coloured formatter matching its level."""
        formatter = self.FORMATS.get(record.levelno)
        if formatter is None:
            # Non-standard level: previously this returned None and crashed with AttributeError.
            lower_levels = [level for level in self.FORMATS if level <= record.levelno]
            formatter = self.FORMATS[max(lower_levels) if lower_levels else logging.DEBUG]
        return formatter.format(record)
# Formatter definitions for logging.config.dictConfig; handler configs refer to them by key.
# "()" tells dictConfig to instantiate the given class as the formatter.
FORMATTERS = dict(
    default={"()": ColorFormatter},
    json={"()": JsonFormatter},
    file={"format": "%(asctime)s %(processName)15s %(levelname)-8s %(name)s: %(message)s"},
)
57class LogSanitizer:
58 """Log Sanitizer"""
60 SENSITIVE_KEYS = {
61 "password",
62 "passwd",
63 "pwd",
64 "token",
65 "access_token",
66 "refresh_token",
67 "bearer_token",
68 "api_key",
69 "apikey",
70 "api-key",
71 "openai_api_key",
72 "secret",
73 "secret_key",
74 "client_secret",
75 "credentials",
76 "auth",
77 "authorization",
78 "private_key",
79 "private-key",
80 "session_id",
81 "sessionid",
82 "credit_card",
83 "card_number",
84 "cvv",
85 }
87 def __init__(self, mask: str | None = None):
88 self.mask = mask or "********"
89 self._compile_patterns()
91 def _compile_patterns(self):
92 self.search_pattern = re.compile(
93 r"\b(" + "|".join(re.escape(key) for key in self.SENSITIVE_KEYS) + r")\b", re.IGNORECASE
94 )
95 self.patterns = []
96 for key in self.SENSITIVE_KEYS:
97 # Patterns for: key=value, key: value, "key": "value", 'key': 'value'
98 # Note: negative lookahead (?!%) excludes Python format placeholders like %s, %d, etc.
99 patterns = [
100 re.compile(f'{key}["\s]*[:=]["\s]*(?!%)([^\s,}}\\]"\n]+)', re.IGNORECASE),
101 re.compile(f'"{key}"["\s]*:["\s]*"([^"]+)"', re.IGNORECASE),
102 re.compile(f"'{key}'['\s]*:['\s]*'([^']+)'", re.IGNORECASE),
103 ]
104 self.patterns.extend(patterns)
106 def _replace(self, m) -> str:
107 return m.group(0).replace(m.group(1), self.mask)
109 def sanitize_text(self, text: str) -> str:
110 if self.search_pattern.search(text):
111 for pattern in self.patterns:
112 text = pattern.sub(self._replace, text)
113 return text
115 def sanitize_dict(self, data: dict) -> dict:
116 if not isinstance(data, dict): 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 return data
119 sanitized = {}
120 for key, value in data.items():
121 if any(sensitive in str(key).lower() for sensitive in self.SENSITIVE_KEYS):
122 sanitized[key] = self.mask
123 elif isinstance(value, dict): 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true
124 sanitized[key] = self.sanitize_dict(value)
125 elif isinstance(value, list): 125 ↛ 126line 125 didn't jump to line 126 because the condition on line 125 was never true
126 sanitized[key] = [self.sanitize_dict(item) if isinstance(item, dict) else item for item in value]
127 else:
128 sanitized[key] = value
129 return sanitized
131 def sanitize(self, data: Any) -> Any:
132 if isinstance(data, dict): 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true
133 return self.sanitize_dict(data)
134 elif isinstance(data, str):
135 return self.sanitize_text(data)
136 elif isinstance(data, (list, tuple)):
137 return type(data)(self.sanitize(item) for item in data)
138 return data
class SanitizingMixin:
    """Mixin for logging handlers that masks sensitive data in a record before it is emitted.

    Must precede the concrete handler class in the bases so that ``super().__init__`` reaches it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sanitizer = LogSanitizer()

    def sanitize_record(self, record):
        """Sanitize ``record`` in place and return it.

        If the record has positional args and a string message, the message is formatted
        eagerly (``msg % args``) so that secrets passed as args are masked in the final text;
        args are then cleared. A message that cannot be formatted is left unformatted. Its
        text and args are still sanitized, and the wrapped handler's emit() reports the
        formatting error through handleError() as standard logging does, instead of the
        exception escaping into the code that made the logging call.
        """
        args = getattr(record, "args", None)
        if isinstance(args, (list, tuple)) and len(args) > 0 and isinstance(record.msg, str):
            try:
                formatted = record.msg % args
            except Exception:
                # Placeholder/argument mismatch or a failing __str__ on an arg.
                pass
            else:
                record.msg = formatted
                record.args = ()

        if isinstance(record.msg, str):
            record.msg = self.sanitizer.sanitize_text(record.msg)
        elif isinstance(record.msg, dict):
            record.msg = self.sanitizer.sanitize_dict(record.msg)

        if getattr(record, "args", None):
            record.args = self.sanitizer.sanitize(record.args)

        return record
class StreamSanitizingHandler(SanitizingMixin, logging.StreamHandler):
    """StreamHandler that runs every record through sanitize_record() before writing it."""

    def emit(self, record):
        return super().emit(self.sanitize_record(record))


class FileSanitizingHandler(SanitizingMixin, logging.handlers.RotatingFileHandler):
    """RotatingFileHandler that runs every record through sanitize_record() before writing it."""

    def emit(self, record):
        return super().emit(self.sanitize_record(record))
def get_console_handler_config_level() -> int:
    """Return the numeric level configured for the console handler.

    The configured level name is resolved as an attribute of the ``logging`` module
    (e.g. "INFO" -> logging.INFO).
    """
    level_name = app_config["logging"]["handlers"]["console"]["level"]
    return getattr(logging, level_name)
def get_file_handler_config_level() -> int:
    """Return the numeric level configured for the file handler.

    The configured level name is resolved as an attribute of the ``logging`` module
    (e.g. "DEBUG" -> logging.DEBUG).
    """
    level_name = app_config["logging"]["handlers"]["file"]["level"]
    return getattr(logging, level_name)
def get_mindsdb_log_level() -> int:
    """Return the most verbose (numerically lowest) of the console and file handler levels.

    Both configured levels take part, regardless of whether either handler is enabled.
    """
    return min(get_console_handler_config_level(), get_file_handler_config_level())
def get_handlers_config(process_name: str | None) -> dict:
    """Build the ``handlers`` section of the dictConfig from the application config.

    Args:
        process_name: when not None, appended to the configured log file name (before its
            extension) so each process writes to its own file.

    Returns:
        dict: handler name ("console" and/or "file") -> dictConfig handler definition.
        Only handlers whose ``enabled`` flag is exactly True are included.
    """
    handlers_config = {}

    console_handler_config = app_config["logging"]["handlers"]["console"]
    if console_handler_config["enabled"] is True:
        handlers_config["console"] = {
            "class": "mindsdb.utilities.log.StreamSanitizingHandler",
            "formatter": console_handler_config.get("formatter", "default"),
            "level": get_console_handler_config_level(),
        }

    file_handler_config = app_config["logging"]["handlers"]["file"]
    if file_handler_config["enabled"] is True:
        file_name = file_handler_config["filename"]
        if process_name is not None:
            # Only treat a dot in the last path component as the extension separator, so a
            # dotted directory ("logs.d/mindsdb") is not split in the wrong place.
            directory, base_name = os.path.split(file_name)
            if "." in base_name:
                stem, _, extension = base_name.rpartition(".")
                base_name = f"{stem}_{process_name}.{extension}"
            else:
                base_name = f"{base_name}_{process_name}"
            file_name = os.path.join(directory, base_name)
        handlers_config["file"] = {
            "class": "mindsdb.utilities.log.FileSanitizingHandler",
            "formatter": "file",
            "level": get_file_handler_config_level(),
            "filename": app_config.paths["log"] / file_name,
            # RotatingFileHandler rolls the file over once it reaches this many bytes
            "maxBytes": file_handler_config["maxBytes"],
            "backupCount": file_handler_config["backupCount"],
        }
    return handlers_config
def configure_logging(process_name: str = None):
    """Apply MindsDB's logging configuration with ``logging.config.dictConfig``.

    Every enabled handler from get_handlers_config() is attached to the root logger. The
    root, "__main__", "mindsdb" and "alembic" loggers are all set to get_mindsdb_log_level().

    Args:
        process_name: forwarded to get_handlers_config() to give the file handler a
            per-process file name.
    """
    handlers = get_handlers_config(process_name)
    level = get_mindsdb_log_level()

    # "" is the root logger; it is the only one that owns handlers.
    loggers = {"": {"handlers": list(handlers), "level": level}}
    for logger_name in ("__main__", "mindsdb", "alembic"):
        loggers[logger_name] = {"level": level}

    # NOTE(review): dictConfig defaults to disable_existing_loggers=True, which disables
    # already-created loggers that are neither listed here nor children of listed ones.
    # Confirm this is intended.
    dictConfig({"version": 1, "formatters": FORMATTERS, "handlers": handlers, "loggers": loggers})
def initialize_logging(process_name: str = None) -> None:
    """Configure logging via configure_logging() unless that has already happened in this module."""
    global logging_initialized
    if logging_initialized:
        return
    configure_logging(process_name)
    logging_initialized = True
# Plain logging.getLogger() would be preferable, but MindsDB spawns processes in many
# situations that need logging configured again. Routing logger creation through this
# helper keeps that "configure if needed" logic in one place.
def getLogger(name=None):
    """Return ``logging.getLogger(name)``, initializing MindsDB logging first if needed."""
    initialize_logging()
    logger = logging.getLogger(name)
    return logger
def log_ram_info(logger: logging.Logger) -> None:
    """Log one DEBUG line with system-wide RAM usage.

    The line reports total, available and used memory in GB plus the usage percentage, as
    reported by ``psutil.virtual_memory()``. Nothing is done unless ``logger`` is enabled for
    DEBUG. If psutil cannot be imported or queried, a DEBUG line describing the failure is
    logged instead.

    Args:
        logger (logging.Logger): logger that receives the message.
    """
    if not logger.isEnabledFor(logging.DEBUG):
        return

    bytes_per_gb = 1024**3
    try:
        import psutil

        vm = psutil.virtual_memory()
        logger.debug(
            f"Memory: {vm.total / bytes_per_gb:.1f}GB total, {vm.available / bytes_per_gb:.1f}GB available, "
            f"{vm.used / bytes_per_gb:.1f}GB used ({vm.percent:.1f}%)"
        )
    except Exception as e:
        logger.debug(f"Failed to get memory information: {e}")
def _detect_linux_distribution() -> str:
    """Return a human-readable Linux distribution name, or "Unknown Linux" (best effort).

    Sources, in order: /etc/os-release (PRETTY_NAME, then NAME + VERSION, then ID), the first
    line of /etc/issue, then ``lsb_release -d``. All failures are swallowed.
    """
    import subprocess

    distro_info = "Unknown Linux"
    try:
        if os.path.exists("/etc/os-release"):
            os_release_data = {}
            with open("/etc/os-release", "r", encoding="utf-8", errors="replace") as f:
                for line in f:
                    if "=" in line:
                        key, value = line.strip().split("=", 1)
                        os_release_data[key] = value.strip('"')
            if "PRETTY_NAME" in os_release_data:
                distro_info = os_release_data["PRETTY_NAME"]
            elif "NAME" in os_release_data and "VERSION" in os_release_data:
                distro_info = f"{os_release_data['NAME']} {os_release_data['VERSION']}"
            elif "ID" in os_release_data:
                distro_info = os_release_data["ID"].title()
        elif os.path.exists("/etc/issue"):
            with open("/etc/issue", "r", encoding="utf-8", errors="replace") as f:
                issue_content = f.read().strip()
            if issue_content:
                distro_info = issue_content.split("\n")[0]
        else:
            try:
                result = subprocess.run(["lsb_release", "-d"], capture_output=True, text=True, timeout=2)
                if result.returncode == 0:
                    distro_info = result.stdout.split(":")[-1].strip()
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass
    except Exception:
        pass
    return distro_info


def _get_os_info() -> str:
    """Return an OS description such as "<name> <release> (version <v>) (<machine>)".

    On Linux the distribution is detected; on Windows ``wmic`` is queried; on macOS
    ``sw_vers`` is queried. Command failures fall back to generic names.
    """
    import platform
    import subprocess

    os_system = platform.system()
    os_release = platform.release()
    os_machine = platform.machine()

    if os_system == "Linux":
        description = f"{_detect_linux_distribution()} (kernel {os_release})"
    elif os_system == "Windows":
        os_name, os_version = "Windows", "unknown"
        try:
            result = subprocess.run(
                ["wmic", "os", "get", "Caption,Version", "/format:list"], capture_output=True, text=True, timeout=3
            )
            if result.returncode == 0:
                windows_info = {}
                for line in result.stdout.strip().split("\n"):
                    if "=" in line:
                        key, value = line.strip().split("=", 1)
                        windows_info[key] = value.strip()
                if "Caption" in windows_info and "Version" in windows_info:
                    os_name = windows_info["Caption"]
                    os_version = windows_info["Version"]
        except Exception:
            pass
        description = f"{os_name} {os_release} (version {os_version})"
    elif os_system == "Darwin":  # macOS
        os_name, os_version = "macOS", "unknown"
        try:
            result = subprocess.run(
                ["sw_vers", "-productName", "-productVersion"], capture_output=True, text=True, timeout=3
            )
            if result.returncode == 0:
                lines = result.stdout.strip().split("\n")
                if len(lines) >= 2:
                    os_name = lines[0].strip()
                    os_version = lines[1].strip()
        except Exception:
            pass
        description = f"{os_name} {os_release} (version {os_version})"
    else:
        description = f"{os_system} {os_release}"

    return f"{description} ({os_machine})"


def _get_cpu_info() -> str:
    """Return the processor name with physical/logical core counts (requires psutil)."""
    import platform

    import psutil

    # platform.processor() is empty on some systems; fall back to the machine type.
    cpu_info = platform.processor() or platform.machine()
    cpu_count = psutil.cpu_count(logical=False)
    cpu_count_logical = psutil.cpu_count(logical=True)
    return f"{cpu_info} ({cpu_count} physical cores, {cpu_count_logical} logical cores)"


def _query_windows_video_controllers() -> list[str]:
    """Return video controller names reported by ``wmic`` (Windows); [] if the query fails."""
    import subprocess

    try:
        result = subprocess.run(
            ["wmic", "path", "win32_VideoController", "get", "name"],
            capture_output=True,
            text=True,
            timeout=3,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return []
    if result.returncode != 0:
        return []

    names = []
    for line in result.stdout.strip().split("\n"):
        line = line.strip()
        if line and line != "Name":  # skip blank lines and the column header
            names.append(line)
    return names


def _get_gpu_info() -> list[str]:
    """Return best-effort GPU descriptions; an empty list if none were detected.

    Order of attempts:
    1. nvidia-smi.
    2. AMD: wmic on Windows, rocm-smi elsewhere.
    3. Any recognisable video controller: wmic on Windows, system_profiler on macOS.

    Unexpected errors end detection and return whatever was collected so far.
    """
    import shutil
    import platform
    import subprocess

    gpu_info = []
    try:
        nvidia_smi_path = shutil.which("nvidia-smi")
        if nvidia_smi_path:
            try:
                result = subprocess.run(
                    [nvidia_smi_path, "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
                    capture_output=True,
                    text=True,
                    timeout=3,
                )
                if result.returncode == 0:
                    for line in result.stdout.strip().split("\n"):
                        if not line.strip():
                            continue
                        parts = line.split(", ")
                        if len(parts) >= 2:
                            gpu_info.append(f"{parts[0].strip()} ({parts[1].strip()}MB)")
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass

        if gpu_info:
            return gpu_info

        os_system = platform.system()
        if os_system == "Windows":
            # A single wmic query serves both the AMD check and the generic fallback
            # (previously the same command was run twice).
            controllers = _query_windows_video_controllers()
            gpu_info = [name for name in controllers if "AMD" in name.upper()]
            if not gpu_info:
                vendor_keywords = ("NVIDIA", "AMD", "INTEL", "RADEON", "GEFORCE")
                gpu_info = [name for name in controllers if any(k in name.upper() for k in vendor_keywords)]
            return gpu_info

        rocm_smi_path = shutil.which("rocm-smi")
        if rocm_smi_path:
            try:
                result = subprocess.run([rocm_smi_path, "--showproductname"], capture_output=True, text=True, timeout=3)
                if result.returncode == 0:
                    for line in result.stdout.strip().split("\n"):
                        if "Product Name" in line:
                            gpu_info.append(line.split(":")[-1].strip())
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass

        if not gpu_info and os_system == "Darwin":  # macOS
            try:
                result = subprocess.run(
                    ["system_profiler", "SPDisplaysDataType"], capture_output=True, text=True, timeout=3
                )
                if result.returncode == 0:
                    for line in result.stdout.strip().split("\n"):
                        if "Chipset Model:" in line:
                            gpu_info.append(line.split(":")[-1].strip())
            except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
                pass
    except Exception:
        pass
    return gpu_info


def log_system_info(logger: logging.Logger) -> None:
    """Log OS, CPU, memory and GPU information at DEBUG level for diagnostics.

    Nothing is done unless ``logger`` is enabled for DEBUG. One line is logged per section:
    - Operating system: OS type, version / distribution, architecture.
    - CPU: processor type, physical and logical core counts (requires psutil).
    - Memory: delegated to log_ram_info().
    - GPU: NVIDIA (nvidia-smi), AMD (rocm-smi / wmic) and platform fallbacks.

    Args:
        logger (logging.Logger): the logger to write to; must be enabled for DEBUG.

    Note:
        - Linux distribution comes from /etc/os-release, /etc/issue or lsb_release.
        - Windows uses wmic; macOS uses sw_vers and system_profiler.
        - Every subprocess call has a timeout so detection cannot hang.
        - If a section raises, a "Failed to get system information" line is logged and the
          remaining sections are skipped; the OS line no longer depends on psutil.
    """
    if not logger.isEnabledFor(logging.DEBUG):
        return

    try:
        logger.debug(f"Operating System: {_get_os_info()}")
        logger.debug(f"CPU: {_get_cpu_info()}")
        log_ram_info(logger)
        gpu_info = _get_gpu_info()
        if gpu_info:
            logger.debug(f"GPU: {', '.join(gpu_info)}")
        else:
            logger.debug("GPU: Not detected or not supported")
    except Exception as e:
        logger.debug(f"Failed to get system information: {e}")
def resources_log_thread(stop_event: threading.Event, interval: int = 60):
    """Periodically log process and system resource usage until ``stop_event`` is set.

    Every ``interval`` seconds one multi-line message is logged at the level configured in
    ``app_config["logging"]["resources_log"]["level"]``. The level may be an int or a
    case-insensitive level name.

    Args:
        stop_event (Event): event that stops the loop once set
        interval (int): interval in seconds between reports

    Returns:
        None

    Note:
        Output shows:
        - RAM: total, available, used memory in GB and memory usage percentage
        - Consumed RAM: summed RSS of this process and all its descendants, and their summed
          share of total memory
        - CPU usage: value of ``psutil.cpu_percent()``, i.e. usage since the previous call
        - Active queries: number of entries in the HTTP / MySQL query tmp directories
    """
    from mindsdb.utilities.fs import get_tmp_dir

    logger = getLogger(__name__)
    bytes_per_gb = 1024**3
    bytes_per_mb = 1024**2

    def count_entries(relative_dir: str) -> int:
        # Each directory entry is counted as one active query. A missing directory, or one
        # removed while being listed, counts as 0 instead of aborting the whole report.
        path = get_tmp_dir().joinpath(relative_dir)
        try:
            return sum(1 for _ in path.iterdir()) if path.is_dir() else 0
        except OSError:
            return 0

    while not stop_event.wait(timeout=interval):
        try:
            import psutil

            main_process = psutil.Process(os.getpid())
            consumed_rss = 0
            consumed_percent = 0.0
            # Query each process once; children may exit or become inaccessible between
            # being listed and being queried.
            for process in [main_process, *main_process.children(recursive=True)]:
                try:
                    consumed_rss += process.memory_info().rss
                    consumed_percent += process.memory_percent()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue

            memory = psutil.virtual_memory()
            total_memory_gb = memory.total / bytes_per_gb
            available_memory_gb = memory.available / bytes_per_gb
            used_memory_gb = memory.used / bytes_per_gb
            memory_percent = memory.percent
            cpu_usage = psutil.cpu_percent()

            active_http_queries = count_entries("processes/http_query/")
            active_mysql_queries = count_entries("processes/mysql_query/")

            level = app_config["logging"]["resources_log"]["level"]
            if isinstance(level, str):
                # getLevelName maps a registered name to its int. Passing an int to it returned
                # the name string, which logger.log() rejects, so ints are used directly.
                level = logging.getLevelName(level.upper())
            logger.log(
                level,
                f"RAM: {total_memory_gb:.1f}GB total, {available_memory_gb:.1f}GB available, {used_memory_gb:.1f}GB used ({memory_percent:.1f}%)\n"
                f"Consumed RAM: {consumed_rss / bytes_per_mb:.1f}Mb, {consumed_percent:.2f}%\n"
                f"CPU usage: {cpu_usage}% {interval}s\n"
                f"Active queries: {active_http_queries}/HTTP {active_mysql_queries}/MySQL",
            )
        except Exception as e:
            logger.debug(f"Failed to get memory information: {e}")