Coverage for mindsdb/utilities/api_status.py: 19%
40 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import json
4from mindsdb.utilities.config import config
5from mindsdb.utilities import log
# Cached path of the API status file. Stays None until
# _get_api_status_file() resolves it on its first call.
_api_status_file = None

# Logger for this module, named after the module itself.
logger = log.getLogger(__name__)
def _get_api_status_file():
    """Return the path of the API status file, resolving it on first call.

    The path is ``mindsdb_api_status.json`` inside ``config["paths"]["tmp"]``
    and is cached in the module-level ``_api_status_file``. The first call in
    a process also deletes any file already present at that path; later calls
    only return the cached path.

    Returns:
        The status file path as built by ``os.path.join``.
    """
    global _api_status_file
    if _api_status_file is None:
        # Use a temporary file that can be shared across processes.
        temp_dir = config["paths"]["tmp"]
        _api_status_file = os.path.join(temp_dir, "mindsdb_api_status.json")
        # Drop any leftover file. The removal is attempted directly rather
        # than after an existence check, so a file that vanishes in between
        # (or never existed) is not reported as an error.
        try:
            os.remove(_api_status_file)
        except (FileNotFoundError, NotADirectoryError):
            # Nothing to remove.
            pass
        except OSError:
            # Best effort: keep going with the existing file in place.
            logger.exception(f"Error removing existing API status file: {_api_status_file}")

    return _api_status_file
def get_api_status():
    """Get the current API status from the shared file.

    Returns:
        dict: The JSON object stored in the status file. An empty dict is
        returned when the file is missing, cannot be read or decoded, is not
        valid JSON, or holds something other than a JSON object.
    """
    status_file = _get_api_status_file()
    try:
        # Open directly instead of checking for existence first; a missing
        # file raises FileNotFoundError, which is an OSError.
        with open(status_file, "r") as f:
            loaded = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError as well as UnicodeDecodeError
        # raised while decoding a corrupted file.
        return {}

    # Callers assign into the result by key, so anything that is not a JSON
    # object (list, string, number, null) is treated as "no status".
    if not isinstance(loaded, dict):
        return {}
    return loaded
def set_api_status(api_name: str, status: bool):
    """Set the status of an API in the shared file.

    Reads the current statuses, updates the entry for ``api_name`` and writes
    the whole mapping back. A failed write is logged and otherwise ignored;
    nothing is raised to the caller for I/O errors.

    NOTE(review): the read-modify-write sequence is not guarded by any lock
    in this function, so concurrent callers may overwrite each other's
    updates -- confirm whether that is acceptable.

    Args:
        api_name: Key under which the status is stored.
        status: Value stored for ``api_name``.
    """
    status_file = _get_api_status_file()
    current_status = get_api_status()
    if not isinstance(current_status, dict):
        # The stored payload was not a JSON object; start over rather than
        # fail on the item assignment below.
        current_status = {}
    current_status[api_name] = status

    # Write the full content to a sibling temp file, then move it over the
    # real file with os.replace, so the status file is never written in place.
    temp_file = status_file + ".tmp"
    try:
        with open(temp_file, "w") as f:
            json.dump(current_status, f)
        os.replace(temp_file, status_file)
    except OSError:
        # Report the lost update instead of dropping it silently.
        logger.exception(f"Error writing API status file: {status_file}")
        # Clean up the temp file if it was left behind.
        try:
            os.remove(temp_file)
        except OSError:
            pass