Skip to content

decay_popularity

DecayPopularity

Bases: Algorithm

A popularity-based algorithm with exponential decay over data from earlier time windows.

Source code in src/streamsight/algorithms/decay_popularity.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class DecayPopularity(Algorithm):
    """A popularity-based algorithm with exponential decay over data from earlier time windows.

    Every call to :meth:`_fit` stores the interaction matrix of the current
    window. Item popularity is the column sum of each stored window weighted
    by ``exp(-age)``, where ``age`` is 0 for the newest window and grows by 1
    for every older one. The scores are scaled by their maximum and only the
    ``K`` highest scoring items keep a non-zero score.

    :param K: Number of top items that keep a score, defaults to 200
    :type K: int, optional
    """
    IS_BASE: bool = False

    def __init__(self, K: int = 200) -> None:
        super().__init__()
        self.K = K
        # One interaction matrix per fitted window, oldest first.
        self.historical_data: list[csr_matrix] = []
        # Largest number of item columns seen in any fitted window.
        self.num_items = 0

    def _pad_matrix(self, matrix: csr_matrix, new_num_items: int) -> csr_matrix:
        """
        Pad a sparse matrix with zero columns to match the new number of items.

        The matrix is returned unchanged when it already has at least
        ``new_num_items`` columns.

        :param matrix: The matrix to pad
        :type matrix: csr_matrix
        :param new_num_items: The target number of columns
        :type new_num_items: int
        :return: The padded matrix
        :rtype: csr_matrix
        """
        if matrix.shape[1] >= new_num_items:
            return matrix
        # Appending empty columns to a CSR matrix does not touch the stored
        # entries, so only the declared shape has to change. This avoids
        # materialising a dense (users x items) array just to add zeros.
        matrix = matrix.tocsr()
        return csr_matrix(
            (matrix.data, matrix.indices, matrix.indptr),
            shape=(matrix.shape[0], new_num_items),
            copy=True,
        )

    def _expand_historical_data(self, new_num_items: int):
        """
        Expand all matrices in historical_data to match the new number of items.

        The list is updated in place; matrices that are already wide enough
        are left untouched.

        :param new_num_items: The updated number of items
        :type new_num_items: int
        """
        for i, matrix in enumerate(self.historical_data):
            if matrix.shape[1] < new_num_items:
                self.historical_data[i] = self._pad_matrix(matrix, new_num_items)

    def _fit(self, X: csr_matrix) -> "DecayPopularity":
        """
        Fit the model by applying decay to historical data and adding new data.

        The result is stored in ``self.decayed_scores_``: a dense vector with
        one entry per item, holding the normalised decayed popularity for the
        top ``K`` items and 0 everywhere else.

        :param X: Interaction matrix (users x items) for the current window
        :type X: csr_matrix
        :return: The fitted algorithm
        :rtype: DecayPopularity
        """
        # Keep every stored window at the same width: either widen the
        # history to the new window, or widen the new window to the history.
        new_num_items = X.shape[1]
        if new_num_items > self.num_items:
            self._expand_historical_data(new_num_items)
            self.num_items = new_num_items
        elif new_num_items < self.num_items:
            X = self._pad_matrix(X, self.num_items)

        # Append new data to historical data
        self.historical_data.append(X)

        num_items = X.shape[1]
        if num_items < self.K:
            warn("K is larger than the number of items.", UserWarning)

        # The newest window has age 0 (factor 1); each older window is one
        # step older and therefore weighted by an extra factor of exp(-1).
        newest = len(self.historical_data) - 1
        decayed_scores = np.zeros(num_items)
        for i, matrix in enumerate(self.historical_data):
            decay_factor = np.exp(-(newest - i))
            # np.asarray(...).ravel() flattens the (1 x items) column sum
            # without relying on the np.matrix-only ``.A`` attribute.
            decayed_scores += np.asarray(matrix.sum(axis=0)).ravel() * decay_factor

        # Guard the normalisation: with no items there is no maximum, and
        # with no interactions the maximum is 0 and dividing would give NaN.
        peak = decayed_scores.max() if num_items else 0.0
        normalized_scores = decayed_scores / peak if peak != 0 else decayed_scores

        K = min(self.K, num_items)
        a = np.zeros(num_items)
        # K == 0 must select nothing; the slice ``[-0:]`` would select all.
        if K > 0:
            ind = np.argpartition(normalized_scores, -K)[-K:]
            a[ind] = normalized_scores[ind]
        self.decayed_scores_ = a
        return self

    def _predict(self, X: csr_matrix, predict_im: InteractionMatrix) -> csr_matrix:
        """
        Predict the K most popular items for each user scaled by the decay factor.

        Every user id found in the ``uid`` column of the prediction frame gets
        the same row of scores, namely ``self.decayed_scores_``.

        :param X: Interaction matrix; only its shape is used here
        :type X: csr_matrix
        :param predict_im: Interactions holding the users to predict for
        :type predict_im: InteractionMatrix
        :raises AttributeError: If ``predict_im`` is None
        :return: Score matrix (users x items)
        :rtype: csr_matrix
        """
        if predict_im is None:
            raise AttributeError("Predict frame with requested ID is required for Popularity algorithm")

        predict_frame = predict_im._df

        users = predict_frame["uid"].unique().tolist()
        known_item_id = X.shape[1]

        # predict_frame contains (user_id, -1) pairs
        max_user_id = predict_frame["uid"].max() + 1
        # Enough rows for the highest requested user id and every row of X.
        intended_shape = (max(max_user_id, X.shape[0]), known_item_id)

        X_pred = lil_matrix(intended_shape)
        X_pred[users] = self.decayed_scores_

        return X_pred.tocsr()

