Skip to content

accumulator

logger = logging.getLogger(__name__) module-attribute

MetricAccumulator

Source code in src/recnexteval/evaluators/core/accumulator.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class MetricAccumulator:
    """Accumulate computed Metric objects and expose them as DataFrames.

    Metrics are stored in a two-level mapping: first keyed by algorithm
    name, then by the metric's identifier. The properties and ``df_*``
    methods aggregate the stored metrics at user, window, macro and
    micro level.
    """

    def __init__(self) -> None:
        # acc[algorithm_name][metric.identifier] -> Metric
        self.acc: defaultdict[str, dict[str, Metric]] = defaultdict(dict)

    def __getitem__(self, key: str) -> dict[str, Metric]:
        """Return the identifier -> Metric mapping stored for algorithm ``key``."""
        return self.acc[key]

    def add(self, metric: Metric, algorithm_name: str) -> None:
        """Add a metric to the accumulator.

        Takes a Metric object and adds it under the algorithm name. If
        the specified metric already exists for the algorithm, it will be
        overwritten with the new metric.

        Args:
            metric: Metric to store.
            algorithm_name: Name of the algorithm.
        """
        if metric.identifier in self.acc[algorithm_name]:
            logger.warning(
                f"Metric {metric.identifier} already exists for algorithm {algorithm_name}. Overwriting..."
            )

        logger.debug(f"Metric {metric.identifier} created for algorithm {algorithm_name}")

        self.acc[algorithm_name][metric.identifier] = metric

    @property
    def user_level_metrics(self) -> defaultdict:
        """Per-user results keyed by ``(algorithm, timestamp_limit, metric name)``."""
        results: defaultdict = defaultdict()
        for algo_name, metrics in self.acc.items():
            for metric in metrics.values():
                results[(algo_name, metric.timestamp_limit, metric.name)] = (
                    metric.micro_result
                )
        return results

    @property
    def window_level_metrics(self) -> defaultdict:
        """Window-level score and user count keyed by ``(algorithm, timestamp_limit, metric name)``."""
        results: defaultdict = defaultdict(dict)
        for algo_name, metrics in self.acc.items():
            for metric in metrics.values():
                score = metric.macro_result
                num_user = metric.num_users
                # A zero score is worth logging: it either means an empty
                # window (no users) or a window where nothing was predicted
                # correctly. Distinguish the two cases for easier debugging.
                if score == 0 and num_user == 0:
                    logger.info(
                        f"Metric {metric.name} for algorithm {algo_name} "
                        f"at t={metric.timestamp_limit} has 0 score and 0 users. "
                        "The ground truth may be empty due to no interactions occurring in that window."
                    )
                elif score == 0 and num_user != 0:
                    logger.info(
                        f"Metric {metric.name} for algorithm {algo_name} "
                        f"at t={metric.timestamp_limit} has 0 score but there are interactions. "
                        f"{algo_name} did not have any correct predictions."
                    )
                # Build the composite key once instead of three times.
                key = (algo_name, metric.timestamp_limit, metric.name)
                results[key]["score"] = score
                results[key]["num_user"] = num_user
        return results

    def _exploded_user_df(self) -> pd.DataFrame:
        """One row per (algorithm, timestamp, metric, user) from ``user_level_metrics``."""
        df = pd.DataFrame.from_dict(self.user_level_metrics, orient="index").explode(
            ["user_id", "score"]
        )
        return df.rename_axis(["algorithm", "timestamp", "metric"])

    def _exploded_window_df(self) -> pd.DataFrame:
        """One row per (algorithm, timestamp, metric) window from ``window_level_metrics``."""
        df = pd.DataFrame.from_dict(self.window_level_metrics, orient="index").explode(
            ["score", "num_user"]
        )
        return df.rename_axis(["algorithm", "timestamp", "metric"])

    def df_user_level_metric(self) -> pd.DataFrame:
        """Get user-level metrics across all timestamps.

        Returns:
            DataFrame with user-level metric computations.
        """
        return self._exploded_user_df().rename(columns={"score": "user_score"})

    def df_window_level_metric(self) -> pd.DataFrame:
        """Get window-level metrics across all timestamps.

        Returns:
            DataFrame with one score/num_user row per evaluation window.
        """
        return self._exploded_window_df().rename(columns={"score": "window_score"})

    def df_macro_level_metric(self) -> pd.DataFrame:
        """Get macro-level metrics across all timestamps.

        The macro score is the unweighted mean of the per-window scores.

        Returns:
            DataFrame with macro-level metric computations.
        """
        df = self._exploded_window_df()
        # Reuse one GroupBy object for both the mean and the window count.
        grouped = df.groupby(["algorithm", "metric"])
        result = grouped.mean()["score"].to_frame()
        result["num_window"] = grouped.count()["score"]
        return result.rename(columns={"score": "macro_score"})

    def df_micro_level_metric(self) -> pd.DataFrame:
        """Get micro-level metrics across all timestamps.

        The micro score is the mean over all individual user scores.

        Returns:
            DataFrame with micro-level metric computations.
        """
        df = self._exploded_user_df()
        per_group = df.groupby(["algorithm", "metric"])["score"]
        result = per_group.mean().to_frame()
        result["num_user"] = per_group.count()
        return result.rename(columns={"score": "micro_score"})

    def df_metric(
        self,
        filter_timestamp: None | int = None,
        filter_algo: None | str = None,
        level: MetricLevelEnum = MetricLevelEnum.MACRO,
    ) -> pd.DataFrame:
        """Get DataFrame representation of metrics.

        Returns a DataFrame representation of the metrics. The DataFrame can be
        filtered based on algorithm name and timestamp.

        Args:
            filter_timestamp: Timestamp value to filter on. Defaults to None.
            filter_algo: Algorithm name to filter on. Defaults to None.
            level: Level of the metric to compute. Defaults to MetricLevelEnum.MACRO.

        Returns:
            DataFrame representation of the metrics.

        Raises:
            ValueError: If ``level`` is not a recognized MetricLevelEnum member.
        """
        if level == MetricLevelEnum.MACRO:
            df = self.df_macro_level_metric()
        elif level == MetricLevelEnum.MICRO:
            df = self.df_micro_level_metric()
        elif level == MetricLevelEnum.WINDOW:
            df = self.df_window_level_metric()
        elif level == MetricLevelEnum.USER:
            df = self.df_user_level_metric()
        else:
            raise ValueError("Invalid level specified")

        if filter_algo:
            df = df.filter(like=filter_algo, axis=0)
        # BUG FIX: the previous truthiness check (`if filter_timestamp:`)
        # silently skipped filtering when the caller passed timestamp 0.
        if filter_timestamp is not None:
            # NOTE(review): substring match assumes index labels render as
            # "t=<value>" — confirm against how timestamp_limit is formatted.
            df = df.filter(like=f"t={filter_timestamp}", axis=0)
        return df

