Coverage for mindsdb / utilities / fs.py: 57%

151 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import json 

3import time 

4import tempfile 

5import threading 

6from pathlib import Path 

7from typing import Generator 

8 

9import psutil 

10 

11from mindsdb.utilities import log 

12 

13logger = log.getLogger(__name__) 

14 

15 

def get_tmp_dir() -> Path:
    """Return the path of the ``mindsdb`` directory inside the system temp dir.

    Only the path is computed here; the directory itself is not created.
    """
    system_tmp = Path(tempfile.gettempdir())
    return system_tmp / "mindsdb"

18 

19 

def _get_process_mark_id(unified: bool = False) -> str:
    """Build a text that identifies the current process and thread.

    Args:
        unified: when exactly ``True`` the result is only
            ``<pid>-<native thread id>``, so it is the same for every call made
            from the same process+thread. Otherwise the current timestamp
            (with the dot removed) is appended as a third part.

    Returns:
        mark of process+thread
    """
    base = "-".join((str(os.getpid()), str(threading.get_native_id())))
    if unified is not True:
        timestamp = str(time.time()).replace(".", "")
        return f"{base}-{timestamp}"
    return base

31 

32 

def create_process_mark(folder="learn"):
    """Touch a new mark file for the current process+thread.

    Args:
        folder: sub-directory of ``<tmp dir>/processes`` in which the file is
            created (the directory is created when missing)

    Returns:
        str: name of the created mark file
    """
    marks_dir = get_tmp_dir().joinpath(f"processes/{folder}/")
    marks_dir.mkdir(parents=True, exist_ok=True)
    mark_name = _get_process_mark_id()
    (marks_dir / mark_name).touch()
    return mark_name

39 

40 

def set_process_mark(folder: str, mark: str) -> str:
    """Touch a new file that serves as a process mark.

    Args:
        folder (str): sub-directory of ``<tmp dir>/processes`` where the file is
            created (the directory is created when missing)
        mark (str): suffix of the file name; the current process id and native
            thread id are prepended to it

    Returns:
        str: the full process mark, i.e. the name of the created file
            (``<pid>-<thread id>-<mark>``)
    """
    p = get_tmp_dir().joinpath(f"processes/{folder}/")
    p.mkdir(parents=True, exist_ok=True)
    # The returned value is the prefixed name, not the ``mark`` argument as passed in.
    mark = f"{os.getpid()}-{threading.get_native_id()}-{mark}"
    p.joinpath(mark).touch()
    return mark

56 

57 

def delete_process_mark(folder: str = "learn", mark: str | None = None):
    """Remove a process mark file; a missing file is silently ignored.

    Args:
        folder (str): sub-directory of ``<tmp dir>/processes`` holding the mark
        mark (str | None): name of the mark file. When ``None`` a new id is
            generated with ``_get_process_mark_id()``.
            NOTE(review): that generated id carries a fresh timestamp, so it
            presumably never matches a previously created mark - confirm intent.
    """
    mark_name = _get_process_mark_id() if mark is None else mark
    mark_path = get_tmp_dir().joinpath(f"processes/{folder}/") / mark_name
    mark_path.unlink(missing_ok=True)

63 

64 

def clean_process_marks():
    """Delete all existing process marks.

    Walks every sub-directory of ``<tmp dir>/processes`` and unlinks each entry
    found inside it. Does nothing when the ``processes`` directory is missing.
    """
    logger.debug("Deleting PIDs..")
    processes_dir = get_tmp_dir().joinpath("processes/")
    if not processes_dir.exists():
        return
    for folder in processes_dir.iterdir():
        # Skip stray non-directory entries instead of returning: returning here
        # would leave the marks of all not-yet-visited folders in place.
        if not folder.is_dir():
            continue
        for mark_file in folder.iterdir():
            mark_file.unlink(missing_ok=True)

76 

77 

def get_processes_dir_files_generator() -> Generator[tuple[Path, int, int], None, None]:
    """Get files from processes dir

    Iterates over every sub-directory of ``<tmp dir>/processes`` and parses each
    entry name as ``<process id>-<thread id>[-...]``. Entries whose name does not
    start with two integer parts are skipped, so one foreign file cannot break
    iteration over the remaining marks.

    Yields:
        tuple(Path, int, int): file object, process id and thread id
    """
    p = get_tmp_dir().joinpath("processes/")
    if p.exists() is False:
        return
    for path in p.iterdir():
        if path.is_dir() is False:
            continue
        for file in path.iterdir():
            parts = file.name.split("-")
            try:
                process_id = int(parts[0])
                thread_id = int(parts[1])
            except (IndexError, ValueError):
                # Not a mark name (fewer than two parts or non-numeric ids).
                logger.debug(f"Skipping unexpected file in processes dir: {file}")
                continue
            yield file, process_id, thread_id

95 

96 

def clean_unlinked_process_marks() -> list[int]:
    """Delete marks whose process or thread no longer exists.

    A mark is removed when its process cannot be found, is reported as zombie or
    dead, or has no thread with the recorded thread id. Marks of processes that
    cannot be inspected (access denied) are left untouched.

    Returns:
        list[int]: process ids taken from the removed marks (one entry per
            removed mark)
    """
    deleted_pids = []

    for file, process_id, thread_id in get_processes_dir_files_generator():
        try:
            process = psutil.Process(process_id)
            if process.status() in (psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD):
                # Treat zombie/dead processes the same way as missing ones.
                raise psutil.NoSuchProcess(process_id)
            thread_exists = any(t.id == thread_id for t in process.threads())
        except psutil.AccessDenied:
            logger.warning(f"access to {process_id} denied")
            continue
        except psutil.NoSuchProcess:
            thread_exists = False

        if thread_exists:
            continue

        logger.warning(f"We have mark for process/thread {process_id}/{thread_id} but it does not exists")
        deleted_pids.append(process_id)
        file.unlink(missing_ok=True)
    return deleted_pids

