Coverage for mindsdb / utilities / fs.py: 57%
151 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import json
3import time
4import tempfile
5import threading
6from pathlib import Path
7from typing import Generator
9import psutil
11from mindsdb.utilities import log
13logger = log.getLogger(__name__)
def get_tmp_dir() -> Path:
    """Build the path of the mindsdb folder inside the system temp directory.

    The folder itself is not created by this function.

    Returns:
        Path: ``<system temp dir>/mindsdb``
    """
    system_tmp = Path(tempfile.gettempdir())
    return system_tmp / "mindsdb"
def _get_process_mark_id(unified: bool = False) -> str:
    """Build a text mark that identifies the current process and thread.

    Args:
        unified: bool, when it is exactly ``True`` the mark consists only of
            the process id and the native thread id, so repeated calls from
            the same process+thread give the same text. Otherwise the current
            time (with the dot removed) is appended as a third part.

    Returns:
        mark of process+thread
    """
    base = f"{os.getpid()}-{threading.get_native_id()}"
    if unified is not True:
        timestamp = str(time.time()).replace(".", "")
        return f"{base}-{timestamp}"
    return base
def create_process_mark(folder="learn"):
    """Create a new mark file for the current process+thread.

    Args:
        folder (str): sub-folder of the ``processes`` dir where the mark file is placed

    Returns:
        str: name of the created mark file
    """
    marks_dir = get_tmp_dir().joinpath(f"processes/{folder}/")
    marks_dir.mkdir(parents=True, exist_ok=True)
    mark_name = _get_process_mark_id()
    mark_file = marks_dir / mark_name
    mark_file.touch()
    return mark_name
def set_process_mark(folder: str, mark: str) -> str:
    """Touch a new file which will be the process mark.

    The file name is the given ``mark`` prefixed with the current process id
    and native thread id: ``<pid>-<thread id>-<mark>``.

    Args:
        folder (str): sub-folder of the ``processes`` dir where the file is created
        mark (str): suffix of the file name

    Returns:
        str: process mark (the full name of the created file)
    """
    p = get_tmp_dir().joinpath(f"processes/{folder}/")
    p.mkdir(parents=True, exist_ok=True)
    mark = f"{os.getpid()}-{threading.get_native_id()}-{mark}"
    p.joinpath(mark).touch()
    return mark
def delete_process_mark(folder: str = "learn", mark: str | None = None):
    """Remove a process mark file; a missing file is not an error.

    Args:
        folder (str): sub-folder of the ``processes`` dir that holds the mark
        mark (str | None): name of the mark file. When omitted, a mark id is
            generated for the current process+thread.
    """
    # NOTE(review): the generated id includes the current time, so it looks
    # unlikely to match a previously created mark file - confirm intent.
    mark_name = _get_process_mark_id() if mark is None else mark
    folder_path = get_tmp_dir().joinpath(f"processes/{folder}/")
    folder_path.joinpath(mark_name).unlink(missing_ok=True)
def clean_process_marks():
    """Delete all existing process marks.

    Every file located in each sub-folder of the ``processes`` dir is removed.
    Entries of the ``processes`` dir that are not folders are left untouched.
    """
    logger.debug("Deleting PIDs..")
    p = get_tmp_dir().joinpath("processes/")
    if p.exists() is False:
        return
    for path in p.iterdir():
        if path.is_dir() is False:
            # A stray file must not stop the cleanup of the remaining folders.
            continue
        for file in path.iterdir():
            file.unlink(missing_ok=True)
def get_processes_dir_files_generator() -> Generator[tuple[Path, int, int], None, None]:
    """Get files from processes dir.

    Mark file names are expected to start with ``<process id>-<thread id>``.
    Files whose name does not follow that pattern are skipped with a warning
    instead of interrupting the iteration.

    Yields:
        tuple(Path, int, int): file object, process id and thread id
    """
    p = get_tmp_dir().joinpath("processes/")
    if p.exists() is False:
        return
    for path in p.iterdir():
        if path.is_dir() is False:
            continue
        for file in path.iterdir():
            parts = file.name.split("-")
            try:
                process_id = int(parts[0])
                thread_id = int(parts[1])
            except (IndexError, ValueError):
                # Not a process mark (e.g. a foreign or half-written file).
                logger.warning(f"Skipping file with unexpected name in processes dir: {file}")
                continue
            yield file, process_id, thread_id
def clean_unlinked_process_marks() -> list[int]:
    """Delete marks whose process or thread no longer exists.

    A mark is removed when its process cannot be found, is a zombie or dead,
    or does not contain a thread with the recorded id. Marks of processes
    that cannot be inspected because access is denied are kept.

    Returns:
        list[int]: process ids taken from the removed marks (one entry per removed mark)
    """
    removed_pids = []

    for file, process_id, thread_id in get_processes_dir_files_generator():
        try:
            proc = psutil.Process(process_id)
            if proc.status() in (psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD):
                # handled the same way as a process that is already gone
                raise psutil.NoSuchProcess(process_id)
            thread_found = any(t.id == thread_id for t in proc.threads())
        except psutil.AccessDenied:
            logger.warning(f"access to {process_id} denied")
            continue
        except psutil.NoSuchProcess:
            thread_found = False

        if thread_found:
            continue

        logger.warning(f"We have mark for process/thread {process_id}/{thread_id} but it does not exists")
        removed_pids.append(process_id)
        file.unlink(missing_ok=True)
    return removed_pids
