Coverage for mindsdb / interfaces / storage / fs.py: 42%
353 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import io
3import shutil
4import filecmp
5import tarfile
6import hashlib
7from pathlib import Path
8from abc import ABC, abstractmethod
9from typing import Union, Optional
10from dataclasses import dataclass
11from datetime import datetime
12import threading
14if os.name == "posix": 14 ↛ 17line 14 didn't jump to line 17 because the condition on line 14 was always true
15 import fcntl
17import psutil
19from mindsdb.utilities.config import Config
21if Config()["permanent_storage"]["location"] == "s3": 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 import boto3
23 from botocore.exceptions import ClientError as S3ClientError
24else:
25 S3ClientError = FileNotFoundError
27from mindsdb.utilities.context import context as ctx
28import mindsdb.utilities.profiler as profiler
29from mindsdb.utilities import log
30from mindsdb.utilities.fs import safe_extract
32logger = log.getLogger(__name__)
@dataclass(frozen=True)
class RESOURCE_GROUP:
    """Enum-like namespace of resource-group names used to partition storage.

    The attributes are plain (unannotated) class attributes, so the dataclass
    generates no fields/__init__ parameters; frozen=True makes instance
    attribute assignment raise.
    """

    PREDICTOR = "predictor"
    INTEGRATION = "integration"
    TAB = "tab"
    SYSTEM = "system"


# Shadow the class with a singleton instance; callers use RESOURCE_GROUP.PREDICTOR etc.
RESOURCE_GROUP = RESOURCE_GROUP()
# Name of the lock file used by FileLock to guard concurrent directory access.
DIR_LOCK_FILE_NAME = "dir.lock"
# Marker file that stores the remote "last modified" timestamp of a synced directory.
DIR_LAST_MODIFIED_FILE_NAME = "last_modified.txt"
# Internal bookkeeping files that must not be uploaded to permanent storage.
SERVICE_FILES_NAMES = (DIR_LOCK_FILE_NAME, DIR_LAST_MODIFIED_FILE_NAME)
def compare_recursive(comparison: filecmp.dircmp) -> bool:
    """Recursively inspect a dircmp result and report whether the trees match.

    Args:
        comparison (filecmp.dircmp): result of comparing two directories

    Returns:
        bool: True if the directory trees do not differ
    """
    top_level_differs = bool(comparison.left_only or comparison.right_only or comparison.diff_files)
    if top_level_differs:
        return False
    # the trees match only if every common subdirectory matches as well
    return all(compare_recursive(sub) for sub in comparison.subdirs.values())
def compare_directories(dir1: str, dir2: str) -> bool:
    """Compare two directory trees for equality.

    Args:
        dir1 (str): first directory path
        dir2 (str): second directory path

    Returns:
        bool: True if the trees do not differ
    """
    return compare_recursive(filecmp.dircmp(dir1, dir2))
def copy(src, dst):
    """Copy a file or a directory tree from ``src`` to ``dst``.

    The copy is skipped when the destination already exists with identical
    content; otherwise the destination is replaced.

    Args:
        src (str): path of the file/directory to copy
        dst (str): destination path
    """
    if os.path.isdir(src):
        if os.path.exists(dst):
            if compare_directories(src, dst):
                # destination already matches the source tree
                return
            shutil.rmtree(dst, ignore_errors=True)
        shutil.copytree(src, dst, dirs_exist_ok=True)
    else:
        if os.path.exists(dst):
            if _files_identical(src, dst):
                return
            try:
                os.remove(dst)
            except OSError:
                # copy2 below will surface a real error if dst is not replaceable
                pass
        shutil.copy2(src, dst)


def _files_identical(path_a: str, path_b: str) -> bool:
    """Compare two files by MD5 digest, reading in chunks with closed handles."""
    digests = []
    for path in (path_a, path_b):
        digest = hashlib.md5()
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                digest.update(chunk)
        digests.append(digest.hexdigest())
    return digests[0] == digests[1]
