Coverage for mindsdb/integrations/handlers/mlflow_handler/mlflow_handler.py: 0%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import requests 

2from datetime import datetime 

3from typing import Dict, Optional 

4 

5import pandas as pd 

6from mlflow.tracking import MlflowClient 

7 

8from mindsdb.integrations.libs.base import BaseMLEngine 

9 

10 

class MLflowHandler(BaseMLEngine):
    """
    ML engine that forwards predictions to a model served by MLflow.

    The MLflow integration engine needs to have a working connection to MLFlow. For this:
        - All models to use should be previously served
        - An MLflow server should be running, to access its model registry

    Example:
        1. Run `mlflow server -p 5001 --backend-store-uri sqlite:////path/to/mlflow.db --default-artifact-root ./artifacts --host 0.0.0.0`
        2. Run `mlflow models serve --model-uri ./model_path`
        3. Run MindsDB

    Note: above, `artifacts` is a folder to store artifacts for new experiments that do not specify an artifact store.
    """  # noqa

    name = "mlflow"

    def create(
        self,
        target: str,
        df: Optional[pd.DataFrame] = None,
        args: Optional[Dict] = None,
    ) -> None:
        """Validate and persist the settings of an MLflow-served model.

        Args:
            target: Name stored under the ``"target"`` key; ``predict`` uses
                it as the column name of the returned predictions.
            df: Not used by this method.
            args: Problem definition. Only ``args["using"]`` is read, and it
                must contain ``mlflow_server_url``, ``mlflow_server_path``,
                ``model_name`` and ``predict_url``.

        Raises:
            Exception: If ``model_name`` is not among the registered models
                or the predict URL fails ``_check_model_url``.
            KeyError: If a required entry is missing from ``args["using"]``.
            TypeError: If ``args`` is ``None``.
        """
        # Copy so that adding "target" below does not mutate the caller's dict.
        using = dict(args["using"])  # ignore the rest of the problem definition
        connection = MlflowClient(
            using["mlflow_server_url"], using["mlflow_server_path"]
        )
        model_name = using["model_name"]
        # NOTE(review): search_registered_models may return only the first page
        # of results - confirm this is sufficient for large registries.
        registered_names = {
            model.name for model in connection.search_registered_models()
        }

        if model_name not in registered_names:
            raise Exception(
                f"Error: model '{model_name}' not found in mlflow. Check serving and try again."
            )

        using["target"] = target
        self._check_model_url(using["predict_url"])
        self.model_storage.json_set("args", using)

    def predict(self, df, args=None):
        """Send ``df`` to the stored predict URL and return the predictions.

        Args:
            df: DataFrame serialized with ``to_json(orient="records")`` and
                posted to the model endpoint.
            args: Ignored; the arguments saved by ``create`` are used instead.

        Returns:
            A DataFrame with a single column, named after the stored target,
            built from the JSON body of the response.

        Raises:
            Exception: If the predict URL fails ``_check_model_url`` or the
                prediction request returns an error status.
        """
        stored = self.model_storage.json_get("args")  # override any incoming args for now
        self._check_model_url(stored["predict_url"])
        resp = requests.post(
            stored["predict_url"],
            data=df.to_json(orient="records"),
            headers={"content-type": "application/json; format=pandas-records"},
        )
        # Without this check an error payload would be returned as predictions.
        if not resp.ok:
            raise Exception(
                f"Error: prediction request failed, status_code: {resp.status_code}"
            )
        return pd.DataFrame({stored["target"]: resp.json()})

    def describe(self, key: Optional[str] = None) -> pd.DataFrame:
        """Describe the model, or list the available description tables.

        Args:
            key: ``"info"`` returns registry metadata for the stored model;
                any other value returns the list of supported keys.

        Returns:
            A one-row DataFrame of model metadata when ``key == "info"``,
            otherwise a DataFrame with a single ``tables`` column.

        Raises:
            Exception: If the stored model is no longer among the registered
                models.
        """
        if key != "info":
            return pd.DataFrame(["info"], columns=["tables"])

        args = self.model_storage.json_get("args")
        connection = MlflowClient(
            args["mlflow_server_url"], args["mlflow_server_path"]
        )
        model_name = args["model_name"]
        models = {
            model.name: model for model in connection.search_registered_models()
        }
        # Look up by the stored model name, not by `key` (always "info" here).
        model = models.get(model_name)
        if model is None:
            raise Exception(
                f"Error: model '{model_name}' not found in mlflow. Check serving and try again."
            )

        # A model without versions yields None for the version-specific fields.
        versions = model.latest_versions
        latest_version = versions[-1] if versions else None

        description = {
            "NAME": [model.name],
            "USER_DESCRIPTION": [model.description],
            "LAST_STATUS": [getattr(latest_version, "status", None)],
            "CREATED_AT": [self._format_timestamp(model.creation_timestamp)],
            "LAST_UPDATED": [self._format_timestamp(model.last_updated_timestamp)],
            "TAGS": [model.tags],
            "LAST_RUN_ID": [getattr(latest_version, "run_id", None)],
            "LAST_SOURCE_PATH": [getattr(latest_version, "source", None)],
            "LAST_USER_ID": [getattr(latest_version, "user_id", None)],
            "LAST_VERSION": [getattr(latest_version, "version", None)],
        }
        return pd.DataFrame.from_dict(description)

    @staticmethod
    def _format_timestamp(timestamp_ms):
        """Format a millisecond epoch timestamp as ``MM/DD/YYYY, HH:MM:SS`` in local time."""
        return datetime.fromtimestamp(timestamp_ms // 1000).strftime(
            "%m/%d/%Y, %H:%M:%S"
        )

    @staticmethod
    def _check_model_url(url):
        """try post without data, check status code not in (not_found, method_not_allowed)

        Raises:
            Exception: If the request fails or the status code is 404 or 405.
        """
        try:
            resp = requests.post(url)
        except requests.RequestException as e:
            # Chain the cause so the underlying network error stays visible.
            raise Exception(f"Model url is incorrect: {str(e)}") from e
        if resp.status_code in (404, 405):
            raise Exception(
                f"Model url is incorrect, status_code: {resp.status_code}"
            )