Coverage for mindsdb / integrations / handlers / lightfm_handler / helpers.py: 0%
49 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import lightfm
2import numpy as np
3import pandas as pd
def get_item_user_idx(args: dict, n_users, n_items, item_ids=None, user_ids=None):
    """
    Builds aligned arrays of user/item matrix indexes covering every
    (user, item) pair to be scored.

    :param args: model args dict; must contain "item_id_to_idx_map" /
        "user_id_to_idx_map" when the corresponding raw ids are supplied
    :param n_users: number of users being paired (expected to equal
        len(user_ids) when user_ids is given)
    :param n_items: number of items being paired (expected to equal
        len(item_ids) when item_ids is given)
    :param item_ids: optional list of raw item ids to restrict to; when
        falsy, all item indexes 0..n_items-1 are used
    :param user_ids: optional list of raw user ids to restrict to; when
        falsy, all user indexes 0..n_users-1 are used
    :return: tuple (item_idxs, user_idxs) of equal-length numpy arrays
    """
    # resolve raw ids to matrix indexes, or fall back to the full index range
    if item_ids:
        item_idx = np.array([int(args["item_id_to_idx_map"][i]) for i in item_ids])
    else:
        item_idx = np.arange(n_items)

    if user_ids:
        user_idx = np.array([int(args["user_id_to_idx_map"][u]) for u in user_ids])
    else:
        user_idx = np.arange(n_users)

    # cross-product pairing: each user index repeated n_items times,
    # against the full item index list tiled n_users times
    user_idxs = np.repeat(user_idx, n_items)
    item_idxs = np.tile(item_idx, n_users)

    return item_idxs, user_idxs
def get_user_item_recommendations(
    n_users: int, n_items: int, args: dict, item_ids, user_ids, model: lightfm.LightFM
):
    """
    Scores every requested user-item pair with the trained model and
    returns the top N recommendations per user.

    :param n_users: number of users being scored
    :param n_items: number of items being scored
    :param args: model args dict; reads "n_recommendations" and the
        idx->id lookup maps (which are keyed by string indexes)
    :param item_ids: raw item ids to restrict to (passed through to
        get_item_user_idx)
    :param user_ids: raw user ids to restrict to (passed through to
        get_item_user_idx)
    :param model: trained LightFM model
    :return: DataFrame with columns user_id, item_id, score
    """
    # resolve matrix indexes for every (user, item) pair to score
    item_idxs, user_idxs = get_item_user_idx(args, n_users, n_items, item_ids, user_ids)

    predicted = model.predict(user_ids=user_idxs, item_ids=item_idxs)

    scored_pairs = pd.DataFrame(
        {"user_idx": user_idxs, "item_idx": item_idxs, "score": predicted}
    )

    # within each user's group keep only the N highest-scoring rows
    top_n = args["n_recommendations"]
    recommendations = scored_pairs.groupby("user_idx").apply(
        lambda group: group.sort_values("score", ascending=False).head(top_n)
    )

    # translate matrix indexes back to original ids (maps are keyed by str)
    recommendations["item_id"] = (
        recommendations["item_idx"].astype("str").map(args["item_idx_to_id_map"])
    )
    recommendations["user_id"] = (
        recommendations["user_idx"].astype("str").map(args["user_idx_to_id_map"])
    )

    return recommendations[["user_id", "item_id", "score"]]
def get_item_item_recommendations(
    model: lightfm.LightFM,
    args: dict,
    item_ids: list = None,
    item_features=None,
) -> pd.DataFrame:
    """
    Gets the most similar items for each requested item, using cosine
    similarity over the model's item representations inside the user-item
    interaction matrix.

    NB: by default it won't use item features; however if item features
    are provided it will use them to build the item representations.

    :param model: trained LightFM model
    :param args: model args dict; reads "n_recommendations" and
        "item_idx_to_id_map" (keyed by string indexes)
    :param item_ids: optional list of raw item ids to restrict to; when
        falsy, recommendations are produced for every known item
    :param item_features: optional item feature matrix passed through to
        model.get_item_representations
    :return: DataFrame with columns item_id_one, item_id_two, score
    """
    # todo make sure its not slow across larger data
    # todo break into smaller functions
    n_recommendations = args["n_recommendations"]

    similar_items_dfs = []

    item_idx_to_id_map = args["item_idx_to_id_map"]

    if item_ids:
        # filter out item_ids that are not in the request
        item_idx_to_id_map = {
            key: val
            for key, val in args["item_idx_to_id_map"].items()
            if val in item_ids
        }

    # item representations and their norms do not depend on the loop
    # variable — compute them once instead of once per item (was a
    # significant repeated cost inside the loop)
    item_biases, item_representations = model.get_item_representations(
        features=item_features
    )
    item_norms = np.sqrt((item_representations * item_representations).sum(axis=1))

    for item_idx, item_id in item_idx_to_id_map.items():
        # ensure item_idx is int (map keys are strings)
        item_idx = int(item_idx)

        # cosine similarity numerator: dot product against the query item
        scores = item_representations.dot(item_representations[item_idx, :])

        # normalize by each candidate item's norm
        scores = scores / item_norms

        # ensure n_recommendations is not greater than number of items
        N = min(len(scores), n_recommendations)

        # get the top N items
        best = np.argpartition(scores, -N)

        # sort the scores; dividing by the query item's norm completes
        # the cosine similarity
        rec = sorted(
            zip(best, scores[best] / item_norms[item_idx]), key=lambda x: -x[1]
        )

        intermediate_df = (
            pd.DataFrame(rec, columns=["item_idx", "score"])
            .tail(-1)  # remove the item itself
            .head(N)
        )

        intermediate_df["item_id_one"] = item_id
        similar_items_dfs.append(intermediate_df)

    similar_items_df = pd.concat(similar_items_dfs, ignore_index=True)
    similar_items_df["item_idx"] = similar_items_df["item_idx"].astype("str")

    similar_items_df["item_id_two"] = similar_items_df["item_idx"].map(
        args["item_idx_to_id_map"]
    )

    return similar_items_df[["item_id_one", "item_id_two", "score"]]