class BaseFSStore(ABC):
    """Base class (abstract interface) for file storage backends."""

    def __init__(self):
        self.config = Config()
        # root path under which this store keeps its data
        self.storage = self.config["paths"]["storage"]

    @abstractmethod
    def get(self, local_name, base_dir):
        """Copy file/folder from storage to {base_dir}

        Args:
            local_name (str): name of resource (file/folder)
            base_dir (str): path to copy the resource into
        """
        pass

    @abstractmethod
    def put(self, local_name, base_dir):
        """Copy file/folder from {base_dir} to storage

        Args:
            local_name (str): name of resource (file/folder)
            base_dir (str): path to folder containing the resource
        """
        pass

    @abstractmethod
    def delete(self, remote_name):
        """Delete file/folder from storage

        Args:
            remote_name (str): name of resource
        """
        pass
def get_dir_size(path: str):
    """Return the total size in bytes of all files under ``path``, recursively."""
    size = 0
    for entry in os.scandir(path):
        if entry.is_file():
            size += entry.stat().st_size
        elif entry.is_dir():
            # descend into subdirectories
            size += get_dir_size(entry.path)
    return size
class AbsentFSStore(BaseFSStore):
    """Storage class that does not store anything. It is just a dummy (no-op backend)."""

    def get(self, *args, **kwargs):
        # nothing to fetch: there is no backing store
        pass

    def put(self, *args, **kwargs):
        # nothing is persisted
        pass

    def delete(self, *args, **kwargs):
        # nothing to delete
        pass
class LocalFSStore(BaseFSStore):
    """Storage backend backed by a directory on the local filesystem."""

    def __init__(self):
        super().__init__()

    def get(self, local_name, base_dir):
        """Refresh the copy of the resource under ``base_dir`` from storage."""
        source = os.path.join(self.storage, local_name)
        target = os.path.join(base_dir, local_name)
        # copy only when the target is missing or its total size diverged
        if not os.path.exists(target) or get_dir_size(source) != get_dir_size(target):
            copy(source, target)

    def put(self, local_name, base_dir, compression_level=9):
        """Persist the resource from ``base_dir`` into storage (no compression locally)."""
        source = os.path.join(base_dir, local_name)
        target = os.path.join(self.storage, local_name)
        copy(source, target)

    def delete(self, remote_name):
        """Remove a stored resource; missing paths are ignored."""
        target = Path(self.storage) / remote_name
        try:
            if target.is_file():
                target.unlink(missing_ok=True)
            else:
                shutil.rmtree(target)
        except FileNotFoundError:
            pass
class FileLock:
    """File lock for safe concurrent access to a directory.

    Used as a context manager. Locking relies on ``fcntl``, so on non-POSIX
    systems every operation degrades to a no-op.
    """

    @staticmethod
    def lock_folder_path(relative_path: Path) -> Path:
        """Map a resource directory to the folder that holds its lock file.

        Args:
            relative_path (Path): path to resource directory; despite the name,
                callers appear to pass an absolute path under the storage root
                (it is re-based via relative_to) — TODO confirm

        Returns:
            Path: abs path to folder with lock file
        """
        config = Config()
        root_storage_path = Path(config.paths["root"])
        return config.paths["locks"] / relative_path.relative_to(root_storage_path)

    def __init__(self, relative_path: Path, mode: str = "w"):
        """Args:
            relative_path (Path): path to resource directory relative to storage root
            mode (str): lock for read (r) or write (w)
        """
        if os.name != "posix":
            # no fcntl available: the lock is a no-op on this platform
            return

        self._local_path = FileLock.lock_folder_path(relative_path)
        self._lock_file_name = DIR_LOCK_FILE_NAME
        self._lock_file_path = self._local_path / self._lock_file_name
        # exclusive lock for writers, shared lock for readers
        self._mode = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH

        if self._lock_file_path.is_file() is False:
            self._local_path.mkdir(parents=True, exist_ok=True)
            try:
                self._lock_file_path.write_text("")
            except Exception:
                # best effort: another process may have created the file first
                pass

    def __enter__(self):
        if os.name != "posix":
            return

        try:
            # On at least some systems, LOCK_EX can only be used if the file
            # descriptor refers to a file opened for writing.
            self._lock_fd = os.open(self._lock_file_path, os.O_RDWR | os.O_CREAT)
            fcntl.lockf(self._lock_fd, self._mode | fcntl.LOCK_NB)
        except (ValueError, FileNotFoundError):
            # file probably was deleted between open and lock
            logger.error(f"Cant accure lock on {self._local_path}")
            raise FileNotFoundError
        except BlockingIOError:
            # lock is held elsewhere: log, then retry without LOCK_NB (blocks until free)
            logger.error(f"Directory is locked by another process: {self._local_path}")
            fcntl.lockf(self._lock_fd, self._mode)

    def __exit__(self, exc_type, exc_value, traceback):
        if os.name != "posix":
            return

        try:
            fcntl.lockf(self._lock_fd, fcntl.LOCK_UN)
            os.close(self._lock_fd)
        except Exception:
            # unlock/close failures are non-fatal on exit
            pass
