Coverage for mindsdb / integrations / handlers / mlflow_handler / mlflow_handler.py: 0%
44 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import requests
2from datetime import datetime
3from typing import Dict, Optional
5import pandas as pd
6from mlflow.tracking import MlflowClient
8from mindsdb.integrations.libs.base import BaseMLEngine
class MLflowHandler(BaseMLEngine):
    """
    The MLflow integration engine needs to have a working connection to MLFlow. For this:
        - All models to use should be previously served
        - An MLflow server should be running, to access its model registry

    Example:
        1. Run `mlflow server -p 5001 --backend-store-uri sqlite:////path/to/mlflow.db --default-artifact-root ./artifacts --host 0.0.0.0`
        2. Run `mlflow models serve --model-uri ./model_path`
        3. Run MindsDB

    Note: above, `artifacts` is a folder to store artifacts for new experiments that do not specify an artifact store.
    """  # noqa

    name = "mlflow"

    def create(
        self,
        target: str,
        df: Optional[pd.DataFrame] = None,
        args: Optional[Dict] = None,
    ) -> None:
        """Validate the MLflow model referenced by ``args`` and persist the arguments.

        Nothing is trained here: the method checks that the named model is
        listed by the MLflow registry and that the prediction URL responds,
        then stores the arguments through ``self.model_storage``.

        Args:
            target: Name of the column that ``predict`` will return values under.
            df: Not used by this engine.
            args: Problem definition; only its ``"using"`` entry is read. That
                entry must contain ``mlflow_server_url``, ``mlflow_server_path``,
                ``model_name`` and ``predict_url``.

        Raises:
            Exception: If the ``"using"`` arguments are missing, the model is
                not listed by the registry, or the prediction URL check fails.
            KeyError: If a required key is missing from the ``"using"`` entry.
        """
        # `args` defaults to None; fail with a readable message instead of an
        # opaque "'NoneType' object is not subscriptable".
        if not args or "using" not in args:
            raise Exception(
                "Error: the mlflow engine requires 'using' arguments "
                "(mlflow_server_url, mlflow_server_path, model_name, predict_url)."
            )

        args = args["using"]  # ignore the rest of the problem definition
        connection = MlflowClient(args["mlflow_server_url"], args["mlflow_server_path"])
        model_name = args["model_name"]
        mlflow_models = [model.name for model in connection.search_registered_models()]

        if model_name not in mlflow_models:
            raise Exception(
                f"Error: model '{model_name}' not found in mlflow. Check serving and try again."
            )

        args["target"] = target
        self._check_model_url(args["predict_url"])
        self.model_storage.json_set("args", args)

    def predict(self, df: pd.DataFrame, args: Optional[Dict] = None) -> pd.DataFrame:
        """Send ``df`` to the stored prediction URL and return the predictions.

        Args:
            df: Input rows; serialized as JSON records for the request body.
            args: Ignored; the arguments persisted by ``create`` are used instead.

        Returns:
            A DataFrame with a single column named after the stored target,
            built from the JSON body of the response.

        Raises:
            Exception: If the URL check fails or the server answers with an
                error status.
        """
        args = self.model_storage.json_get("args")  # override any incoming args for now
        self._check_model_url(args["predict_url"])
        # No timeout here on purpose: the original request was unbounded and the
        # acceptable inference latency is not known from this file.
        resp = requests.post(
            args["predict_url"],
            data=df.to_json(orient="records"),
            headers={"content-type": "application/json; format=pandas-records"},
        )
        # Without this check an error payload would be turned into "predictions".
        if not resp.ok:
            raise Exception(
                f"Error: prediction request failed, status_code: {resp.status_code}, "
                f"response: {resp.text}"
            )
        answer = resp.json()
        return pd.DataFrame({args["target"]: answer})

    def describe(self, key: Optional[str] = None) -> pd.DataFrame:
        """Describe the model, or list the available description tables.

        Args:
            key: ``"info"`` to get registry metadata for the stored model; any
                other value (including ``None``) returns the list of tables.

        Returns:
            For ``"info"``: a one-row DataFrame with the registered model's
            name, description, timestamps, tags and latest-version details.
            Otherwise: a DataFrame with a single ``tables`` column.

        Raises:
            Exception: If the stored model is not listed by the registry or has
                no versions.
        """
        if key != "info":
            return pd.DataFrame(["info"], columns=["tables"])

        args = self.model_storage.json_get("args")
        # The arguments are stored by `create` under "mlflow_server_path".
        connection = MlflowClient(args["mlflow_server_url"], args["mlflow_server_path"])
        model_name = args["model_name"]
        models = {
            model.name: model for model in connection.search_registered_models()
        }
        # Look up the stored model name; `key` is only the table selector ("info").
        model = models.get(model_name)
        if model is None:
            raise Exception(
                f"Error: model '{model_name}' not found in mlflow. Check serving and try again."
            )
        if not model.latest_versions:
            raise Exception(f"Error: model '{model_name}' has no versions in mlflow.")

        latest_version = model.latest_versions[-1]
        description = {
            "NAME": [model.name],
            "USER_DESCRIPTION": [model.description],
            "LAST_STATUS": [latest_version.status],
            "CREATED_AT": [self._format_timestamp(model.creation_timestamp)],
            "LAST_UPDATED": [self._format_timestamp(model.last_updated_timestamp)],
            "TAGS": [model.tags],
            "LAST_RUN_ID": [latest_version.run_id],
            "LAST_SOURCE_PATH": [latest_version.source],
            "LAST_USER_ID": [latest_version.user_id],
            "LAST_VERSION": [latest_version.version],
        }
        return pd.DataFrame.from_dict(description)

    @staticmethod
    def _format_timestamp(timestamp_ms) -> str:
        """Format a millisecond timestamp as ``MM/DD/YYYY, HH:MM:SS`` (local time)."""
        return datetime.fromtimestamp(timestamp_ms // 1000).strftime("%m/%d/%Y, %H:%M:%S")

    @staticmethod
    def _check_model_url(url, timeout=30):
        """try post without data, check status code not in (not_found, method_not_allowed)

        Args:
            url: Prediction endpoint to probe.
            timeout: Seconds to wait for the probe before giving up, so an
                unresponsive server cannot block the caller indefinitely.

        Raises:
            Exception: If the request fails or the status code is 404 or 405.
        """
        try:
            resp = requests.post(url, timeout=timeout)
        except requests.RequestException as e:
            # Chain the cause so the underlying network error stays in the traceback.
            raise Exception(f"Model url is incorrect: {str(e)}") from e
        if resp.status_code in (404, 405):
            raise Exception(
                f"Model url is incorrect, status_code: {resp.status_code}"
            )