Coverage for mindsdb / integrations / handlers / lightfm_handler / lightfm_handler.py: 0%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Dict, Optional 

2 

3import dill 

4import pandas as pd 

5from dataprep_ml.recommenders import RecommenderPreprocessor 

6from lightfm import LightFM 

7from lightfm.cross_validation import random_train_test_split 

8from lightfm.evaluation import auc_score, precision_at_k, recall_at_k 

9 

10from mindsdb.integrations.handlers.lightfm_handler.helpers import ( 

11 get_item_item_recommendations, 

12 get_user_item_recommendations, 

13) 

14from mindsdb.integrations.handlers.lightfm_handler.settings import ModelParameters 

15from mindsdb.integrations.libs.base import BaseMLEngine 

16 

17 

class LightFMHandler(BaseMLEngine):
    """
    Integration with the lightfm Recommender library.

    Trains a LightFM collaborative-filtering model on user/item interaction
    data and serves user-item and item-item recommendations through the
    standard MindsDB ML-engine interface (create / predict / describe).
    """

    name = "lightfm"

    # Hyper-parameters a user may override via the USING clause; anything
    # not supplied falls back to the ModelParameters defaults.
    _MODEL_PARAM_KEYS = ("learning_rate", "loss", "epochs")

    # todo add ability to partially update model based on new data for existing users, items
    # todo add support for hybrid recommender
    def create(self, target: str, df: pd.DataFrame = None, args: Optional[Dict] = None):
        """Preprocess interactions, optionally evaluate, then train and persist the model.

        Args:
            target: target column name (required by the interface; unused here).
            df: interaction data containing the user-id and item-id columns.
            args: engine args; the "using" sub-dict must contain "user_id",
                "item_id" and "threshold", and may contain the hyper-parameters
                in `_MODEL_PARAM_KEYS` plus an "evaluation" flag.
        """
        args = args["using"]

        # Collect only the hyper-parameters the user actually supplied;
        # ModelParameters fills in defaults for the rest.
        args["model_params"] = {
            key: args[key] for key in self._MODEL_PARAM_KEYS if key in args
        }
        model_parameters = ModelParameters(**args["model_params"])

        # Store the fully-resolved parameters (defaults included) for describe().
        args["model_params"] = model_parameters.model_dump()

        rec_preprocessor = RecommenderPreprocessor(
            interaction_data=df,
            user_id_column_name=args["user_id"],
            item_id_column_name=args["item_id"],
            threshold=args["threshold"],
        )

        # Builds the sparse interaction matrix and the idx<->id mappings.
        preprocessed_data = rec_preprocessor.preprocess()

        args["n_users_items"] = rec_preprocessor.n_users_items

        # Persist idx->id maps so predict() can translate model output back to ids.
        args["item_idx_to_id_map"] = preprocessed_data.idx_item_map
        args["user_idx_to_id_map"] = preprocessed_data.idx_user_map

        random_state = 42

        # Run a hold-out evaluation when requested.
        if args.get("evaluation"):
            evaluation_metrics = self.evaluate(
                preprocessed_data.interaction_matrix, random_state, model_parameters
            )
            # Convert floats to str so the metrics survive JSON storage.
            args["evaluation_metrics"] = {
                k: str(v) for k, v in evaluation_metrics.items()
            }

        # Train on the full interaction matrix.
        model = self._build_model(model_parameters, random_state)
        model.fit(preprocessed_data.interaction_matrix, epochs=model_parameters.epochs)

        self.model_storage.file_set("model", dill.dumps(model))
        self.model_storage.json_set("args", args)

    @staticmethod
    def _build_model(model_parameters: ModelParameters, random_state: int) -> LightFM:
        """Construct an untrained LightFM model from the given hyper-parameters."""
        return LightFM(
            learning_rate=model_parameters.learning_rate,
            loss=model_parameters.loss,
            random_state=random_state,
        )

    def predict(self, df: Optional[pd.DataFrame] = None, args: Optional[dict] = None):
        """Produce recommendations with the stored model.

        The recommender type is taken from a "recommender_type" column in `df`
        when present, otherwise from `args["predict_params"]`, defaulting to
        "user_item".

        Args:
            df: optional frame restricting which users/items to recommend for.
            args: optional engine args (may carry "predict_params").

        Returns:
            A DataFrame of recommendations from the corresponding helper.

        Raises:
            NotImplementedError: for the "user_user" recommender type.
            ValueError: for any unknown recommender type.
        """
        predict_params = (args or {}).get("predict_params") or {}

        # If the user doesn't specify a recommender type, default to user_item.
        # Membership check instead of try/except: df may be None or empty.
        if df is not None and "recommender_type" in df.columns and len(df):
            recommender_type = df["recommender_type"].iloc[0]
        else:
            recommender_type = predict_params.get("recommender_type", "user_item")

        args = self.model_storage.json_get("args")

        # Invert the stored idx->id maps to get id->idx lookups.
        args["item_id_to_idx_map"] = {
            item_id: idx for idx, item_id in args["item_idx_to_id_map"].items()
        }
        args["user_id_to_idx_map"] = {
            user_id: idx for idx, user_id in args["user_idx_to_id_map"].items()
        }

        # NOTE(review): dill deserialization executes arbitrary code; this is
        # acceptable only because the payload comes from this handler's own
        # model storage, never from untrusted input.
        model = dill.loads(self.model_storage.file_get("model"))

        n_users, n_items = args["n_users_items"]
        item_ids, user_ids = None, None

        if recommender_type == "user_item":
            # Restrict the candidate users/items to those supplied in df, if any.
            if df is not None:
                if args["item_id"] in df.columns:
                    n_items = df[args["item_id"]].nunique()
                    item_ids = df[args["item_id"]].unique().tolist()

                if args["user_id"] in df.columns:
                    n_users = df[args["user_id"]].nunique()
                    user_ids = df[args["user_id"]].unique().tolist()

            return get_user_item_recommendations(
                n_users=n_users,
                n_items=n_items,
                args=args,
                model=model,
                item_ids=item_ids,
                user_ids=user_ids,
            )

        elif recommender_type == "item_item":
            if df is not None and args["item_id"] in df.columns:
                item_ids = df[args["item_id"]].unique().tolist()

            return get_item_item_recommendations(
                model=model,
                args=args,
                item_ids=item_ids,
            )

        elif recommender_type == "user_user":
            raise NotImplementedError(
                "user_user recommendation type is not implemented yet"
            )

        else:
            raise ValueError(
                "recommender_type must be either 'user_item', 'item_item' or 'user_user'"
            )

    def describe(self, attribute=None):
        """Expose stored model metadata as DataFrames.

        Args:
            attribute: one of "model", "features", "info", or None for the
                list of available tables.
        """
        model_args = self.model_storage.json_get("args")

        if attribute == "model":
            return pd.DataFrame({"model_params": [model_args["model_params"]]})

        elif attribute == "features":
            return pd.DataFrame(
                {
                    "n_users_items": [model_args["n_users_items"]],
                }
            )

        elif attribute == "info":
            # Metrics only exist when create() ran with "evaluation"; show
            # None cells instead of raising KeyError when they are absent.
            model_metrics = model_args.get("evaluation_metrics", {})

            info_dict = {
                k: [model_metrics.get(k)] for k in ("auc", "precision", "recall")
            }
            info_dict["user_id"] = [model_args["user_id"]]
            info_dict["item_id"] = [model_args["item_id"]]

            return pd.DataFrame(info_dict)

        else:
            tables = ["info", "features", "model"]
            return pd.DataFrame(tables, columns=["tables"])

    def evaluate(self, interaction_matrix, random_state, model_parameters):
        """Hold out 20% of interactions and compute mean ranking metrics.

        Args:
            interaction_matrix: sparse user-item interaction matrix.
            random_state: seed for the split and the model.
            model_parameters: ModelParameters with learning_rate/loss/epochs.

        Returns:
            dict with mean "auc", "precision" (at k) and "recall" (at k).
        """
        train, test = random_train_test_split(
            interaction_matrix,
            test_percentage=0.2,
            random_state=random_state,
        )

        model = self._build_model(model_parameters, random_state)
        model.fit(train, epochs=model_parameters.epochs)

        return dict(
            auc=auc_score(model, test, train).mean(),
            precision=precision_at_k(model, test, train).mean(),
            recall=recall_at_k(model, test, train).mean(),
        )