Coverage for mindsdb / integrations / handlers / popularity_recommender_handler / popularity_recommender_handler.py: 0%
27 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Dict, Optional
3import dill
4import pandas as pd
5import polars as pl
7from mindsdb.integrations.libs.base import BaseMLEngine
class PopularityRecommenderHandler(BaseMLEngine):
    """
    Integration with a polars-based popularity recommender.

    ``create`` ranks items by how often they occur in the interaction data
    and persists that ranking together with the raw interactions and the
    model arguments. ``predict`` then recommends, per user, the most popular
    items the user has not interacted with yet.
    """

    name = "popularity-recommender"

    def create(
        self,
        target: str,
        df: Optional[pd.DataFrame] = None,
        meta_data: Optional[Dict] = None,
        args: Optional[Dict] = None,
    ) -> None:
        """
        Build the popularity ranking and persist everything ``predict`` needs.

        Args:
            target: Not used by this handler.
            df: Interaction data; must contain the columns named by
                ``args["using"]["user_id"]`` and ``args["using"]["item_id"]``.
            meta_data: Not used by this handler.
            args: Must contain a ``"using"`` dict with the keys ``user_id``,
                ``item_id`` and ``n_recommendations``.
        """
        # Shallow copy: a derived value is added below and must not leak back
        # into the dict owned by the caller.
        args = dict(args["using"])

        interaction_data = pl.from_pandas(df)

        # Mean number of interactions per user.
        args["ave_per_item_user"] = (
            interaction_data.get_column(args["user_id"])
            .value_counts()
            .mean()["count"][0]
        )

        # Keep more than n_recommendations items so that enough candidates
        # remain after a user's already-seen items are filtered out in
        # predict().
        n_popular_items = int(
            args["n_recommendations"] * args["ave_per_item_user"]
        )

        # Resulting dict has two entries: the pandas index produced by
        # reset_index() and the ranked item list keyed by the item column name.
        popularity = (
            interaction_data.get_column(args["item_id"])
            .value_counts()
            .sort("count", descending=True)
            .get_column(args["item_id"])
            .head(n_popular_items)
            .to_pandas()
            .reset_index()
            .to_dict(orient="list")
        )

        self.model_storage.file_set("interaction", dill.dumps(df))
        self.model_storage.json_set("popularity", popularity)
        self.model_storage.json_set("args", args)

    def predict(
        self,
        df: Optional[pd.DataFrame] = None,
        args: Optional[dict] = None,
    ) -> pd.DataFrame:
        """
        Recommend popular, not-yet-seen items for each user.

        Args:
            df: Optional frame whose ``user_id`` column selects the users to
                predict for. When ``None``, every user in the stored
                interaction data is used.
            args: Ignored; the arguments stored by ``create`` are used instead.

        Returns:
            A pandas DataFrame with the user id column and a ``recommended``
            column, one row per (user, recommended item) pair.
        """
        args = self.model_storage.json_get("args")
        popularity = self.model_storage.json_get("popularity")
        # NOTE(security): dill.loads can execute arbitrary code while
        # unpickling. The payload here is what create() wrote to model
        # storage; never point this at untrusted bytes.
        interaction = dill.loads(self.model_storage.file_get("interaction"))

        # Look the ranking up by column name instead of relying on the key
        # order of the stored dict. The positional read is kept as a fallback
        # for payloads where the key does not match.
        item_column = args["item_id"]
        if item_column in popularity:
            global_popularity = popularity[item_column]
        else:
            global_popularity = [*popularity.values()][1]

        interaction_data = pl.from_pandas(interaction)
        if df is not None:
            # get recommendations for specific users if specified
            user_ids = df[args["user_id"]].unique().tolist()
            interaction_data = interaction_data.filter(
                pl.col(args["user_id"]).is_in(user_ids)
            )

        # NOTE(review): `groupby` / `is_not` are kept exactly as in the
        # original; newer polars releases rename them (`group_by` / `not_`) —
        # confirm against the pinned polars version before changing.

        # Aggregate over user: one row per user holding the list of items the
        # user interacted with plus the full popularity ranking.
        interacted_items = (
            interaction_data.groupby(args["user_id"])
            .agg(pl.col(args["item_id"]).alias("items"))
            .with_columns(
                pl.lit([global_popularity]).alias("popular_items"),
            )
        ).lazy()

        # Per user: popular items that are not in the user's own item list.
        unseen_popular = (
            interacted_items.explode("popular_items")
            .filter(pl.col("popular_items").is_in("items").is_not())
            .groupby(args["user_id"])
            .agg(recommended="popular_items")
        )

        recommendations = (
            interacted_items.join(unseen_popular, on=args["user_id"])
            .select(
                [
                    pl.col(args["user_id"]),
                    # cap each user's list at the requested size
                    pl.col("recommended").list.head(args["n_recommendations"]),
                ]
            )
            .explode("recommended")
            .collect()
            .to_pandas()
        )

        return recommendations