acc = defaultdict(dict) instance-attribute

user_level_metrics property

window_level_metrics property

add(metric, algorithm_name)

Add a metric to the accumulator.

Takes a Metric object and adds it under the algorithm name. If the specified metric already exists for the algorithm, it will be overwritten with the new metric.

Parameters:

Name Type Description Default
metric Metric

Metric to store.

required
algorithm_name str

Name of the algorithm.

required
Source code in src/recnexteval/evaluators/core/accumulator.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def add(self, metric: Metric, algorithm_name: str) -> None:
    """Store ``metric`` under ``algorithm_name``.

    A metric whose identifier is already present for the algorithm is
    replaced by the new one (a warning is logged when this happens).

    Args:
        metric: Metric to store.
        algorithm_name: Name of the algorithm.
    """
    algo_metrics = self.acc[algorithm_name]
    if metric.identifier in algo_metrics:
        logger.warning(
            f"Metric {metric.identifier} already exists for algorithm {algorithm_name}. Overwriting..."
        )

    logger.debug(f"Metric {metric.identifier} created for algorithm {algorithm_name}")

    algo_metrics[metric.identifier] = metric

df_user_level_metric()

Get user-level metrics across all timestamps.

Returns:

Type Description
DataFrame

DataFrame with user-level metric computations.

Source code in src/recnexteval/evaluators/core/accumulator.py
78
79
80
81
82
83
84
85
86
87
88
89
def df_user_level_metric(self) -> pd.DataFrame:
    """Get user-level metrics across all timestamps.

    Returns:
        DataFrame with one row per user, indexed by
        (algorithm, timestamp, metric).
    """
    raw = pd.DataFrame.from_dict(self.user_level_metrics, orient="index")
    exploded = raw.explode(["user_id", "score"]).rename_axis(
        ["algorithm", "timestamp", "metric"]
    )
    return exploded.rename(columns={"score": "user_score"})

df_window_level_metric()

Source code in src/recnexteval/evaluators/core/accumulator.py
91
92
93
94
95
96
97
def df_window_level_metric(self) -> pd.DataFrame:
    """Get window-level metrics across all timestamps.

    Returns:
        DataFrame with one score/num_user row per evaluation window,
        indexed by (algorithm, timestamp, metric).
    """
    frame = pd.DataFrame.from_dict(self.window_level_metrics, orient="index")
    frame = frame.explode(["score", "num_user"])
    frame = frame.rename_axis(["algorithm", "timestamp", "metric"])
    return frame.rename(columns={"score": "window_score"})

