Coverage for mindsdb / integrations / handlers / lightfm_handler / lightfm_handler.py: 0%
80 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Dict, Optional
3import dill
4import pandas as pd
5from dataprep_ml.recommenders import RecommenderPreprocessor
6from lightfm import LightFM
7from lightfm.cross_validation import random_train_test_split
8from lightfm.evaluation import auc_score, precision_at_k, recall_at_k
10from mindsdb.integrations.handlers.lightfm_handler.helpers import (
11 get_item_item_recommendations,
12 get_user_item_recommendations,
13)
14from mindsdb.integrations.handlers.lightfm_handler.settings import ModelParameters
15from mindsdb.integrations.libs.base import BaseMLEngine
class LightFMHandler(BaseMLEngine):
    """
    Integration with the lightfm Recommender library.

    Training stores two artifacts in ``self.model_storage``: the fitted
    LightFM model (serialized with ``dill`` under the file key ``"model"``)
    and the training arguments, enriched with preprocessing metadata, under
    the JSON key ``"args"``.
    """

    name = "lightfm"

    # todo add ability to partially update model based on new data for existing users, items
    # todo add support for hybrid recommender

    @staticmethod
    def _build_model(model_parameters, random_state):
        """Return an unfitted LightFM model configured from ``model_parameters``."""
        return LightFM(
            learning_rate=model_parameters.learning_rate,
            loss=model_parameters.loss,
            random_state=random_state,
        )

    def create(self, target: str, df: pd.DataFrame = None, args: Optional[Dict] = None):
        """
        Preprocess the interaction data, fit a LightFM model and persist it.

        Args:
            target: Unused by this handler.
            df: Interaction data handed to ``RecommenderPreprocessor``.
            args: Must contain a ``"using"`` dict with the keys ``"user_id"``,
                ``"item_id"`` and ``"threshold"``. The optional keys
                ``"learning_rate"``, ``"loss"`` and ``"epochs"`` override the
                ``ModelParameters`` defaults; a truthy ``"evaluation"`` key
                triggers a hold-out evaluation before the final fit.

        Note:
            The ``"using"`` dict is updated in place with the resolved model
            parameters, the preprocessing metadata and (optionally) the
            evaluation metrics before being stored.
        """
        args = args["using"]

        # get model parameters if defined by user - else use default values
        user_defined_model_params = {
            param_name: args[param_name]
            for param_name in ("learning_rate", "loss", "epochs")
            if param_name in args
        }
        model_parameters = ModelParameters(**user_defined_model_params)

        # store the fully resolved model parameters (user values + defaults)
        args["model_params"] = model_parameters.model_dump()

        rec_preprocessor = RecommenderPreprocessor(
            interaction_data=df,
            user_id_column_name=args["user_id"],
            item_id_column_name=args["item_id"],
            threshold=args["threshold"],
        )

        # preprocess data
        preprocessed_data = rec_preprocessor.preprocess()

        args["n_users_items"] = rec_preprocessor.n_users_items

        # get item idx to id and user idx to id maps
        args["item_idx_to_id_map"] = preprocessed_data.idx_item_map
        args["user_idx_to_id_map"] = preprocessed_data.idx_user_map

        # fixed seed so the evaluation split and the fit are reproducible
        random_state = 42

        # run evaluation if specified
        if args.get("evaluation"):
            evaluation_metrics = self.evaluate(
                preprocessed_data.interaction_matrix, random_state, model_parameters
            )
            # convert floats to str so they can be stored in json
            args["evaluation_metrics"] = {
                metric: str(value) for metric, value in evaluation_metrics.items()
            }

        # train the final model on the full interaction matrix
        model = self._build_model(model_parameters, random_state)
        model.fit(preprocessed_data.interaction_matrix, epochs=model_parameters.epochs)

        self.model_storage.file_set("model", dill.dumps(model))
        self.model_storage.json_set("args", args)

    def predict(self, df: Optional[pd.DataFrame] = None, args: Optional[dict] = None):
        """
        Return recommendations from the stored model.

        The recommender type is taken from the first row of the
        ``"recommender_type"`` column of ``df`` when that column exists and
        ``df`` has rows; otherwise from
        ``args["predict_params"]["recommender_type"]``; otherwise it defaults
        to ``"user_item"``.

        Args:
            df: Optional frame that may carry a ``recommender_type`` column
                and/or the user/item id columns used to restrict the output.
            args: Optional prediction arguments; only ``"predict_params"`` is
                read.

        Returns:
            Whatever ``get_user_item_recommendations`` or
            ``get_item_item_recommendations`` returns.

        Raises:
            NotImplementedError: For the ``"user_user"`` recommender type.
            ValueError: For any other unknown recommender type.
        """
        predict_params = (args or {}).get("predict_params") or {}

        # if user doesn't specify recommender type, default to user_item.
        # df may be None or empty, so check before indexing into it.
        if df is not None and "recommender_type" in df.columns and len(df) > 0:
            recommender_type = df["recommender_type"].tolist()[0]
        else:
            recommender_type = predict_params.get("recommender_type", "user_item")

        # from here on `args` refers to the arguments stored at training time
        args = self.model_storage.json_get("args")

        # get item_id to idx and user_id to idx maps by inverting the stored maps
        args["item_id_to_idx_map"] = {
            item_id: idx for idx, item_id in args["item_idx_to_id_map"].items()
        }
        args["user_id_to_idx_map"] = {
            user_id: idx for idx, user_id in args["user_idx_to_id_map"].items()
        }

        # NOTE(review): dill.loads can execute arbitrary code while
        # deserializing; this is only safe while model storage is trusted.
        model = dill.loads(self.model_storage.file_get("model"))

        n_users = args["n_users_items"][0]
        n_items = args["n_users_items"][1]
        item_ids, user_ids = None, None

        if recommender_type == "user_item":
            if df is not None:
                # narrow the recommendation to the ids present in df, if any
                if args["item_id"] in df.columns:
                    n_items = df[args["item_id"]].nunique()
                    item_ids = df[args["item_id"]].unique().tolist()

                if args["user_id"] in df.columns:
                    n_users = df[args["user_id"]].nunique()
                    user_ids = df[args["user_id"]].unique().tolist()

            return get_user_item_recommendations(
                n_users=n_users,
                n_items=n_items,
                args=args,
                model=model,
                item_ids=item_ids,
                user_ids=user_ids,
            )

        if recommender_type == "item_item":
            if df is not None and args["item_id"] in df.columns:
                item_ids = df[args["item_id"]].unique().tolist()

            return get_item_item_recommendations(
                model=model,
                args=args,
                item_ids=item_ids,
            )

        if recommender_type == "user_user":
            raise NotImplementedError(
                "user_user recommendation type is not implemented yet"
            )

        raise ValueError(
            "recommender_type must be either 'user_item', 'item_item' or 'user_user'"
        )

    def describe(self, attribute=None):
        """
        Describe the stored model.

        Args:
            attribute: ``"model"`` for the model parameters, ``"features"``
                for the stored ``n_users_items`` value, ``"info"`` for the
                evaluation metrics plus the user/item id column names. Any
                other value lists the available tables.

        Returns:
            A ``pandas.DataFrame``. For ``"info"``, metrics are ``None`` when
            the model was trained without evaluation.
        """
        model_args = self.model_storage.json_get("args")

        if attribute == "model":
            return pd.DataFrame({"model_params": [model_args["model_params"]]})

        if attribute == "features":
            return pd.DataFrame(
                {
                    "n_users_items": [model_args["n_users_items"]],
                }
            )

        if attribute == "info":
            # evaluation_metrics is only stored when evaluation was requested
            # at training time, so it may legitimately be absent.
            model_metrics = model_args.get("evaluation_metrics") or {}

            info_dict = {
                metric: [model_metrics.get(metric)]
                for metric in ("auc", "precision", "recall")
            }
            info_dict["user_id"] = [model_args["user_id"]]
            info_dict["item_id"] = [model_args["item_id"]]

            return pd.DataFrame(info_dict)

        tables = ["info", "features", "model"]
        return pd.DataFrame(tables, columns=["tables"])

    def evaluate(self, interaction_matrix, random_state, model_parameters):
        """
        Fit a model on an 80/20 random split and score it on the held-out part.

        Args:
            interaction_matrix: Interaction matrix to split and train on.
            random_state: Seed used for both the split and the model.
            model_parameters: Object exposing ``learning_rate``, ``loss`` and
                ``epochs``.

        Returns:
            Dict with the keys ``"auc"``, ``"precision"`` and ``"recall"``,
            each the mean of the per-user scores. Precision and recall use
            the library's default ``k``.
        """
        train, test = random_train_test_split(
            interaction_matrix,
            test_percentage=0.2,
            random_state=random_state,
        )

        model = self._build_model(model_parameters, random_state)
        model.fit(train, epochs=model_parameters.epochs)

        # train interactions are passed so already-seen pairs are excluded
        # from the rankings being scored
        return {
            "auc": auc_score(model, test, train).mean(),
            "precision": precision_at_k(model, test, train).mean(),
            "recall": recall_at_k(model, test, train).mean(),
        }