def create_pid_file(config):
    """
    Create mindsdb process pid file. Check if previous process exists and is running.
    Does nothing unless the environment variable USE_PIDFILE is set to "1".

    If pid_file_content is provided, it will be used to create the pid file with the content as key-value pairs.
    If pid_file_content is not provided, the pid file will be created with the pid number only.

    Args:
        config: object that supports ``config["pid_file_content"]`` and ``.get(key)``.
            ``pid_file_content`` maps a key of the pid file to a dot-separated path
            inside ``config``; the value found at that path is stored under the key.

    Raises:
        Exception: if a pid file already exists and its process is running
    """

    if os.environ.get("USE_PIDFILE") != "1":
        return

    p = get_tmp_dir()
    p.mkdir(parents=True, exist_ok=True)
    pid_file = p.joinpath("pid")
    if pid_file.exists():
        # if process exists raise exception
        pid_file_data_str = pid_file.read_text().strip()
        pid = None
        try:
            pid_file_data = json.loads(pid_file_data_str)
            if isinstance(pid_file_data, dict):
                pid = pid_file_data.get("pid")
            else:
                pid = pid_file_data
        except json.JSONDecodeError:
            # is it just pid number (old approach)?
            try:
                pid = int(pid_file_data_str)
            except Exception:
                pass
            logger.warning(f"Found existing PID file {pid_file} but it is not a valid JSON, removing")

        if pid is not None:
            try:
                psutil.Process(int(pid))
            except (psutil.Error, ValueError, TypeError):
                # No such process, or the stored value is not usable as a pid
                # (TypeError: valid JSON of a non-numeric type, e.g. a list):
                # the pid file is stale/corrupt and is replaced below.
                logger.warning(f"Found existing PID file {pid_file}({pid}), removing")
            else:
                raise Exception(f"Found PID file with existing process: {pid} {pid_file}")

        pid_file.unlink(missing_ok=True)

    pid_file_content = config["pid_file_content"]
    if pid_file_content is None or len(pid_file_content) == 0:
        pid_file_data_str = str(os.getpid())
    else:
        pid_file_data = {"pid": os.getpid()}
        for key, value in pid_file_content.items():
            # walk the dot-separated path through the config; a missing part gives None
            value_obj = config
            for path_part in value.split("."):
                value_obj = value_obj.get(path_part) if value_obj else None
            pid_file_data[key] = value_obj

        pid_file_data_str = json.dumps(pid_file_data)
    pid_file.write_text(pid_file_data_str)
def delete_pid_file():
    """
    Remove existing process pid file if it matches current process.
    Does nothing unless the environment variable USE_PIDFILE is set to "1".

    The file is also removed when no pid can be read from it (content is not
    valid JSON, or the stored pid is null).
    """

    if os.environ.get("USE_PIDFILE") != "1":
        return

    pid_file = get_tmp_dir().joinpath("pid")
    if not pid_file.exists():
        return

    raw_content = pid_file.read_text().strip()
    try:
        parsed = json.loads(raw_content)
    except json.JSONDecodeError:
        logger.warning(f"Found existing PID file {pid_file} but it is not a valid JSON")
        stored_pid = None
    else:
        # a dict keeps the pid under the "pid" key, otherwise the value itself is the pid
        stored_pid = parsed.get("pid") if isinstance(parsed, dict) else parsed

    belongs_to_other_process = stored_pid is not None and str(stored_pid) != str(os.getpid())
    if belongs_to_other_process:
        logger.warning(f"Process id in PID file ({pid_file}) doesn't match mindsdb pid")
        return

    pid_file.unlink(missing_ok=True)
def __is_within_directory(directory, target):
    """Check that ``target`` is ``directory`` itself or is located inside of it.

    Both paths are made absolute (and normalized) before the comparison, and
    the comparison is done on whole path components, so ``/a/bc`` is not
    considered to be inside ``/a/b``.

    Args:
        directory: path of the containing directory
        target: path to check

    Returns:
        bool: True if target is within directory
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    try:
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # raised when the paths have nothing in common (e.g. different drives)
        return False
    return common == abs_directory


def safe_extract(tarfile, path=".", members=None, *, numeric_owner=False):
    """Extract a tar archive, refusing members that would land outside ``path``.

    Args:
        tarfile: opened archive object (provides ``getmembers`` and ``extractall``)
        path: destination directory
        members: optional subset of members, passed to ``extractall``
        numeric_owner: passed to ``extractall``

    Raises:
        Exception: if a member of the archive points outside of ``path``
            (on interpreters with extraction filters the error comes from
            the ``data`` filter)
    """
    # The ``tarfile`` parameter shadows the module, so import it under another
    # name: ``data_filter`` is a module-level function, not an attribute of the
    # archive object.
    import tarfile as tarfile_module

    # for py >= 3.12 (and older versions with the extraction filters backport)
    if hasattr(tarfile_module, "data_filter"):
        tarfile.extractall(path, members=members, numeric_owner=numeric_owner, filter="data")
        return

    # for py < 3.12
    for member in tarfile.getmembers():
        member_path = os.path.join(path, member.name)
        if not __is_within_directory(path, member_path):
            raise Exception("Attempted Path Traversal in Tar File")
    tarfile.extractall(path, members=members, numeric_owner=numeric_owner)