Coverage for mindsdb / interfaces / storage / fs.py: 42%

353 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import io 

3import shutil 

4import filecmp 

5import tarfile 

6import hashlib 

7from pathlib import Path 

8from abc import ABC, abstractmethod 

9from typing import Union, Optional 

10from dataclasses import dataclass 

11from datetime import datetime 

12import threading 

13 

14if os.name == "posix": 14 ↛ 17line 14 didn't jump to line 17 because the condition on line 14 was always true

15 import fcntl 

16 

17import psutil 

18 

19from mindsdb.utilities.config import Config 

20 

21if Config()["permanent_storage"]["location"] == "s3": 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 import boto3 

23 from botocore.exceptions import ClientError as S3ClientError 

24else: 

25 S3ClientError = FileNotFoundError 

26 

27from mindsdb.utilities.context import context as ctx 

28import mindsdb.utilities.profiler as profiler 

29from mindsdb.utilities import log 

30from mindsdb.utilities.fs import safe_extract 

31 

# Module-level logger named after this module, per project convention.
logger = log.getLogger(__name__)

33 

34 

@dataclass(frozen=True)
class RESOURCE_GROUP:
    # Closed set of resource-group names used to namespace stored artifacts.
    # NOTE(review): an enum would be the idiomatic choice, but this
    # frozen-dataclass-instance pattern keeps `RESOURCE_GROUP.PREDICTOR`-style
    # attribute access byte-compatible for existing callers.
    PREDICTOR = "predictor"
    INTEGRATION = "integration"
    TAB = "tab"
    SYSTEM = "system"


# Shadow the class with a frozen instance so attribute access stays the same
# while assignment on the instance raises FrozenInstanceError.
RESOURCE_GROUP = RESOURCE_GROUP()

44 

45 

# Name of the lock file FileLock places inside each resource's lock folder.
DIR_LOCK_FILE_NAME = "dir.lock"
# Marker file holding the remote 'last modified' timestamp (cache invalidation).
DIR_LAST_MODIFIED_FILE_NAME = "last_modified.txt"
# Internal bookkeeping files that must never be uploaded to permanent storage.
SERVICE_FILES_NAMES = (DIR_LOCK_FILE_NAME, DIR_LAST_MODIFIED_FILE_NAME)

49 

50 

def compare_recursive(comparison: filecmp.dircmp) -> bool:
    """Recursively inspect a dircmp result and report whether both trees match.

    Args:
        comparison (filecmp.dircmp): result of comparing two directories

    Returns:
        bool: True if the directories (and every subdirectory) do not differ
    """
    has_top_level_diff = bool(comparison.left_only or comparison.right_only or comparison.diff_files)
    if has_top_level_diff:
        return False
    # a single mismatching subtree makes the whole comparison fail
    return all(compare_recursive(sub) for sub in comparison.subdirs.values())

66 

67 

def compare_directories(dir1: str, dir2: str) -> bool:
    """Report whether two directory trees have identical contents.

    Args:
        dir1 (str): first directory
        dir2 (str): second directory

    Returns:
        bool: True if the directories do not differ
    """
    return compare_recursive(filecmp.dircmp(dir1, dir2))

80 

81 