IS_BASE = False class-attribute instance-attribute

K = K instance-attribute

historical_data = [] instance-attribute

num_items = 0 instance-attribute

name property

Name of the object's class.

:return: Name of the object's class :rtype: str

params property

Parameters of the object.

:return: Parameters of the object :rtype: dict

identifier property

Identifier of the object.

Identifier is made by combining the class name with the parameters passed at construction time.

Constructed by recreating the initialisation call. Example: Algorithm(param_1=value)

:return: Identifier of the object :rtype: str

ITEM_USER_BASED instance-attribute

seed = 42 instance-attribute

rand_gen = np.random.default_rng(seed=(self.seed)) instance-attribute

description property

Description of the algorithm.

:return: Description of the algorithm :rtype: str

get_params() abstractmethod

Get the parameters of the object.

:return: Parameters of the object :rtype: dict

Source code in src/streamsight/models/base.py
38
39
40
41
42
43
44
45
@abstractmethod
def get_params(self) -> dict[str, Any]:
    """Return the parameters of this object.

    Abstract hook without an implementation of its own.

    :return: Mapping of parameter names to their values
    :rtype: dict
    """

get_default_params() classmethod

Get default parameters without instantiation.

Uses inspect.signature to extract init parameters and their default values without instantiating the class.

Returns:

Type Description
dict

Dictionary of parameter names to default values.

dict

Parameters without defaults map to None.

Source code in src/streamsight/algorithms/base.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@classmethod
def get_default_params(cls) -> dict:
    """Collect the default ``__init__`` arguments of the class.

    The signature of ``cls.__init__`` is inspected with
    ``inspect.signature``, so no instance is created.

    Returns:
        Dictionary mapping each parameter name to its default value.
        Parameters declared without a default map to None. ``self``,
        ``*args`` and ``**kwargs`` are left out. An empty dictionary is
        returned when the signature cannot be determined.
    """
    try:
        init_parameters = signature(cls.__init__).parameters
    except (ValueError, TypeError):
        # No introspectable signature (e.g. some built-in types).
        return {}

    variadic_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
    return {
        name: None if spec.default is Parameter.empty else spec.default
        for name, spec in init_parameters.items()
        if name != "self" and spec.kind not in variadic_kinds
    }

set_params(**params)

Set the parameters of the estimator.

:param params: Estimator parameters :type params: dict

Source code in src/streamsight/algorithms/base.py
 94
 95
 96
 97
 98
 99
100
def set_params(self, **params) -> Self:
    """Update the estimator's parameters.

    The keyword arguments are forwarded unchanged to the parent class
    implementation and its result is returned.

    :param params: Estimator parameters
    :type params: dict
    """
    parent = super()
    return parent.set_params(**params)

fit(X)

Fit the model to the input interaction matrix.

The input data is transformed to the expected type using :meth:_transform_fit_input. The fitting is done using the :meth:_fit method. Finally the method checks that the fitting was successful using :meth:_check_fit_complete.

:param X: The interactions to fit the model on. :type X: InteractionMatrix :return: Fitted algorithm :rtype: Algorithm

Source code in src/streamsight/algorithms/base.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def fit(self, X: InteractionMatrix) -> Self:
    """Fit the model to the input interaction matrix.

    The input data is transformed to the expected type using
    :meth:`_transform_fit_input`. The fitting is done using the
    :meth:`_fit` method. Finally the method checks that the fitting
    was successful using :meth:`_check_fit_complete`. The elapsed time
    is logged at debug level.

    :param X: The interactions to fit the model on.
    :type X: InteractionMatrix
    :return: Fitted algorithm
    :rtype: Algorithm
    """
    # perf_counter is monotonic, so the measured duration cannot be
    # distorted (or turn negative) when the system clock is adjusted.
    start = time.perf_counter()
    X_transformed = self._transform_fit_input(X)
    self._fit(X_transformed)

    self._check_fit_complete()
    elapsed = time.perf_counter() - start
    logger.debug(f"Fitting {self.name} complete - Took {elapsed:.3}s")
    return self

predict(X)

Predicts scores, given the interactions in X

The input data is transformed to the expected type using :meth:_transform_predict_input. The predictions are made using the :meth:_predict method. Finally the predictions are then padded with random items for users that are not in the training data.

:param X: interactions to predict from. :type X: PredictionMatrix :return: The recommendation scores in a sparse matrix format. :rtype: csr_matrix

Source code in src/streamsight/algorithms/base.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def predict(self, X: PredictionMatrix) -> csr_matrix:
    """Compute recommendation scores for the interactions in X.

    First verifies through :meth:`_check_fit_complete` that the model
    has been fitted, then hands ``X`` to :meth:`_predict` and returns
    its result as is.

    :param X: interactions to predict from.
    :type X: PredictionMatrix
    :return: The recommendation scores in a sparse matrix format.
    :rtype: csr_matrix
    """
    self._check_fit_complete()
    return self._predict(X)