class S3FSStore(BaseFSStore):
    """Storage that stores files in amazon s3.

    Folders are synced as ``{name}.tar.gz`` archives in a single bucket.
    A local ``last_modified.txt`` marker is kept inside each synced folder so
    an up-to-date folder is not downloaded again.
    """

    # format used to (de)serialize the 'last modified' marker file
    dt_format = "%d.%m.%y %H:%M:%S.%f"

    def __init__(self):
        super().__init__()
        if "s3_credentials" in self.config["permanent_storage"]:
            self.s3 = boto3.client("s3", **self.config["permanent_storage"]["s3_credentials"])
        else:
            # fall back to the default boto3 credential chain (env vars, profile, role)
            self.s3 = boto3.client("s3")
        self.bucket = self.config["permanent_storage"]["bucket"]
        self._thread_lock = threading.Lock()

    def _get_remote_last_modified(self, object_name: str) -> datetime:
        """Get the time when an object in the bucket was created/modified.

        Args:
            object_name (str): name of the file in the bucket

        Returns:
            datetime: naive (tz-stripped) last-modified timestamp
        """
        last_modified = self.s3.get_object_attributes(
            Bucket=self.bucket, Key=object_name, ObjectAttributes=["Checksum"]
        )["LastModified"]
        # markers are stored/compared as naive datetimes, so drop the timezone
        last_modified = last_modified.replace(tzinfo=None)
        return last_modified

    @profiler.profile()
    def _get_local_last_modified(self, base_dir: str, local_name: str) -> Optional[datetime]:
        """Read the locally saved 'last modified' marker.

        Args:
            base_dir (str): path to base folder
            local_name (str): folder name

        Returns:
            Optional[datetime]: the saved timestamp, or None if the marker is
            missing or unreadable
        """
        last_modified_file_path = Path(base_dir) / local_name / DIR_LAST_MODIFIED_FILE_NAME
        if last_modified_file_path.is_file() is False:
            return None
        try:
            last_modified_text = last_modified_file_path.read_text()
            last_modified_datetime = datetime.strptime(last_modified_text, self.dt_format)
        except Exception:
            # a corrupt marker means 'unknown' and forces a re-download
            return None
        return last_modified_datetime

    @profiler.profile()
    def _save_local_last_modified(self, base_dir: str, local_name: str, last_modified: datetime):
        """Save the 'last modified' marker into the local folder.

        Args:
            base_dir (str): path to base folder
            local_name (str): folder name
            last_modified (datetime): timestamp to persist
        """
        last_modified_file_path = Path(base_dir) / local_name / DIR_LAST_MODIFIED_FILE_NAME
        last_modified_text = last_modified.strftime(self.dt_format)
        last_modified_file_path.write_text(last_modified_text)

    @profiler.profile()
    def _download(self, base_dir: str, remote_ziped_name: str, local_ziped_path: str, last_modified: datetime = None):
        """Download an archive from s3 and unpack it into ``base_dir``.

        Args:
            base_dir (str): directory to unpack into
            remote_ziped_name (str): key of the archive in the bucket
            local_ziped_path (str): temp path for the archive when streaming via disk
            last_modified (datetime, optional): known remote timestamp; fetched if omitted
        """
        os.makedirs(base_dir, exist_ok=True)

        remote_size = self.s3.get_object_attributes(
            Bucket=self.bucket, Key=remote_ziped_name, ObjectAttributes=["ObjectSize"]
        )["ObjectSize"]
        # Take the in-memory path only when the archive comfortably fits in RAM.
        # The previous condition was inverted ('>'), buffering exactly the archives
        # that were too big for memory; this mirrors the check in put().
        if (remote_size * 2) < psutil.virtual_memory().available:
            fh = io.BytesIO()
            self.s3.download_fileobj(self.bucket, remote_ziped_name, fh)
            fh.seek(0)  # rewind: tarfile reads from the stream's current position
            with tarfile.open(fileobj=fh) as tar:
                safe_extract(tar, path=base_dir)
        else:
            self.s3.download_file(self.bucket, remote_ziped_name, local_ziped_path)
            shutil.unpack_archive(local_ziped_path, base_dir)
            os.remove(local_ziped_path)

        if last_modified is None:
            last_modified = self._get_remote_last_modified(remote_ziped_name)
        self._save_local_last_modified(base_dir, remote_ziped_name.replace(".tar.gz", ""), last_modified)

    @profiler.profile()
    def get(self, local_name, base_dir):
        """Fetch a folder from s3 unless the local copy is already up to date."""
        remote_name = local_name
        remote_ziped_name = f"{remote_name}.tar.gz"
        local_ziped_name = f"{local_name}.tar.gz"
        local_ziped_path = os.path.join(base_dir, local_ziped_name)

        folder_path = Path(base_dir) / local_name
        with FileLock(folder_path, mode="r"):
            local_last_modified = self._get_local_last_modified(base_dir, local_name)
            remote_last_modified = self._get_remote_last_modified(remote_ziped_name)
            if local_last_modified is not None and local_last_modified == remote_last_modified:
                # local copy is current; nothing to download
                return
        with FileLock(folder_path, mode="w"):
            self._download(base_dir, remote_ziped_name, local_ziped_path, last_modified=remote_last_modified)

    @profiler.profile()
    def put(self, local_name, base_dir, compression_level=9):
        # NOTE: `make_archive` is implemented poorly and will create an empty archive
        # file even if the file/dir to be archived doesn't exist or for some other
        # reason can't be archived
        remote_name = local_name
        remote_zipped_name = f"{remote_name}.tar.gz"

        dir_path = Path(base_dir) / remote_name
        dir_size = sum(f.stat().st_size for f in dir_path.glob("**/*") if f.is_file())
        if (dir_size * 2) < psutil.virtual_memory().available:
            fh = io.BytesIO()
            with tarfile.open(fileobj=fh, mode="w:gz", compresslevel=compression_level) as tar:
                for path in dir_path.iterdir():
                    # service files (lock, last-modified marker) stay local only
                    if path.is_file() and path.name in SERVICE_FILES_NAMES:
                        continue
                    # arcname keeps entries relative to base_dir without chdir'ing
                    # the whole process (the old chdir approach was not
                    # exception-safe and needed a thread lock)
                    tar.add(path, arcname=str(path.relative_to(base_dir)))
            fh.seek(0)

            self.s3.upload_fileobj(fh, self.bucket, remote_zipped_name)
        else:
            shutil.make_archive(os.path.join(base_dir, remote_name), "gztar", root_dir=base_dir, base_dir=local_name)

            self.s3.upload_file(os.path.join(base_dir, remote_zipped_name), self.bucket, remote_zipped_name)
            os.remove(os.path.join(base_dir, remote_zipped_name))

        last_modified = self._get_remote_last_modified(remote_zipped_name)
        self._save_local_last_modified(base_dir, local_name, last_modified)

    @profiler.profile()
    def delete(self, remote_name):
        """Remove an object from the bucket."""
        self.s3.delete_object(Bucket=self.bucket, Key=remote_name)