def _md5_of_file(path: str) -> str:
    """Return the hex md5 digest of a file, reading in chunks so large files
    are not loaded into memory at once (and the handle is always closed)."""
    digest = hashlib.md5()
    with open(path, "rb") as fd:
        for chunk in iter(lambda: fd.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def copy(src, dst):
    """Copy a file or directory from src to dst, skipping the copy when the
    destination already holds identical content.

    Args:
        src (str): path to the source file or directory
        dst (str): destination path
    """
    if os.path.isdir(src):
        if os.path.exists(dst):
            if compare_directories(src, dst):
                # destination already matches the source tree - nothing to do
                return
            shutil.rmtree(dst, ignore_errors=True)
        shutil.copytree(src, dst, dirs_exist_ok=True)
    else:
        if os.path.exists(dst):
            # original code read both whole files into memory and leaked the
            # file handles; compare chunked digests instead
            if _md5_of_file(src) == _md5_of_file(dst):
                return
            try:
                os.remove(dst)
            except OSError:
                # best effort: copy2 below will overwrite when possible
                pass
        shutil.copy2(src, dst)

98 

99 

class BaseFSStore(ABC):
    """Abstract interface for permanent file-storage backends."""

    def __init__(self):
        # local root for stored artifacts, resolved from the application config
        self.config = Config()
        self.storage = self.config["paths"]["storage"]

    @abstractmethod
    def get(self, local_name, base_dir):
        """Fetch a resource (file or folder) from storage into {base_dir}.

        Args:
            local_name (str): name of resource (file/folder)
            base_dir (str): path to copy the resource
        """
        ...

    @abstractmethod
    def put(self, local_name, base_dir):
        """Upload a resource (file or folder) from {base_dir} to storage.

        Args:
            local_name (str): name of resource (file/folder)
            base_dir (str): path to folder with the resource
        """
        ...

    @abstractmethod
    def delete(self, remote_name):
        """Remove a resource from storage.

        Args:
            remote_name (str): name of resource
        """
        ...

135 

136 

def get_dir_size(path: str):
    """Return the total size in bytes of all files under *path*, recursively."""
    size = 0
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_dir():
                size += get_dir_size(entry.path)
            elif entry.is_file():
                size += entry.stat().st_size
    return size

146 

147 

class AbsentFSStore(BaseFSStore):
    """No-op storage backend: every operation silently does nothing."""

    def get(self, *args, **kwargs):
        """Intentionally a no-op."""

    def put(self, *args, **kwargs):
        """Intentionally a no-op."""

    def delete(self, *args, **kwargs):
        """Intentionally a no-op."""

159 

160 

class LocalFSStore(BaseFSStore):
    """Storage backend that keeps resources on the local file system."""

    def get(self, local_name, base_dir):
        """Copy a resource from local storage into {base_dir}.

        Args:
            local_name (str): name of resource (file/folder)
            base_dir (str): path to copy the resource
        """
        remote_name = local_name
        src = os.path.join(self.storage, remote_name)
        dest = os.path.join(base_dir, local_name)
        if not os.path.exists(dest):
            copy(src, dest)
        elif os.path.isdir(src):
            # cheap size comparison avoids re-copying an unchanged folder
            if get_dir_size(src) != get_dir_size(dest):
                copy(src, dest)
        else:
            # BUGFIX: the original called get_dir_size() on files too, which
            # raises NotADirectoryError; for files copy() itself short-circuits
            # when the content is identical
            copy(src, dest)

    def put(self, local_name, base_dir, compression_level=9):
        """Copy a resource from {base_dir} into local storage.

        Args:
            local_name (str): name of resource (file/folder)
            base_dir (str): path to folder with the resource
            compression_level (int): unused; kept for interface compatibility
                with compressing stores (e.g. S3FSStore.put)
        """
        remote_name = local_name
        copy(os.path.join(base_dir, local_name), os.path.join(self.storage, remote_name))

    def delete(self, remote_name):
        """Remove a resource (file or folder) from local storage.

        Args:
            remote_name (str): name of resource
        """
        path = Path(self.storage).joinpath(remote_name)
        try:
            if path.is_file():
                path.unlink(missing_ok=True)
            else:
                shutil.rmtree(path)
        except FileNotFoundError:
            # already gone - deletion is idempotent
            pass

187 

188 

class FileLock:
    """Advisory file lock guarding concurrent access to a resource directory.

    Implemented with POSIX ``fcntl`` locks; on non-posix systems every
    operation is a no-op. Use as a context manager:

        with FileLock(folder_path, mode="w"):
            ...
    """

    @staticmethod
    def lock_folder_path(relative_path: Path) -> Path:
        """Resolve the folder that holds the lock file for a resource.

        Args:
            relative_path (Path): path to resource directory relative to storage root

        Returns:
            Path: abs path to folder with lock file
        """
        config = Config()
        root_storage_path = Path(config.paths["root"])
        return config.paths["locks"] / relative_path.relative_to(root_storage_path)

    def __init__(self, relative_path: Path, mode: str = "w"):
        """Args:
            relative_path (Path): path to resource directory relative to storage root
            mode (str): 'w' takes an exclusive lock, anything else a shared lock
        """
        if os.name != "posix":
            # locking is only supported on posix; enter/exit become no-ops
            return

        self._local_path = FileLock.lock_folder_path(relative_path)
        self._lock_file_name = DIR_LOCK_FILE_NAME
        self._lock_file_path = self._local_path / self._lock_file_name
        self._mode = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH

        if self._lock_file_path.is_file() is False:
            self._local_path.mkdir(parents=True, exist_ok=True)
            try:
                self._lock_file_path.write_text("")
            except Exception:
                # another process may create the file concurrently;
                # __enter__ opens with O_CREAT anyway
                pass

    def __enter__(self):
        if os.name != "posix":
            return

        try:
            # On at least some systems, LOCK_EX can only be used if the file
            # descriptor refers to a file opened for writing.
            self._lock_fd = os.open(self._lock_file_path, os.O_RDWR | os.O_CREAT)
            fcntl.lockf(self._lock_fd, self._mode | fcntl.LOCK_NB)
        except (ValueError, FileNotFoundError):
            # the lock file was probably deleted between open and lock
            logger.error(f"Can't acquire lock on {self._local_path}")
            raise FileNotFoundError(f"Lock file disappeared: {self._local_path}")
        except BlockingIOError:
            # someone else holds the lock: log once, then block until released
            logger.error(f"Directory is locked by another process: {self._local_path}")
            fcntl.lockf(self._lock_fd, self._mode)

    def __exit__(self, exc_type, exc_value, traceback):
        if os.name != "posix":
            return

        try:
            fcntl.lockf(self._lock_fd, fcntl.LOCK_UN)
            os.close(self._lock_fd)
        except Exception:
            # fd may never have been acquired (or was already closed)
            pass

252 

253 

class S3FSStore(BaseFSStore):
    """Storage backend that keeps resources as .tar.gz archives in Amazon S3."""

    # serialization format for the local 'last modified' marker file
    dt_format = "%d.%m.%y %H:%M:%S.%f"

    def __init__(self):
        super().__init__()
        # explicit credentials from config take precedence; otherwise boto3
        # falls back to its default credential chain (env vars, IAM role, ...)
        if "s3_credentials" in self.config["permanent_storage"]:
            self.s3 = boto3.client("s3", **self.config["permanent_storage"]["s3_credentials"])
        else:
            self.s3 = boto3.client("s3")
        self.bucket = self.config["permanent_storage"]["bucket"]
        # serializes the os.chdir() section in put(): cwd is process-global state
        self._thread_lock = threading.Lock()

    def _get_remote_last_modified(self, object_name: str) -> datetime:
        """Get the time when the S3 object was created/modified.

        Args:
            object_name (str): name of file in bucket

        Returns:
            datetime: naive (tz-stripped) last-modified timestamp
        """
        last_modified = self.s3.get_object_attributes(
            Bucket=self.bucket, Key=object_name, ObjectAttributes=["Checksum"]
        )["LastModified"]
        # drop tzinfo so it compares cleanly with the naive local timestamps
        last_modified = last_modified.replace(tzinfo=None)
        return last_modified

    @profiler.profile()
    def _get_local_last_modified(self, base_dir: str, local_name: str) -> datetime:
        """Read the 'last modified' timestamp saved in the local folder.

        Args:
            base_dir (str): path to base folder
            local_name (str): folder name

        Returns:
            datetime | None: None when the marker is absent or unparsable
        """
        last_modified_file_path = Path(base_dir) / local_name / DIR_LAST_MODIFIED_FILE_NAME
        if last_modified_file_path.is_file() is False:
            return None
        try:
            last_modified_text = last_modified_file_path.read_text()
            last_modified_datetime = datetime.strptime(last_modified_text, self.dt_format)
        except Exception:
            # corrupted/unreadable marker: treat as unknown, forcing a re-download
            return None
        return last_modified_datetime

    @profiler.profile()
    def _save_local_last_modified(self, base_dir: str, local_name: str, last_modified: datetime):
        """Save 'last_modified' to the local folder.

        Args:
            base_dir (str): path to base folder
            local_name (str): folder name
            last_modified (datetime): timestamp to persist
        """
        last_modified_file_path = Path(base_dir) / local_name / DIR_LAST_MODIFIED_FILE_NAME
        last_modified_text = last_modified.strftime(self.dt_format)
        last_modified_file_path.write_text(last_modified_text)

    @profiler.profile()
    def _download(self, base_dir: str, remote_ziped_name: str, local_ziped_path: str, last_modified: datetime = None):
        """Download an archive from s3 and unpack it into base_dir.

        Args:
            base_dir (str): directory to unpack into
            remote_ziped_name (str): key of the '.tar.gz' object in the bucket
            local_ziped_path (str): temp path for the archive when using disk
            last_modified (datetime, optional): known remote timestamp; queried
                from S3 when not provided
        """
        os.makedirs(base_dir, exist_ok=True)

        remote_size = self.s3.get_object_attributes(
            Bucket=self.bucket, Key=remote_ziped_name, ObjectAttributes=["ObjectSize"]
        )["ObjectSize"]
        # BUGFIX: use the in-memory buffer when the archive (with extraction
        # headroom) *fits* in available RAM, disk otherwise. The original
        # condition was inverted relative to the symmetric check in put().
        if (remote_size * 2) < psutil.virtual_memory().available:
            fh = io.BytesIO()
            self.s3.download_fileobj(self.bucket, remote_ziped_name, fh)
            fh.seek(0)  # BUGFIX: rewind - tarfile reads from the current position
            with tarfile.open(fileobj=fh) as tar:
                # safe_extract guards against path-traversal archive entries
                safe_extract(tar, path=base_dir)
        else:
            self.s3.download_file(self.bucket, remote_ziped_name, local_ziped_path)
            shutil.unpack_archive(local_ziped_path, base_dir)
            os.remove(local_ziped_path)

        if last_modified is None:
            last_modified = self._get_remote_last_modified(remote_ziped_name)
        self._save_local_last_modified(base_dir, remote_ziped_name.replace(".tar.gz", ""), last_modified)

    @profiler.profile()
    def get(self, local_name, base_dir):
        """Fetch a folder from s3 unless the local copy is already up to date.

        Args:
            local_name (str): name of the resource
            base_dir (str): directory to place the resource in
        """
        remote_name = local_name
        remote_ziped_name = f"{remote_name}.tar.gz"
        local_ziped_name = f"{local_name}.tar.gz"
        local_ziped_path = os.path.join(base_dir, local_ziped_name)

        folder_path = Path(base_dir) / local_name
        with FileLock(folder_path, mode="r"):
            local_last_modified = self._get_local_last_modified(base_dir, local_name)
            remote_last_modified = self._get_remote_last_modified(remote_ziped_name)
            if local_last_modified is not None and local_last_modified == remote_last_modified:
                # local copy is current - nothing to download
                return

        with FileLock(folder_path, mode="w"):
            self._download(base_dir, remote_ziped_name, local_ziped_path, last_modified=remote_last_modified)

    @profiler.profile()
    def put(self, local_name, base_dir, compression_level=9):
        """Archive a local folder and upload it to s3.

        Args:
            local_name (str): name of the folder in {base_dir}
            base_dir (str): path to the folder holding the resource
            compression_level (int): gzip level (in-memory path only)
        """
        # NOTE: `make_archive` is implemented poorly and will create an empty
        # archive file even if the file/dir to be archived doesn't exist or
        # for some other reason can't be archived
        remote_name = local_name
        remote_zipped_name = f"{remote_name}.tar.gz"

        dir_path = Path(base_dir) / remote_name
        dir_size = sum(f.stat().st_size for f in dir_path.glob("**/*") if f.is_file())
        if (dir_size * 2) < psutil.virtual_memory().available:
            # archive fits in memory: build it in a BytesIO buffer
            fh = io.BytesIO()
            with self._thread_lock:
                # cwd is process-global, so the chdir section is serialized
                # and restored even if archiving fails
                old_cwd = os.getcwd()
                try:
                    os.chdir(base_dir)
                    with tarfile.open(fileobj=fh, mode="w:gz", compresslevel=compression_level) as tar:
                        for path in dir_path.iterdir():
                            # lock/marker files must not reach permanent storage
                            if path.is_file() and path.name in SERVICE_FILES_NAMES:
                                continue
                            tar.add(path.relative_to(base_dir))
                finally:
                    os.chdir(old_cwd)
            fh.seek(0)

            self.s3.upload_fileobj(fh, self.bucket, remote_zipped_name)
        else:
            # too big for memory: archive on disk, upload, then clean up
            # (note: this path does not filter SERVICE_FILES_NAMES)
            shutil.make_archive(os.path.join(base_dir, remote_name), "gztar", root_dir=base_dir, base_dir=local_name)

            self.s3.upload_file(os.path.join(base_dir, remote_zipped_name), self.bucket, remote_zipped_name)
            os.remove(os.path.join(base_dir, remote_zipped_name))

        last_modified = self._get_remote_last_modified(remote_zipped_name)
        self._save_local_last_modified(base_dir, local_name, last_modified)

    @profiler.profile()
    def delete(self, remote_name):
        """Delete an object from the bucket.

        Args:
            remote_name (str): key of the object to delete

        NOTE(review): put() uploads under '{name}.tar.gz' while FileStorage
        passes the bare folder name here - verify the intended key convention.
        """
        self.s3.delete_object(Bucket=self.bucket, Key=remote_name)

400 

401 

def FsStore():
    """Instantiate the storage backend selected in the configuration.

    Returns:
        BaseFSStore: AbsentFSStore, LocalFSStore or S3FSStore depending on
        the 'permanent_storage.location' config value.

    Raises:
        Exception: when the configured location is not supported
    """
    backends = {
        "absent": AbsentFSStore,
        "local": LocalFSStore,
        "s3": S3FSStore,
    }
    storage_location = Config()["permanent_storage"]["location"]
    backend_cls = backends.get(storage_location)
    if backend_cls is None:
        raise Exception(f"Location: '{storage_location}' not supported")
    return backend_cls()

411 

412 

class FileStorage:
    """Per-resource local working directory kept in sync with the configured
    permanent storage backend (see FsStore)."""

    def __init__(self, resource_group: str, resource_id: int, root_dir: str = "content", sync: bool = True):
        """
        Args:
            resource_group (str): group the resource belongs to (see RESOURCE_GROUP)
            resource_id (int): id of the resource
            root_dir (str): config 'paths' key of the local content root
            sync (bool): when True, pull/push permanent storage around operations
        """

        self.resource_group = resource_group
        self.resource_id = resource_id
        self.root_dir = root_dir
        self.sync = sync

        # folder name is namespaced by company id to isolate tenants
        self.folder_name = f"{resource_group}_{ctx.company_id}_{resource_id}"

        config = Config()
        self.fs_store = FsStore()
        self.content_path = Path(config["paths"][root_dir])
        self.resource_group_path = self.content_path / resource_group
        self.folder_path = self.resource_group_path / self.folder_name
        if self.folder_path.exists() is False:
            self.folder_path.mkdir(parents=True, exist_ok=True)

    @profiler.profile()
    def push(self, compression_level: int = 9):
        """Upload the whole local folder to permanent storage.

        Args:
            compression_level (int): compression level forwarded to the store
        """
        # read lock: the local folder is only read while uploading
        with FileLock(self.folder_path, mode="r"):
            self._push_no_lock(compression_level=compression_level)

    @profiler.profile()
    def _push_no_lock(self, compression_level: int = 9):
        # upload without taking the folder lock; caller must already hold it
        self.fs_store.put(str(self.folder_name), str(self.resource_group_path), compression_level=compression_level)

    @profiler.profile()
    def push_path(self, path, compression_level: int = 9):
        """Upload a single element; currently pushes the whole folder."""
        # TODO implement push per element
        self.push(compression_level=compression_level)

    @profiler.profile()
    def pull(self):
        """Download the folder from permanent storage; a missing remote copy
        is not an error (first use of a new resource)."""
        try:
            self.fs_store.get(str(self.folder_name), str(self.resource_group_path))
        except (FileNotFoundError, S3ClientError):
            # resource does not exist remotely yet - keep the local copy as-is
            pass

    @profiler.profile()
    def pull_path(self, path):
        """Download a single element; currently pulls the whole folder."""
        # TODO implement pull per element
        self.pull()

    @profiler.profile()
    def file_set(self, name, content):
        """Write bytes to a file inside the storage folder (and sync).

        Args:
            name (str): file name relative to the folder
            content (bytes): raw content to write
        """
        if self.sync is True:
            self.pull()

        with FileLock(self.folder_path, mode="w"):
            dest_abs_path = self.folder_path / name

            with open(dest_abs_path, "wb") as fd:
                fd.write(content)

            if self.sync is True:
                # _push_no_lock: the write lock is already held
                self._push_no_lock()

    @profiler.profile()
    def file_get(self, name):
        """Read and return the raw bytes of a file inside the storage folder.

        Args:
            name (str): file name relative to the folder
        """
        if self.sync is True:
            self.pull()
        dest_abs_path = self.folder_path / name
        with FileLock(self.folder_path, mode="r"):
            with open(dest_abs_path, "rb") as fd:
                return fd.read()

    @profiler.profile()
    def add(self, path: Union[str, Path], dest_rel_path: Optional[Union[str, Path]] = None):
        """Copy file/folder to persist storage

        Examples:
            Copy file 'args.json' to '{storage}/args.json'
            >>> fs.add('/path/args.json')

            Copy file 'args.json' to '{storage}/folder/opts.json'
            >>> fs.add('/path/args.json', 'folder/opts.json')

            Copy folder 'folder' to '{storage}/folder'
            >>> fs.add('/path/folder')

            Copy folder 'folder' to '{storage}/path/folder'
            >>> fs.add('/path/folder', 'path/folder')

        Args:
            path (Union[str, Path]): path to the resource
            dest_rel_path (Optional[Union[str, Path]]): relative path in storage to file or folder
        """
        if self.sync is True:
            self.pull()
        with FileLock(self.folder_path, mode="w"):
            path = Path(path)
            if isinstance(dest_rel_path, str):
                dest_rel_path = Path(dest_rel_path)

            # default destination: same base name directly under the folder
            if dest_rel_path is None:
                dest_abs_path = self.folder_path / path.name
            else:
                dest_abs_path = self.folder_path / dest_rel_path

            copy(str(path), str(dest_abs_path))

            if self.sync is True:
                # _push_no_lock: the write lock is already held
                self._push_no_lock()

    @profiler.profile()
    def get_path(self, relative_path: Union[str, Path]) -> Path:
        """Return path to file or folder

        Examples:
            get path to 'opts.json':
            >>> fs.get_path('folder/opts.json')
            ... /path/{storage}/folder/opts.json

        Args:
            relative_path (Union[str, Path]): Path relative to the storage folder

        Returns:
            Path: path to requested file or folder
        """
        if self.sync is True:
            self.pull()

        with FileLock(self.folder_path, mode="r"):
            if isinstance(relative_path, str):
                relative_path = Path(relative_path)
            # relative_path = relative_path.resolve()

            if relative_path.is_absolute():
                raise TypeError("FSStorage.get_path() got absolute path as argument")

            ret_path = self.folder_path / relative_path
            if not ret_path.exists():
                # missing paths are created rather than rejected
                # raise Exception('Path does not exists')
                os.makedirs(ret_path)

            return ret_path

    def delete(self, relative_path: Union[str, Path] = "."):
        """Delete a file/folder inside the storage folder; with the default
        '.' the whole resource (local and remote) is removed.

        Args:
            relative_path (Union[str, Path]): path relative to the folder
        """
        path = (self.folder_path / relative_path).resolve()
        if isinstance(relative_path, str):
            relative_path = Path(relative_path)

        if relative_path.is_absolute():
            raise TypeError("FSStorage.delete() got absolute path as argument")

        # complete removal
        if path == self.folder_path.resolve():
            with FileLock(self.folder_path, mode="w"):
                self.fs_store.delete(self.folder_name)
                # NOTE on some fs .rmtree is not working if any file is open
                shutil.rmtree(str(self.folder_path))

            # region del file lock
            lock_folder_path = FileLock.lock_folder_path(self.folder_path)
            try:
                shutil.rmtree(lock_folder_path)
            except FileNotFoundError:
                logger.warning("Tried to delete file not found: %s", lock_folder_path)
            # endregion
            return

        if self.sync is True:
            self.pull()

        with FileLock(self.folder_path, mode="w"):
            if path.exists() is False:
                raise Exception("Path does not exists")

            # NOTE(review): rmdir only removes *empty* directories - confirm
            # whether non-empty subfolders are expected here
            if path.is_file():
                path.unlink(missing_ok=True)
            else:
                path.rmdir()

            if self.sync is True:
                self._push_no_lock()

597 

class FileStorageFactory:
    """Callable factory producing FileStorage objects that share a common
    resource group, root directory and sync setting."""

    def __init__(self, resource_group: str, root_dir: str = "content", sync: bool = True):
        self.resource_group = resource_group
        self.root_dir = root_dir
        self.sync = sync

    def __call__(self, resource_id: int):
        """Create a FileStorage bound to *resource_id*."""
        return FileStorage(
            resource_group=self.resource_group,
            root_dir=self.root_dir,
            sync=self.sync,
            resource_id=resource_id,
        )