
stream

EvaluatorStreamer dataclass

Bases: EvaluatorBase

Evaluation via streaming through API.

The diagram below illustrates the streamer evaluator for the sliding window setting. Instead of running a fixed pipeline, the user streams each data release to the algorithm. The data communication between the evaluator and the algorithm is shown. Note that while only 2 splits are shown here, the evaluator continues to stream data until the end of the setting, when no more splits remain.

stream scheme

This class exposes the core API methods that allow the user to drive the evaluation as a stream. The following methods are exposed:

  1. :meth:`register_model`
  2. :meth:`start_stream`
  3. :meth:`get_unlabeled_data`
  4. :meth:`submit_prediction`

The programmer can consult each method for details of its implementation. The methods are designed so that the algorithm is decoupled from the evaluating platform: the evaluator provides only the data the algorithm needs and evaluates the resulting predictions.
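Concretely, a typical session registers the algorithm, starts the stream, then loops over data requests and prediction submissions until the setting is exhausted. The sketch below mimics that call sequence with a minimal, self-contained stand-in (`ToyStreamer` and its scoring are hypothetical simplifications, not the real `EvaluatorStreamer`; the real evaluator advances windows through its state manager and strategy):

```python
from uuid import uuid4


class ToyStreamer:
    """Minimal stand-in mimicking EvaluatorStreamer's call sequence."""

    def __init__(self, windows):
        # each window: (unlabeled user ids, ground-truth items)
        self._windows = windows
        self._algos = {}
        self._started = False
        self._step = 0
        self.scores = []

    def register_model(self, name):
        if self._started:
            raise RuntimeError("Cannot register algorithms after stream started")
        algo_id = uuid4()
        self._algos[algo_id] = name
        return algo_id

    def start_stream(self):
        self._started = True

    def get_unlabeled_data(self, algo_id):
        unlabeled, _ = self._windows[self._step]  # IndexError when exhausted
        return unlabeled

    def submit_prediction(self, algo_id, prediction):
        _, truth = self._windows[self._step]
        self.scores.append(len(set(prediction) & set(truth)) / len(truth))
        self._step += 1  # the real evaluator advances via its strategy


# usage: register, start, then loop until no splits remain
streamer = ToyStreamer(windows=[([0], [3]), ([0], [4])])
algo = streamer.register_model("popularity")
streamer.start_stream()
while True:
    try:
        users = streamer.get_unlabeled_data(algo)
    except IndexError:  # stream exhausted
        break
    streamer.submit_prediction(algo, prediction=[3])
print(streamer.scores)  # [1.0, 0.0]
```

The real evaluator enforces the same ordering with `RuntimeError` and `PermissionError` on out-of-order calls.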

Parameters:

Name Type Description Default
metric_entries list[MetricEntry]

list of metric entries.

required
setting Setting

Setting object.

required
metric_k int

Number of top interactions to consider.

required
ignore_unknown_user bool

Whether to ignore unknown users.

False
ignore_unknown_item bool

Whether to ignore unknown items.

False
seed int

Random seed for the evaluator.

42
Source code in src/recnexteval/evaluators/stream/evaluator.py
@dataclass
class EvaluatorStreamer(EvaluatorBase):
    """Evaluation via streaming through API.

    The diagram below illustrates the streamer evaluator for the
    sliding window setting. Instead of running a fixed pipeline, the user
    streams each data release to the algorithm. The data communication
    between the evaluator and the algorithm is shown. Note that while only
    2 splits are shown here, the evaluator continues to stream data until
    the end of the setting, when no more splits remain.

    ![stream scheme](/assets/_static/stream_scheme.png)

    This class exposes the core API methods that allow the user to drive
    the evaluation as a stream. The following methods are exposed:

    1. :meth:`register_model`
    2. :meth:`start_stream`
    3. :meth:`get_unlabeled_data`
    4. :meth:`submit_prediction`

    The programmer can consult each method for details of its
    implementation. The methods are designed so that the algorithm is
    decoupled from the evaluating platform: the evaluator provides only
    the data the algorithm needs and evaluates the resulting predictions.

    Args:
        metric_entries: list of metric entries.
        setting: Setting object.
        metric_k: Number of top interactions to consider.
        ignore_unknown_user: Whether to ignore unknown users.
        ignore_unknown_item: Whether to ignore unknown items.
        seed: Random seed for the evaluator.
    """

    _strategy: EvaluationStrategy = field(init=False)
    _algo_state_mgr: AlgorithmStateManager = field(default_factory=AlgorithmStateManager)
    _unlabeled_data_cache: PredictionMatrix = field(init=False)
    _ground_truth_data_cache: PredictionMatrix = field(init=False)
    _training_data_cache: PredictionMatrix = field(init=False)
    _state: EvaluatorState = EvaluatorState.INITIALIZED

    def __post_init__(self) -> None:
        """Initialize fields that require computation."""
        self._strategy = SlidingWindowStrategy()

    @property
    def state(self) -> EvaluatorState:
        return self._state

    def _assert_state(self, expected: EvaluatorState | list[EvaluatorState], error_msg: str) -> None:
        """Assert evaluator is in expected state"""
        if not isinstance(expected, list):
            expected = [expected]

        if self._state not in expected:
            raise RuntimeError(f"{error_msg} (Current state: {self._state.value})")
        return

    def _transition_state(self, new_state: EvaluatorState, allow_from: list[EvaluatorState]) -> None:
        """Guard state transitions explicitly"""
        if self._state not in allow_from:
            raise ValueError(f"Cannot transition from {self._state} to {new_state}. Allowed from: {allow_from}")
        self._state = new_state
        logger.info(f"Evaluator transitioned to {new_state}")

    def start_stream(self) -> None:
        """Start the streaming process.

        Warning:
            Once `start_stream` is called, the evaluator cannot register any new algorithms.

        Raises:
            RuntimeError: If the stream has already started.
        """
        self._assert_state(expected=EvaluatorState.INITIALIZED, error_msg="Stream has already started")
        self.setting.restore()
        logger.debug("Preparing evaluator for streaming")
        self._acc = MetricAccumulator()
        self.load_next_window()
        logger.debug("Evaluator is ready for streaming")
        # TODO: allow programmer to register anytime
        self._transition_state(new_state=EvaluatorState.STARTED, allow_from=[EvaluatorState.INITIALIZED])

    def register_model(
        self,
        algorithm: Algorithm,
        algorithm_name: None | str = None,
    ) -> UUID:
        """Register the algorithm with the evaluator.

        This method is called to register the algorithm with the evaluator.
        The method will assign a unique identifier to the algorithm and store
        the algorithm in the registry.

        Warning:
            Once `start_stream` is called, the evaluator cannot register any new algorithms.
        """
        self._assert_state(EvaluatorState.INITIALIZED, "Cannot register algorithms after stream started")
        algo_id = self._algo_state_mgr.register(name=algorithm_name, algorithm_ptr=algorithm)
        logger.debug(f"Algorithm {algo_id} registered")
        return algo_id

    def get_algorithm_state(self, algorithm_id: UUID) -> AlgorithmStateEnum:
        """Get the state of the algorithm."""
        return self._algo_state_mgr[algorithm_id].state

    def get_all_algorithm_status(self) -> dict[str, AlgorithmStateEnum]:
        """Get the status of all algorithms."""
        return self._algo_state_mgr.all_algo_states()

    def load_next_window(self) -> None:
        self.user_item_base.reset_unknown_user_item_base()
        self._training_data_cache = self._get_training_data()
        self._unlabeled_data_cache, self._ground_truth_data_cache, self._current_timestamp = self._get_evaluation_data()
        self._algo_state_mgr.set_all_ready(data_segment=self._current_timestamp)

    def get_training_data(self, algo_id: UUID) -> InteractionMatrix:
        """Get training data for the algorithm.

        Args:
            algo_id: Unique identifier of the algorithm.

        Raises:
            RuntimeError: If the stream has not started.

        Returns:
            The training data for the algorithm.
        """
        self._assert_state(expected=[EvaluatorState.STARTED, EvaluatorState.IN_PROGRESS], error_msg="Call start_stream() first")

        logger.debug(f"Getting data for algorithm {algo_id}")

        if self._strategy.should_advance_window(
            algo_state_mgr=self._algo_state_mgr,
            current_step=self._run_step,
            total_steps=self.setting.num_split,
        ):
            try:
                self.load_next_window()
            except EOWSettingError:
                self._transition_state(
                    EvaluatorState.COMPLETED, allow_from=[EvaluatorState.STARTED, EvaluatorState.IN_PROGRESS]
                )
                raise RuntimeError("End of evaluation window reached")

        can_request, reason = self._algo_state_mgr.can_request_training_data(algo_id)
        if not can_request:
            raise PermissionError(f"Cannot request data: {reason}")
        # TODO handle case when algo is ready after submitting prediction, but current timestamp has not changed, meaning algo is requesting same data again
        self._algo_state_mgr.transition(
            algo_id=algo_id,
            new_state=AlgorithmStateEnum.RUNNING,
            data_segment=self._current_timestamp,
        )

        self._state = EvaluatorState.IN_PROGRESS
        # release data to the algorithm
        return self._training_data_cache

    def get_unlabeled_data(self, algo_id: UUID) -> PredictionMatrix:
        """Get unlabeled data for the algorithm.

        This method is called to get the unlabeled data for the algorithm. The
        unlabeled data is the data for which the algorithm must produce
        predictions. It will contain `(user_id, -1)` pairs, where the value -1
        indicates that the item is to be predicted.
        """
        logger.debug(f"Getting unlabeled data for algorithm {algo_id}")
        can_submit, reason = self._algo_state_mgr.can_request_unlabeled_data(algo_id)
        if not can_submit:
            raise PermissionError(f"Cannot get unlabeled data: {reason}")
        return self._unlabeled_data_cache

    def submit_prediction(self, algo_id: UUID, X_pred: csr_matrix) -> None:
        """Submit the prediction of the algorithm.

        This method is called to submit the prediction of the algorithm.
        There are a few checks that are done before the prediction is
        evaluated by calling :meth:`_evaluate_algo_pred`.

        Once the prediction is evaluated, the method will update the state
        of the algorithm to PREDICTED.
        """
        logger.debug(f"Submitting prediction for algorithm {algo_id}")
        can_submit, reason = self._algo_state_mgr.can_submit_prediction(algo_id)
        if not can_submit:
            raise PermissionError(f"Cannot submit prediction: {reason}")

        self._evaluate_algo_pred(algorithm_id=algo_id, y_pred=X_pred)
        self._algo_state_mgr.transition(
            algo_id=algo_id,
            new_state=AlgorithmStateEnum.PREDICTED,
        )

    def _evaluate_algo_pred(self, algorithm_id: UUID, y_pred: csr_matrix) -> None:
        """Evaluate the prediction for algorithm."""
        y_true = self._ground_truth_data_cache.item_interaction_sequence_matrix

        if not self.ignore_unknown_item:
            y_pred = self._prediction_unknown_item_handler(y_true=y_true, y_pred=y_pred)
        algorithm_name = self._algo_state_mgr.get_algorithm_identifier(algo_id=algorithm_id)
        self._add_metric_results_for_prediction(
            ground_truth_data=self._ground_truth_data_cache,
            y_pred=y_pred,
            algorithm_name=algorithm_name,
        )

        logger.debug(f"Prediction evaluation complete for algorithm {algorithm_id}")

metric_entries instance-attribute

setting instance-attribute

metric_k instance-attribute

ignore_unknown_user = False class-attribute instance-attribute

ignore_unknown_item = False class-attribute instance-attribute

seed = 42 class-attribute instance-attribute

user_item_base = field(default_factory=UserItemKnowledgeBase) class-attribute instance-attribute

state property

metric_results(level=MetricLevelEnum.MACRO, only_current_timestamp=False, filter_timestamp=None, filter_algo=None)

Results of the metrics computed.

Computes the metrics of all algorithms at the specified level and returns the results in a pandas DataFrame. The results can be filtered by algorithm name and by timestamp.

Specifics
  • User level: User level metrics computed across all timestamps.
  • Window level: Window level metrics computed across all timestamps. This can be viewed as a macro level metric in the context of a single window, where the scores of each user are averaged within the window.
  • Macro level: Macro level metrics computed for the entire timeline. This score is computed by averaging the scores of all windows, treating each window equally.
  • Micro level: Micro level metrics computed for the entire timeline. This score is computed by averaging the scores of all users, treating each user, together with the timestamp the user is in, as a unique contribution to the overall score.
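The difference between macro and micro aggregation can be made concrete with a small pandas sketch (the column names here are illustrative, not the evaluator's actual output schema):

```python
import pandas as pd

# toy per-user scores: two windows with unequal user counts
df = pd.DataFrame({
    "timestamp": [1, 1, 1, 2],
    "user": ["a", "b", "c", "d"],
    "score": [1.0, 1.0, 1.0, 0.0],
})

# window level: mean over users within each window
window_scores = df.groupby("timestamp")["score"].mean()

# macro: average the window means, each window weighted equally
macro = window_scores.mean()  # (1.0 + 0.0) / 2 = 0.5

# micro: average over all (user, timestamp) contributions directly
micro = df["score"].mean()    # 3 / 4 = 0.75

print(macro, micro)
```

Because window 1 has three users and window 2 only one, macro weighting pulls the score down relative to micro.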

Parameters:

Name Type Description Default
level MetricLevelEnum | Literal['macro', 'micro', 'window', 'user']

Level of the metric to compute, defaults to "macro".

MACRO
only_current_timestamp None | bool

Filter only the current timestamp, defaults to False.

False
filter_timestamp None | int

Timestamp value to filter on, defaults to None. Cannot be combined with only_current_timestamp; specifying both raises a ValueError.

None
filter_algo None | str

Algorithm name to filter on, defaults to None.

None

Returns:

Type Description
DataFrame

Dataframe representation of the metric.

Source code in src/recnexteval/evaluators/core/base.py
def metric_results(
    self,
    level: MetricLevelEnum | Literal["macro", "micro", "window", "user"] = MetricLevelEnum.MACRO,
    only_current_timestamp: None | bool = False,
    filter_timestamp: None | int = None,
    filter_algo: None | str = None,
) -> pd.DataFrame:
    """Results of the metrics computed.

    Computes the metrics of all algorithms at the specified level and
    returns the results in a pandas DataFrame. The results can be filtered
    by algorithm name and by timestamp.

    Specifics
    ---------
    - User level: User level metrics computed across all timestamps.
    - Window level: Window level metrics computed across all timestamps. This can
        be viewed as a macro level metric in the context of a single window, where
        the scores of each user are averaged within the window.
    - Macro level: Macro level metrics computed for the entire timeline. This
        score is computed by averaging the scores of all windows, treating each
        window equally.
    - Micro level: Micro level metrics computed for the entire timeline. This
        score is computed by averaging the scores of all users, treating each
        user, together with the timestamp the user is in, as a unique
        contribution to the overall score.

    Args:
        level: Level of the metric to compute, defaults to "macro".
        only_current_timestamp: Filter only the current timestamp, defaults to False.
        filter_timestamp: Timestamp value to filter on, defaults to None.
            Cannot be combined with `only_current_timestamp`; specifying
            both raises a ValueError.
        filter_algo: Algorithm name to filter on, defaults to None.

    Returns:
        Dataframe representation of the metric.
    """
    if isinstance(level, str) and not MetricLevelEnum.has_value(level):
        raise ValueError("Invalid level specified")
    level = MetricLevelEnum(level)

    if only_current_timestamp and filter_timestamp:
        raise ValueError("Cannot specify both only_current_timestamp and filter_timestamp.")

    timestamp = None
    if only_current_timestamp:
        timestamp = self._current_timestamp

    if filter_timestamp:
        timestamp = filter_timestamp

    return self._acc.df_metric(filter_algo=filter_algo, filter_timestamp=timestamp, level=level)

plot_macro_level_metric()

Source code in src/recnexteval/evaluators/core/base.py
def plot_macro_level_metric(self) -> None:
    df = self.metric_results("macro")
    df = df.reset_index()
    ax = sns.barplot(
        data=df,
        x="metric",
        y="macro_score",
        hue="algorithm",
        edgecolor="black"
    )

    ax.set_xlabel("Metric")
    ax.set_ylabel("Macro score")
    ax.set_title("Macro-level scores by metric and algorithm")

    for container in ax.containers:
        ax.bar_label(container, fmt='%.4f', padding=3, fontsize=8)
    plt.legend(
        title="Algorithm",
        loc="upper center",
        bbox_to_anchor=(0.5, -0.1),
    )
    ax.grid(axis="y", alpha=0.3, linestyle="--")
    plt.show()

plot_micro_level_metric()

Source code in src/recnexteval/evaluators/core/base.py
def plot_micro_level_metric(self) -> None:
    df = self.metric_results("micro")
    df = df.reset_index()
    ax = sns.barplot(
        data=df,
        x="metric",
        y="micro_score",
        hue="algorithm",
        edgecolor="black"
    )

    ax.set_xlabel("Metric")
    ax.set_ylabel("Micro score")
    ax.set_title("Micro-level scores by metric and algorithm")

    for container in ax.containers:
        ax.bar_label(container, fmt='%.4f', padding=3, fontsize=8)
    plt.legend(
        title="Algorithm",
        loc="upper center",
        bbox_to_anchor=(0.5, -0.1),
    )
    ax.grid(axis="y", alpha=0.3, linestyle="--")
    plt.show()

plot_window_level_metric()

Source code in src/recnexteval/evaluators/core/base.py
def plot_window_level_metric(self) -> None:
    df = self.metric_results("window")
    df = df.reset_index()
    metrics = df["metric"].unique()
    n_metrics = len(metrics)

    fig, axes = plt.subplots(n_metrics, 1, figsize=(10, 7), sharey=False)
    if n_metrics == 1:
        axes = [axes]

    fig.suptitle("Window-level scores over time", fontsize=14, fontweight="bold")

    for ax, metric in zip(axes, metrics):
        # Filter data for this metric
        metric_df = df[df["metric"] == metric]

        # Plot line for each algorithm
        sns.lineplot(
            data=metric_df,
            x="timestamp",
            y="window_score",
            hue="algorithm",
            marker="o",
            markersize=6,
            linewidth=2,
            ax=ax,
        )
        ax.set_xlabel("Timestamp (epoch)")
        ax.set_ylabel(f"{metric} score")
        ax.grid(axis="both", alpha=0.3, linestyle="--")

        # Remove individual legends
        if ax.get_legend() is not None:
            ax.get_legend().remove()

    # Create single shared legend at bottom
    handles, labels = axes[0].get_legend_handles_labels()

    fig.legend(
        handles,
        labels,
        title="Algorithm",
        loc="lower center",
        bbox_to_anchor=(0.5, -0.15),
        ncol=1,  # vertical stacking
        frameon=True,
        fontsize=9,
    )
    plt.show()

restore()

Restore the generators after unpickling.

This method is used to restore the generators after loading the object from a pickle file.

Source code in src/recnexteval/evaluators/core/base.py
def restore(self) -> None:
    """Restore the generators after unpickling.

    This method is used to restore the generators after loading the object
    from a pickle file.
    """
    self.setting.restore(self._run_step)
    logger.debug("Generators restored")

current_step()

Return the current step of the evaluator.

Returns:

Type Description
int

Current step of the evaluator.

Source code in src/recnexteval/evaluators/core/base.py
def current_step(self) -> int:
    """Return the current step of the evaluator.

    Returns:
        Current step of the evaluator.
    """
    return self._run_step

start_stream()

Start the streaming process.

Warning

Once start_stream is called, the evaluator cannot register any new algorithms.

Raises:

Type Description
RuntimeError

If the stream has already started.

Source code in src/recnexteval/evaluators/stream/evaluator.py
def start_stream(self) -> None:
    """Start the streaming process.

    Warning:
        Once `start_stream` is called, the evaluator cannot register any new algorithms.

    Raises:
        RuntimeError: If the stream has already started.
    """
    self._assert_state(expected=EvaluatorState.INITIALIZED, error_msg="Stream has already started")
    self.setting.restore()
    logger.debug("Preparing evaluator for streaming")
    self._acc = MetricAccumulator()
    self.load_next_window()
    logger.debug("Evaluator is ready for streaming")
    # TODO: allow programmer to register anytime
    self._transition_state(new_state=EvaluatorState.STARTED, allow_from=[EvaluatorState.INITIALIZED])

register_model(algorithm, algorithm_name=None)

Register the algorithm with the evaluator.

This method is called to register the algorithm with the evaluator. The method will assign a unique identifier to the algorithm and store the algorithm in the registry.

Warning

Once start_stream is called, the evaluator cannot register any new algorithms.

Source code in src/recnexteval/evaluators/stream/evaluator.py
def register_model(
    self,
    algorithm: Algorithm,
    algorithm_name: None | str = None,
) -> UUID:
    """Register the algorithm with the evaluator.

    This method is called to register the algorithm with the evaluator.
    The method will assign a unique identifier to the algorithm and store
    the algorithm in the registry.

    Warning:
        Once `start_stream` is called, the evaluator cannot register any new algorithms.
    """
    self._assert_state(EvaluatorState.INITIALIZED, "Cannot register algorithms after stream started")
    algo_id = self._algo_state_mgr.register(name=algorithm_name, algorithm_ptr=algorithm)
    logger.debug(f"Algorithm {algo_id} registered")
    return algo_id

get_algorithm_state(algorithm_id)

Get the state of the algorithm.

Source code in src/recnexteval/evaluators/stream/evaluator.py
def get_algorithm_state(self, algorithm_id: UUID) -> AlgorithmStateEnum:
    """Get the state of the algorithm."""
    return self._algo_state_mgr[algorithm_id].state

get_all_algorithm_status()

Get the status of all algorithms.

Source code in src/recnexteval/evaluators/stream/evaluator.py
def get_all_algorithm_status(self) -> dict[str, AlgorithmStateEnum]:
    """Get the status of all algorithms."""
    return self._algo_state_mgr.all_algo_states()

load_next_window()

Source code in src/recnexteval/evaluators/stream/evaluator.py
def load_next_window(self) -> None:
    self.user_item_base.reset_unknown_user_item_base()
    self._training_data_cache = self._get_training_data()
    self._unlabeled_data_cache, self._ground_truth_data_cache, self._current_timestamp = self._get_evaluation_data()
    self._algo_state_mgr.set_all_ready(data_segment=self._current_timestamp)

get_training_data(algo_id)

Get training data for the algorithm.

Parameters:

Name Type Description Default
algo_id UUID

Unique identifier of the algorithm.

required

Raises:

Type Description
RuntimeError

If the stream has not started.

Returns:

Type Description
InteractionMatrix

The training data for the algorithm.

Source code in src/recnexteval/evaluators/stream/evaluator.py
def get_training_data(self, algo_id: UUID) -> InteractionMatrix:
    """Get training data for the algorithm.

    Args:
        algo_id: Unique identifier of the algorithm.

    Raises:
        RuntimeError: If the stream has not started.

    Returns:
        The training data for the algorithm.
    """
    self._assert_state(expected=[EvaluatorState.STARTED, EvaluatorState.IN_PROGRESS], error_msg="Call start_stream() first")

    logger.debug(f"Getting data for algorithm {algo_id}")

    if self._strategy.should_advance_window(
        algo_state_mgr=self._algo_state_mgr,
        current_step=self._run_step,
        total_steps=self.setting.num_split,
    ):
        try:
            self.load_next_window()
        except EOWSettingError:
            self._transition_state(
                EvaluatorState.COMPLETED, allow_from=[EvaluatorState.STARTED, EvaluatorState.IN_PROGRESS]
            )
            raise RuntimeError("End of evaluation window reached")

    can_request, reason = self._algo_state_mgr.can_request_training_data(algo_id)
    if not can_request:
        raise PermissionError(f"Cannot request data: {reason}")
    # TODO handle case when algo is ready after submitting prediction, but current timestamp has not changed, meaning algo is requesting same data again
    self._algo_state_mgr.transition(
        algo_id=algo_id,
        new_state=AlgorithmStateEnum.RUNNING,
        data_segment=self._current_timestamp,
    )

    self._state = EvaluatorState.IN_PROGRESS
    # release data to the algorithm
    return self._training_data_cache

get_unlabeled_data(algo_id)

Get unlabeled data for the algorithm.

This method is called to get the unlabeled data for the algorithm. The unlabeled data is the data for which the algorithm must produce predictions. It will contain (user_id, -1) pairs, where the value -1 indicates that the item is to be predicted.
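As a sketch of what a caller does with this shape, the snippet below turns hypothetical (user_id, -1) pairs into a scipy csr_matrix of scores over the full user-item space, the form that submit_prediction expects. The dimensions and scoring here are made up for illustration:

```python
import numpy as np
from scipy.sparse import csr_matrix

# hypothetical unlabeled data: (user_id, -1) means "predict items for this user"
unlabeled_pairs = [(0, -1), (2, -1)]
n_users, n_items = 3, 5

rows, cols, vals = [], [], []
for user_id, placeholder in unlabeled_pairs:
    assert placeholder == -1  # -1 marks the item slot to be predicted
    scores = np.linspace(0.1, 1.0, n_items)  # stand-in for real model scores
    rows.extend([user_id] * n_items)
    cols.extend(range(n_items))
    vals.extend(scores)

# users that were not requested keep empty rows
X_pred = csr_matrix((vals, (rows, cols)), shape=(n_users, n_items))
print(X_pred.shape)  # (3, 5)
```

Note the matrix spans all users and items, not just the requested pairs; rows for unrequested users simply stay empty.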

Source code in src/recnexteval/evaluators/stream/evaluator.py
def get_unlabeled_data(self, algo_id: UUID) -> PredictionMatrix:
    """Get unlabeled data for the algorithm.

    This method is called to get the unlabeled data for the algorithm. The
    unlabeled data is the data that the algorithm will predict. It will
    contain `(user_id, -1)` pairs, where the value -1 indicates that the
    item is to be predicted.
    """
    logger.debug(f"Getting unlabeled data for algorithm {algo_id}")
    can_submit, reason = self._algo_state_mgr.can_request_unlabeled_data(algo_id)
    if not can_submit:
        raise PermissionError(f"Cannot get unlabeled data: {reason}")
    return self._unlabeled_data_cache

submit_prediction(algo_id, X_pred)

Submit the prediction of the algorithm.

This method is called to submit the prediction of the algorithm. There are a few checks that are done before the prediction is evaluated by calling :meth:_evaluate_algo_pred.

Once the prediction is evaluated, the method will update the state of the algorithm to PREDICTED.

Source code in src/recnexteval/evaluators/stream/evaluator.py
def submit_prediction(self, algo_id: UUID, X_pred: csr_matrix) -> None:
    """Submit the prediction of the algorithm.

    This method is called to submit the prediction of the algorithm.
    There are a few checks that are done before the prediction is
    evaluated by calling :meth:`_evaluate_algo_pred`.

    Once the prediction is evaluated, the method will update the state
    of the algorithm to PREDICTED.
    """
    logger.debug(f"Submitting prediction for algorithm {algo_id}")
    can_submit, reason = self._algo_state_mgr.can_submit_prediction(algo_id)
    if not can_submit:
        raise PermissionError(f"Cannot submit prediction: {reason}")

    self._evaluate_algo_pred(algorithm_id=algo_id, y_pred=X_pred)
    self._algo_state_mgr.transition(
        algo_id=algo_id,
        new_state=AlgorithmStateEnum.PREDICTED,
    )

EvaluationStrategy

Bases: ABC

Abstract strategy for different evaluation modes

Source code in src/recnexteval/evaluators/stream/strategy.py
class EvaluationStrategy(ABC):
    """Abstract strategy for different evaluation modes"""

    @abstractmethod
    def should_advance_window(self, algo_state_mgr: AlgorithmStateManager, current_step: int, total_steps: int) -> bool:
        """Determine if should advance to next window"""
        pass

should_advance_window(algo_state_mgr, current_step, total_steps) abstractmethod

Determine if should advance to next window

Source code in src/recnexteval/evaluators/stream/strategy.py
@abstractmethod
def should_advance_window(self, algo_state_mgr: AlgorithmStateManager, current_step: int, total_steps: int) -> bool:
    """Determine if should advance to next window"""
    pass

SingleTimePointStrategy

Bases: EvaluationStrategy

Strategy for single time point evaluation

Source code in src/recnexteval/evaluators/stream/strategy.py
class SingleTimePointStrategy(EvaluationStrategy):
    """Strategy for single time point evaluation"""

    def should_advance_window(self, algo_state_mgr: AlgorithmStateManager, current_step: int, total_steps: int) -> bool:
        """Advance only when all algorithms predicted"""
        return algo_state_mgr.is_all_predicted() and current_step < total_steps

should_advance_window(algo_state_mgr, current_step, total_steps)

Advance only when all algorithms predicted

Source code in src/recnexteval/evaluators/stream/strategy.py
def should_advance_window(self, algo_state_mgr: AlgorithmStateManager, current_step: int, total_steps: int) -> bool:
    """Advance only when all algorithms predicted"""
    return algo_state_mgr.is_all_predicted() and current_step < total_steps

SlidingWindowStrategy

Bases: EvaluationStrategy

Strategy for sliding window evaluation

Source code in src/recnexteval/evaluators/stream/strategy.py
class SlidingWindowStrategy(EvaluationStrategy):
    """Strategy for sliding window evaluation"""

    def should_advance_window(self, algo_state_mgr: AlgorithmStateManager, current_step: int, total_steps: int) -> bool:
        """Advance only when all algorithms predicted"""
        return (
            algo_state_mgr.is_all_predicted()
            and algo_state_mgr.is_all_same_data_segment()
            and current_step < total_steps
        )

should_advance_window(algo_state_mgr, current_step, total_steps)

Advance only when all algorithms predicted

Source code in src/recnexteval/evaluators/stream/strategy.py
def should_advance_window(self, algo_state_mgr: AlgorithmStateManager, current_step: int, total_steps: int) -> bool:
    """Advance only when all algorithms predicted"""
    return (
        algo_state_mgr.is_all_predicted()
        and algo_state_mgr.is_all_same_data_segment()
        and current_step < total_steps
    )