128 

129 

def create_pid_file(config):
    """Create mindsdb process pid file. Check if previous process exists and is running.

    Does nothing unless the environment variable ``USE_PIDFILE`` equals ``"1"``.

    If ``<tmp dir>/pid`` already exists, the pid stored in it is read. Accepted
    contents are a JSON object with a ``"pid"`` key, a bare JSON value, or a plain
    integer string. When psutil finds a process with that pid an exception is
    raised; otherwise the file is treated as stale and removed.

    If ``config["pid_file_content"]`` is ``None`` or empty, the new pid file holds
    the pid number only. Otherwise it holds a JSON object with ``"pid"`` plus one
    entry per item of ``pid_file_content``; each item value is a dot-separated
    path that is resolved against ``config`` with successive ``.get`` calls.

    Args:
        config: configuration object supporting ``config["pid_file_content"]``
            and ``.get(key)``

    Raises:
        Exception: if the existing pid file refers to an existing process
    """

    if os.environ.get("USE_PIDFILE") != "1":
        return

    p = get_tmp_dir()
    p.mkdir(parents=True, exist_ok=True)
    pid_file = p.joinpath("pid")
    if pid_file.exists():
        # if process exists raise exception
        pid_file_data_str = pid_file.read_text().strip()
        pid = None
        try:
            pid_file_data = json.loads(pid_file_data_str)
        except json.JSONDecodeError:
            # is it just pid number (old approach)?
            try:
                pid = int(pid_file_data_str)
            except ValueError:
                pass
            logger.warning(f"Found existing PID file {pid_file} but it is not a valid JSON, removing")
        else:
            if isinstance(pid_file_data, dict):
                pid = pid_file_data.get("pid")
            else:
                pid = pid_file_data

        if pid is not None:
            try:
                psutil.Process(int(pid))
            except (psutil.Error, ValueError, TypeError):
                # No such process, or the stored value is not usable as a pid
                # (TypeError covers JSON lists/objects): the file is stale.
                pass
            else:
                raise Exception(f"Found PID file with existing process: {pid} {pid_file}")
            logger.warning(f"Found existing PID file {pid_file}({pid}), removing")

        pid_file.unlink(missing_ok=True)

    pid_file_content = config["pid_file_content"]
    if pid_file_content is None or len(pid_file_content) == 0:
        pid_file_data_str = str(os.getpid())
    else:
        pid_file_data = {"pid": os.getpid()}
        for key, value in pid_file_content.items():
            # Walk the dotted path through nested config sections; a missing or
            # falsy intermediate value makes the result None.
            value_obj = config
            for path_part in value.split("."):
                value_obj = value_obj.get(path_part) if value_obj else None
            pid_file_data[key] = value_obj

        pid_file_data_str = json.dumps(pid_file_data)
    pid_file.write_text(pid_file_data_str)

185 

186 

def delete_pid_file():
    """Remove the mindsdb pid file unless it records a different process id.

    Does nothing unless the environment variable ``USE_PIDFILE`` equals ``"1"``
    or when the pid file does not exist. The file is also removed when no pid
    can be read from it (invalid JSON or a JSON object without ``"pid"``).
    """
    if os.environ.get("USE_PIDFILE") != "1":
        return

    pid_file = get_tmp_dir().joinpath("pid")
    if not pid_file.exists():
        return

    raw_text = pid_file.read_text().strip()
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError:
        logger.warning(f"Found existing PID file {pid_file} but it is not a valid JSON")
        stored_pid = None
    else:
        # Either a JSON object holding "pid" or a bare number (pid-only format).
        stored_pid = parsed.get("pid") if isinstance(parsed, dict) else parsed

    belongs_to_other = stored_pid is not None and str(stored_pid) != str(os.getpid())
    if belongs_to_other:
        logger.warning(f"Process id in PID file ({pid_file}) doesn't match mindsdb pid")
        return

    pid_file.unlink(missing_ok=True)

217 

218 

def __is_within_directory(directory, target):
    """Tell whether ``target`` is ``directory`` itself or located inside it.

    Both paths are made absolute and normalised first, so ``..`` components in
    ``target`` are resolved before the comparison. Symlinks are not resolved.

    Args:
        directory: path of the containing directory
        target: path to check

    Returns:
        bool: True if ``target`` lies within ``directory``
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    try:
        # commonpath compares whole path components; commonprefix compares
        # characters and would accept "/tmp/foobar" as inside "/tmp/foo".
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised e.g. for paths on different drives: they share no directory.
        return False
    return common == abs_directory

224 

225 

226def safe_extract(tarfile, path=".", members=None, *, numeric_owner=False): 

227 # for py >= 3.12 

228 if hasattr(tarfile, "data_filter"): 

229 tarfile.extractall(path, members=members, numeric_owner=numeric_owner, filter="data") 

230 return 

231 

232 # for py < 3.12 

233 for member in tarfile.getmembers(): 

234 member_path = os.path.join(path, member.name) 

235 if not __is_within_directory(path, member_path): 

236 raise Exception("Attempted Path Traversal in Tar File") 

237 tarfile.extractall(path, members=members, numeric_owner=numeric_owner)