df_macro_level_metric()

Get macro-level metrics across all timestamps.

Returns:

Type Description
DataFrame

DataFrame with macro-level metric computations.

Source code in src/recnexteval/evaluators/core/accumulator.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def df_macro_level_metric(self) -> pd.DataFrame:
    """Get macro-level metrics across all timestamps.

    The macro score for each (algorithm, metric) pair is the unweighted
    mean of its per-window scores; ``num_window`` counts the windows.

    Returns:
        DataFrame with macro-level metric computations.
    """
    windows = pd.DataFrame.from_dict(self.window_level_metrics, orient="index")
    windows = windows.explode(["score", "num_user"]).rename_axis(
        ["algorithm", "timestamp", "metric"]
    )
    # One GroupBy object serves both the mean and the window count.
    grouped = windows.groupby(["algorithm", "metric"])
    out = grouped.mean()["score"].to_frame()
    out["num_window"] = grouped.count()["score"]
    return out.rename(columns={"score": "macro_score"})

df_micro_level_metric()

Get micro-level metrics across all timestamps.

Returns:

Type Description
DataFrame

DataFrame with micro-level metric computations.

Source code in src/recnexteval/evaluators/core/accumulator.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def df_micro_level_metric(self) -> pd.DataFrame:
    """Get micro-level metrics across all timestamps.

    The micro score for each (algorithm, metric) pair is the mean over
    every individual user score; ``num_user`` counts those scores.

    Returns:
        DataFrame with micro-level metric computations.
    """
    users = pd.DataFrame.from_dict(self.user_level_metrics, orient="index")
    users = users.explode(["user_id", "score"]).rename_axis(
        ["algorithm", "timestamp", "metric"]
    )
    # One SeriesGroupBy serves both the mean and the user count.
    per_group = users.groupby(["algorithm", "metric"])["score"]
    out = per_group.mean().to_frame()
    out["num_user"] = per_group.count()
    return out.rename(columns={"score": "micro_score"})

df_metric(filter_timestamp=None, filter_algo=None, level=MetricLevelEnum.MACRO)

Get DataFrame representation of metrics.

Returns a DataFrame representation of the metrics. The DataFrame can be filtered based on algorithm name and timestamp.

Parameters:

Name Type Description Default
filter_timestamp None | int

Timestamp value to filter on. Defaults to None.

None
filter_algo None | str

Algorithm name to filter on. Defaults to None.

None
level MetricLevelEnum

Level of the metric to compute. Defaults to MetricLevelEnum.MACRO.

MACRO

Returns:

Type Description
DataFrame

DataFrame representation of the metrics.

Source code in src/recnexteval/evaluators/core/accumulator.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def df_metric(
    self,
    filter_timestamp: None | int = None,
    filter_algo: None | str = None,
    level: MetricLevelEnum = MetricLevelEnum.MACRO,
) -> pd.DataFrame:
    """Get DataFrame representation of metrics.

    Returns a DataFrame representation of the metrics. The DataFrame can be
    filtered based on algorithm name and timestamp.

    Args:
        filter_timestamp: Timestamp value to filter on. Defaults to None.
        filter_algo: Algorithm name to filter on. Defaults to None.
        level: Level of the metric to compute. Defaults to MetricLevelEnum.MACRO.

    Returns:
        DataFrame representation of the metrics.

    Raises:
        ValueError: If ``level`` is not a recognized MetricLevelEnum member.
    """
    if level == MetricLevelEnum.MACRO:
        df = self.df_macro_level_metric()
    elif level == MetricLevelEnum.MICRO:
        df = self.df_micro_level_metric()
    elif level == MetricLevelEnum.WINDOW:
        df = self.df_window_level_metric()
    elif level == MetricLevelEnum.USER:
        df = self.df_user_level_metric()
    else:
        raise ValueError("Invalid level specified")

    if filter_algo:
        df = df.filter(like=filter_algo, axis=0)
    # BUG FIX: the previous truthiness check (`if filter_timestamp:`)
    # silently skipped filtering when the caller passed timestamp 0.
    if filter_timestamp is not None:
        # NOTE(review): substring match assumes index labels render as
        # "t=<value>" — confirm against how timestamp_limit is formatted.
        df = df.filter(like=f"t={filter_timestamp}", axis=0)
    return df