Coverage for mindsdb / integrations / handlers / popularity_recommender_handler / popularity_recommender_handler.py: 0%

27 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Dict, Optional 

2 

3import dill 

4import pandas as pd 

5import polars as pl 

6 

7from mindsdb.integrations.libs.base import BaseMLEngine 

8 

9 

class PopularityRecommenderHandler(BaseMLEngine):
    """
    Integration with a Polars-based popularity recommender.

    ``create`` ranks items by how often they occur in the interaction data and
    stores that ranking together with the raw interactions. ``predict``
    returns, per user, the highest-ranked items that do not appear among the
    user's stored interactions.
    """

    name = "popularity-recommender"

    @staticmethod
    def _group_by(frame, by):
        """Group ``frame`` (DataFrame or LazyFrame) by column ``by``.

        Polars renamed ``groupby`` to ``group_by``. The new spelling is
        preferred and the old one is used only when the new one is missing, so
        the handler works on both sides of the rename.
        """
        grouper = getattr(frame, "group_by", None)
        if grouper is None:
            grouper = frame.groupby
        return grouper(by)

    def create(
        self,
        target: str,
        df: pd.DataFrame = None,
        meta_data: Dict = None,
        args: Optional[Dict] = None,
    ) -> None:
        """Compute the global item popularity ranking and persist the model.

        Args:
            target: Not used by this handler.
            df: Interaction data; must contain the columns named by
                ``args["using"]["user_id"]`` and ``args["using"]["item_id"]``.
            meta_data: Not used by this handler.
            args: Must contain a ``"using"`` dict with the keys ``"user_id"``,
                ``"item_id"`` and ``"n_recommendations"``. Despite the
                ``None`` default, ``args`` and ``df`` are both required: they
                are dereferenced unconditionally.

        Side effects:
            * ``args["using"]`` is updated in place with the key
              ``"ave_per_item_user"``.
            * Writes ``"interaction"`` (dill-serialised ``df``),
              ``"popularity"`` and ``"args"`` to ``self.model_storage``.
        """
        args = args["using"]
        user_col = args["user_id"]
        item_col = args["item_id"]

        interaction_data = pl.from_pandas(df)

        # Mean number of interaction rows per distinct user.
        args["ave_per_item_user"] = (
            interaction_data.get_column(user_col)
            .value_counts()
            .mean()["count"][0]
        )

        # Keep more than n_recommendations items: items a user already
        # interacted with are filtered out at predict time.
        n_popular = int(args["n_recommendations"] * args["ave_per_item_user"])

        # Items ordered by descending occurrence count. The stored dict has
        # the reset index under one key and the item list under ``item_col``.
        popularity = (
            interaction_data.get_column(item_col)
            .value_counts()
            .sort("count", descending=True)
            .get_column(item_col)
            .head(n_popular)  # to ensure there are enough to predict
            .to_pandas()
            .reset_index()
            .to_dict(orient="list")
        )

        self.model_storage.file_set("interaction", dill.dumps(df))
        self.model_storage.json_set("popularity", popularity)
        self.model_storage.json_set("args", args)

    def predict(self, df=None, args: Optional[dict] = None):
        """Recommend popular items each user has not interacted with.

        Args:
            df: Optional frame holding the user-id column. When given, only
                users listed there AND present in the stored interactions are
                considered; when ``None``, every stored user is considered.
            args: Ignored; it is overwritten with the arguments stored by
                ``create``.

        Returns:
            A pandas DataFrame with the user-id column and a ``"recommended"``
            column, one row per (user, recommended item), with at most
            ``n_recommendations`` rows per user.
        """
        args = self.model_storage.json_get("args")
        popularity = self.model_storage.json_get("popularity")
        # NOTE(review): dill.loads can execute arbitrary code; this assumes the
        # stored blob is the one written by ``create``.
        interaction = dill.loads(self.model_storage.file_get("interaction"))

        user_col = args["user_id"]
        item_col = args["item_id"]

        # Look the ranking up by the key it was stored under instead of by
        # position, so it does not depend on dict key order surviving storage.
        global_popularity = popularity[item_col]

        interaction_data = pl.from_pandas(interaction)
        if df is not None:
            # get recommendations for specific users if specified
            user_ids = df[user_col].unique().tolist()
            interaction_data = interaction_data.filter(
                pl.col(user_col).is_in(user_ids)
            )

        # aggregate over user: one row per user with the list of interacted
        # items and the (same) list of globally popular items
        interacted_items = (
            self._group_by(interaction_data, user_col)
            .agg(pl.col(item_col).alias("items"))
            .with_columns(
                pl.lit([global_popularity]).alias("popular_items"),
            )
        ).lazy()

        # One row per (user, popular item), keeping only the items that are
        # not in the user's "items" list. `~` is the operator spelling of the
        # renamed `is_not` / `not_` method.
        unseen = interacted_items.explode("popular_items").filter(
            ~pl.col("popular_items").is_in("items")
        )
        recommended = self._group_by(unseen, user_col).agg(
            recommended="popular_items"
        )

        return (
            interacted_items.join(recommended, on=user_col)
            .select(
                [
                    pl.col(user_col),
                    pl.col("recommended").list.head(args["n_recommendations"]),
                ]
            )
            .explode("recommended")
            .collect()
            .to_pandas()
        )