def FsStore():
    """Instantiate the storage backend configured at permanent_storage.location."""
    storage_location = Config()["permanent_storage"]["location"]
    backends = {
        "absent": AbsentFSStore,
        "local": LocalFSStore,
        "s3": S3FSStore,
    }
    backend_cls = backends.get(storage_location)
    if backend_cls is None:
        raise Exception(f"Location: '{storage_location}' not supported")
    return backend_cls()
class FileStorage:
    """Persistent storage for one resource's files.

    Keeps a local working folder under the configured content path and, when
    ``sync`` is True, mirrors it to/from the permanent store (``FsStore``)
    around each operation.
    """

    def __init__(self, resource_group: str, resource_id: int, root_dir: str = "content", sync: bool = True):
        """
        Args:
            resource_group (str): one of the RESOURCE_GROUP values
            resource_id (int): id of the resource this storage belongs to
            root_dir (str): key into config['paths'] for the local content root
            sync (bool): if True, pull/push the permanent store on each access
        """

        self.resource_group = resource_group
        self.resource_id = resource_id
        self.root_dir = root_dir
        self.sync = sync

        # folder name includes the company id, isolating tenants from each other
        self.folder_name = f"{resource_group}_{ctx.company_id}_{resource_id}"

        config = Config()
        self.fs_store = FsStore()
        self.content_path = Path(config["paths"][root_dir])
        self.resource_group_path = self.content_path / resource_group
        self.folder_path = self.resource_group_path / self.folder_name
        if self.folder_path.exists() is False:
            self.folder_path.mkdir(parents=True, exist_ok=True)

    @profiler.profile()
    def push(self, compression_level: int = 9):
        """Upload the whole folder to the permanent store.

        NOTE(review): takes a *read* lock while uploading, so a concurrent
        writer could modify the folder mid-upload — confirm this is intended.
        """
        with FileLock(self.folder_path, mode="r"):
            self._push_no_lock(compression_level=compression_level)

    @profiler.profile()
    def _push_no_lock(self, compression_level: int = 9):
        # caller is responsible for holding the folder lock
        self.fs_store.put(str(self.folder_name), str(self.resource_group_path), compression_level=compression_level)

    @profiler.profile()
    def push_path(self, path, compression_level: int = 9):
        # TODO implement push per element; currently pushes the whole folder
        self.push(compression_level=compression_level)

    @profiler.profile()
    def pull(self):
        """Download the folder from the permanent store, ignoring 'not found'."""
        try:
            self.fs_store.get(str(self.folder_name), str(self.resource_group_path))
        except (FileNotFoundError, S3ClientError):
            # nothing stored remotely yet: keep whatever exists locally
            pass

    @profiler.profile()
    def pull_path(self, path):
        # TODO implement pull per element; currently pulls the whole folder
        self.pull()

    @profiler.profile()
    def file_set(self, name, content):
        """Write binary ``content`` to file ``name`` inside the storage folder."""
        if self.sync is True:
            self.pull()

        with FileLock(self.folder_path, mode="w"):
            dest_abs_path = self.folder_path / name

            with open(dest_abs_path, "wb") as fd:
                fd.write(content)

        if self.sync is True:
            self._push_no_lock()

    @profiler.profile()
    def file_get(self, name):
        """Read and return the binary content of file ``name``."""
        if self.sync is True:
            self.pull()
        dest_abs_path = self.folder_path / name
        with FileLock(self.folder_path, mode="r"):
            with open(dest_abs_path, "rb") as fd:
                return fd.read()

    @profiler.profile()
    def add(self, path: Union[str, Path], dest_rel_path: Optional[Union[str, Path]] = None):
        """Copy file/folder to persist storage

        Examples:
            Copy file 'args.json' to '{storage}/args.json'
            >>> fs.add('/path/args.json')

            Copy file 'args.json' to '{storage}/folder/opts.json'
            >>> fs.add('/path/args.json', 'folder/opts.json')

            Copy folder 'folder' to '{storage}/folder'
            >>> fs.add('/path/folder')

            Copy folder 'folder' to '{storage}/path/folder'
            >>> fs.add('/path/folder', 'path/folder')

        Args:
            path (Union[str, Path]): path to the resource
            dest_rel_path (Optional[Union[str, Path]]): relative path in storage to file or folder
        """
        if self.sync is True:
            self.pull()
        with FileLock(self.folder_path, mode="w"):
            path = Path(path)
            if isinstance(dest_rel_path, str):
                dest_rel_path = Path(dest_rel_path)

            # default destination: same basename directly under the storage folder
            if dest_rel_path is None:
                dest_abs_path = self.folder_path / path.name
            else:
                dest_abs_path = self.folder_path / dest_rel_path

            copy(str(path), str(dest_abs_path))

        if self.sync is True:
            self._push_no_lock()

    @profiler.profile()
    def get_path(self, relative_path: Union[str, Path]) -> Path:
        """Return path to file or folder

        Examples:
            get path to 'opts.json':
            >>> fs.get_path('folder/opts.json')
            ... /path/{storage}/folder/opts.json

        Args:
            relative_path (Union[str, Path]): Path relative to the storage folder

        Returns:
            Path: path to requested file or folder (created if missing)
        """
        if self.sync is True:
            self.pull()

        with FileLock(self.folder_path, mode="r"):
            if isinstance(relative_path, str):
                relative_path = Path(relative_path)
            # relative_path = relative_path.resolve()

            if relative_path.is_absolute():
                raise TypeError("FSStorage.get_path() got absolute path as argument")

            ret_path = self.folder_path / relative_path
            if not ret_path.exists():
                # raise Exception('Path does not exists')
                os.makedirs(ret_path)

            return ret_path

    def delete(self, relative_path: Union[str, Path] = "."):
        """Delete a file/folder inside storage, or the whole folder for '.'."""
        path = (self.folder_path / relative_path).resolve()
        if isinstance(relative_path, str):
            relative_path = Path(relative_path)

        if relative_path.is_absolute():
            raise TypeError("FSStorage.delete() got absolute path as argument")

        # complete removal: drop remote copy, local folder and its lock folder
        if path == self.folder_path.resolve():
            with FileLock(self.folder_path, mode="w"):
                self.fs_store.delete(self.folder_name)
                # NOTE on some fs .rmtree is not working if any file is open
                shutil.rmtree(str(self.folder_path))

            # region del file lock
            lock_folder_path = FileLock.lock_folder_path(self.folder_path)
            try:
                shutil.rmtree(lock_folder_path)
            except FileNotFoundError:
                logger.warning("Tried to delete file not found: %s", lock_folder_path)
            # endregion
            return

        if self.sync is True:
            self.pull()

        with FileLock(self.folder_path, mode="w"):
            if path.exists() is False:
                raise Exception("Path does not exists")

            if path.is_file():
                path.unlink(missing_ok=True)
            else:
                # NOTE(review): rmdir only removes *empty* directories — confirm
                # non-empty subfolder deletion is not expected here
                path.rmdir()

        if self.sync is True:
            self._push_no_lock()
class FileStorageFactory:
    """Factory producing FileStorage instances that share one configuration.

    The resource group, root dir and sync flag are fixed at construction time;
    calling the factory with a resource id yields the matching FileStorage.
    """

    def __init__(self, resource_group: str, root_dir: str = "content", sync: bool = True):
        self.resource_group = resource_group
        self.root_dir = root_dir
        self.sync = sync

    def __call__(self, resource_id: int):
        return FileStorage(
            resource_group=self.resource_group,
            root_dir=self.root_dir,
            sync=self.sync,
            resource_id=resource_id,
        )