
class streamsight.evaluators.EvaluatorStreamer(metric_entries: List[MetricEntry], setting: Setting, metric_k: int, ignore_unknown_user: bool = True, ignore_unknown_item: bool = True, seed: int | None = None)

Bases: EvaluatorBase

Evaluation via streaming through API

The diagram below shows the streamer evaluator for the sliding window setting. Instead of running a pipeline, the user streams each data release to the algorithm. The diagram shows the data communication between the evaluator and the algorithm. Although only 2 splits are shown here, the evaluator continues to stream data until the end of the setting, when there are no more splits.

Streamer diagram

This class exposes a few core API methods that allow the user to stream the evaluation process. The following methods are exposed:

  1. register_algorithm()

  2. start_stream()

  3. get_unlabeled_data()

  4. submit_prediction()

See the individual methods for details on their implementation. The methods are designed so that the algorithm is decoupled from the evaluating platform. The evaluator therefore only provides the necessary data to the algorithm and evaluates its predictions.
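The call sequence implied by the methods listed above can be sketched as follows. This is a minimal illustration under stated assumptions, not streamsight's implementation: ToyEvaluator is a hypothetical stand-in that mimics only the documented method names (including get_data() and get_algorithm_state(), described further down), states are plain strings rather than AlgorithmStateEnum members, and the data are plain Python lists and dicts rather than InteractionMatrix objects.

```python
import uuid


class ToyEvaluator:
    """Hypothetical stand-in that mimics only the documented method names."""

    def __init__(self, windows):
        # Each window: (training pairs, users to predict for, ground truth).
        self._windows = windows
        self._step = 0
        self._started = False
        self._states = {}
        self.hits = []  # number of correct predictions per window

    def register_algorithm(self, algorithm_name):
        if self._started:
            raise ValueError("stream has already started")
        algo_id = uuid.uuid4()
        self._states[algo_id] = "NEW"
        return algo_id

    def start_stream(self):
        if self._started:
            raise ValueError("stream has already started")
        self._started = True

    def get_algorithm_state(self, algo_id):
        return self._states[algo_id]

    def get_data(self, algo_id):
        self._states[algo_id] = "READY"
        return self._windows[self._step][0]

    def get_unlabeled_data(self, algo_id):
        return self._windows[self._step][1]

    def submit_prediction(self, algo_id, prediction):
        truth = self._windows[self._step][2]
        self.hits.append(sum(prediction.get(u) == i for u, i in truth.items()))
        if self._step == len(self._windows) - 1:
            self._states[algo_id] = "COMPLETED"
        else:
            # Only one algorithm is registered, so move to the next window.
            self._states[algo_id] = "PREDICTED"
            self._step += 1


def most_popular(seen, users):
    """Recommend the most frequent item seen so far to every user."""
    items = [item for _, item in seen]
    top = max(sorted(set(items)), key=items.count)
    return {user: top for user in users}


def run_stream(evaluator, predict):
    """Drive one algorithm through the stream until it is COMPLETED."""
    algo_id = evaluator.register_algorithm(algorithm_name="most_popular")
    evaluator.start_stream()
    seen = []
    while evaluator.get_algorithm_state(algo_id) != "COMPLETED":
        seen.extend(evaluator.get_data(algo_id))
        users = evaluator.get_unlabeled_data(algo_id)
        evaluator.submit_prediction(algo_id, predict(seen, users))
    return algo_id


# Two hypothetical windows of (user, item) interactions.
windows = [
    ([(0, 10), (1, 10), (2, 11)], [0, 1], {0: 10, 1: 11}),
    ([(0, 10), (1, 11)], [2], {2: 10}),
]
evaluator = ToyEvaluator(windows)
algo_id = run_stream(evaluator, most_popular)
print(evaluator.hits, evaluator.get_algorithm_state(algo_id))
```

The algorithm side (most_popular and run_stream) never touches the ground truth, which mirrors the decoupling described above: the evaluator hands out data and scores whatever prediction comes back.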

  • metric_entries (List[MetricEntry]) – List of metric entries

  • setting (Setting) – Setting object

  • metric_k (int) – Number of top interactions to consider

  • ignore_unknown_user (bool) – To ignore unknown users

  • ignore_unknown_item (bool) – To ignore unknown items

  • seed (Optional[int]) – Random seed for the evaluator

__init__(metric_entries: List[MetricEntry], setting: Setting, metric_k: int, ignore_unknown_user: bool = True, ignore_unknown_item: bool = True, seed: int | None = None)


Setting to evaluate the algorithms on.


Value of K for the metrics.


To ignore unknown users during evaluation.


To ignore unknown items during evaluation.

_acc: MetricAccumulator
_cache_evaluation_data() None

Cache the evaluation data for the current step.


This method will cache the evaluation data for the current step. The method will update the unknown user/item base, get the next unlabeled and ground truth data, and update the current timestamp.


The method will update the unknown user/item base with the ground truth data. Next, mask the unlabeled and ground truth data with the known user/item base. The method will cache the unlabeled and ground truth data in the internal attributes _unlabeled_data_cache and _ground_truth_data_cache. The timestamp is cached in the internal attribute _current_timestamp.

We use an internal attribute _run_step to keep track of the current step so that we can check whether the last step has been reached.

We assume that any caller of this method has already checked that there is still data to be processed.

_current_timestamp: int
_evaluate(algo_id: UUID, X_pred: csr_matrix) None

Evaluate the prediction

Given the prediction and the algorithm ID, the method will evaluate the prediction using the metrics specified in the evaluator. The prediction of the algorithm is compared to the ground truth data currently cached.

The evaluation results will be stored in the micro and macro accumulators which will later be used to calculate the final evaluation results.

  • algo_id (UUID) – The unique identifier of the algorithm

  • X_pred (csr_matrix) – The prediction of the algorithm

_get_evaluation_data() Tuple[InteractionMatrix, InteractionMatrix, int]

Get the evaluation data for the current step.

Internal method to get the evaluation data for the current step. The evaluation data consists of the unlabeled data, the ground truth data, and the current timestamp, which are returned as a tuple. The shapes are masked using user_item_base. The unknown users in the ground truth data are also updated in user_item_base.


_current_timestamp is updated with the current timestamp.


Tuple of unlabeled data, ground truth data, and current timestamp

Return type:

Tuple[csr_matrix, csr_matrix, int]


EOWSetting – If there is no more data to be processed

_prediction_shape_handler(X_true_shape: Tuple[int, int], X_pred: csr_matrix) csr_matrix

Handle shape difference of the prediction matrix.

If there is a difference in the shape of the prediction matrix and the ground truth matrix, this function will handle the difference based on ignore_unknown_user and ignore_unknown_item.

  • X_true_shape (Tuple[int,int]) – Shape of the ground truth matrix

  • X_pred (csr_matrix) – Prediction matrix


ValueError – If the user dimension of the prediction matrix is less than the ground truth matrix


Prediction matrix with the same shape as the ground truth matrix

Return type:


_transform_prediction(X_pred: csr_matrix | InteractionMatrix) csr_matrix

Transform the prediction matrix

This method is called to transform the prediction matrix to a csr_matrix. The method will check if the prediction matrix is an InteractionMatrix and if the shape attribute is defined. If the shape attribute is not defined, the method will set the shape to the known shape of the user/item base.


X_pred (Union[csr_matrix, InteractionMatrix]) – The prediction matrix


ValueError – If X_pred is not an InteractionMatrix or csr_matrix


The prediction matrix as a csr_matrix

Return type:


get_algorithm_state(algo_id: UUID) AlgorithmStateEnum

Get the state of the algorithm

This method is called to get the state of the algorithm given the unique identifier of the algorithm. The method will return the state of the algorithm.


algo_id (UUID) – Unique identifier of the algorithm


The state of the algorithm

Return type:


get_all_algorithm_status() Dict[str, AlgorithmStateEnum]

Get the status of all algorithms

This method is called to get the status of all algorithms registered with the evaluator. The method will return a dictionary where the key is the name of the algorithm and the value is the state of the algorithm.


The status of all algorithms

Return type:

Dict[str, AlgorithmStateEnum]

get_data(algo_id: UUID) InteractionMatrix

Get training data for the algorithm


This method is called to get the training data for the algorithm. The training data is defined as either the background data or the incremental data. The training data is always released irrespective of the state of the algorithm.


  1. If the state is COMPLETED, raise warning that the algorithm has completed

  2. If the state is NEW, release training data to the algorithm

  3. If the state is READY and the data segment is the same, raise warning that the algorithm has already obtained data

  4. If the state is PREDICTED and the data segment is the same, inform the algorithm that it has already predicted and should wait for other algorithms to predict

  5. This will occur when _current_timestamp has changed, which causes scenario 2 to not be caught. In this case, the algorithm is requesting the next window of data. Thus, this is a valid data call and the status will be updated to READY.
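The five cases above can be read as a small decision function. The sketch below is a hedged illustration only: get_data_action is a hypothetical helper, states are plain strings rather than AlgorithmStateEnum members, and the NEW to READY transition is an assumption inferred from the submit_prediction() rules rather than something stated here.

```python
def get_data_action(state, same_segment):
    """Return (next_state, message) for one get_data() call.

    message is None when the call is valid and needs no warning.
    """
    if state == "COMPLETED":
        return state, "algorithm has completed"
    if state == "NEW":
        # Assumption: receiving data for the first time makes the algorithm READY.
        return "READY", None
    if state == "READY" and same_segment:
        return state, "algorithm has already obtained data"
    if state == "PREDICTED" and same_segment:
        return state, "already predicted; wait for other algorithms to predict"
    # The timestamp has changed, so this is a valid request for the next window.
    return "READY", None


for state, same_segment in [("NEW", True), ("READY", True), ("PREDICTED", False)]:
    print(state, same_segment, get_data_action(state, same_segment))
```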

algo_id (UUID) – Unique identifier of the algorithm


ValueError – If the stream has not started


The training data for the algorithm



get_unlabeled_data(algo_id: UUID) InteractionMatrix | None

Get unlabeled data for the algorithm


This method is called to get the unlabeled data for the algorithm. The unlabeled data is the data that the algorithm will predict. It will contain (user_id, -1) pairs, where the value -1 indicates that the item is to be predicted.


  1. If the state is READY/PREDICTED, return the unlabeled data

  2. If the state is COMPLETED, raise warning that the algorithm has completed

  3. In all other states, raise a warning that the algorithm has not obtained data and should call get_data() first.

algo_id (UUID) – Unique identifier of the algorithm



The unlabeled data for prediction




metric_results(level: MetricLevelEnum | Literal['macro', 'micro', 'window', 'user'] = MetricLevelEnum.MACRO, only_current_timestamp: bool | None = False, filter_timestamp: int | None = None, filter_algo: str | None = None) DataFrame

Results of the metrics computed.

Computes the metrics of all algorithms at the specified level and returns the results in a pandas DataFrame. The results can be filtered by algorithm name and by timestamp.


  • User level: User level metrics computed across all timestamps.

  • Window level: Window level metrics computed across all timestamps. This can be viewed as a macro level metric in the context of a single window, where the scores of all users are averaged within the window.

  • Macro level: Macro level metrics computed for the entire timeline. This score is computed by averaging the scores of all windows, treating each window equally.

  • Micro level: Micro level metrics computed for the entire timeline. This score is computed by averaging the scores of all users, treating each user and the timestamp the user is in as a unique contribution to the overall score.
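The difference between the window, macro, and micro levels is easiest to see with numbers. The following worked example uses made-up per-user scores (the user_scores dict is hypothetical and is not produced by the library) and plain averaging as described above.

```python
from statistics import mean

# Hypothetical per-user scores, keyed by window timestamp.
user_scores = {
    0: [1.0, 0.0],             # window at t=0 has two users
    10: [1.0, 1.0, 1.0, 0.0],  # window at t=10 has four users
}

# Window level: average the user scores inside each window.
window_level = {t: mean(scores) for t, scores in user_scores.items()}

# Macro level: average the window scores, each window counting equally.
macro = mean(window_level.values())

# Micro level: average every (user, timestamp) score, each counting equally.
micro = mean(score for scores in user_scores.values() for score in scores)

print(window_level, macro, round(micro, 4))
```

With these numbers the window scores are 0.5 and 0.75, so the macro score is 0.625, while the micro score is 4/6 because the larger window contributes more user scores.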

  • level (Union[MetricLevelEnum, Literal[“macro”, “micro”, “window”, “user”]]) – Level of the metric to compute, defaults to “macro”

  • only_current_timestamp (bool, optional) – Filter only the current timestamp, defaults to False

  • filter_timestamp (Optional[int], optional) – Timestamp value to filter on, defaults to None. If both only_current_timestamp and filter_timestamp are provided, filter_timestamp will be used.

  • filter_algo (Optional[str], optional) – Algorithm name to filter on, defaults to None


DataFrame representation of the metric



prepare_dump() None

Prepare evaluator for pickling.

This method is used to prepare the evaluator for pickling. The method will destruct the generators to avoid pickling issues.

register_algorithm(algorithm: Algorithm | None = None, algorithm_name: str | None = None) UUID

Register the algorithm with the evaluator

This method is called to register the algorithm with the evaluator. The method will assign a unique identifier to the algorithm and store the algorithm in the registry. The method will raise a ValueError if the stream has already started.


algorithm (Algorithm) – The algorithm to be registered

  • ValueError – If the stream has already started

  • ValueError – If neither algorithm nor algorithm_name is provided


The unique identifier of the algorithm

Return type:


restore() None

Restore the generators after unpickling.

This method is used to restore the generators after loading the object from a pickle file.
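The reason for the prepare_dump() and restore() pair is that Python generator objects cannot be pickled. The sketch below illustrates the general pattern with a hypothetical WindowSource class; its attribute names and the way it resumes from a stored position are assumptions for illustration, not the evaluator's actual internals.

```python
import pickle


class WindowSource:
    """Hypothetical holder of a generator, standing in for an evaluator."""

    def __init__(self, windows):
        self.windows = list(windows)
        self.position = 0  # number of windows already consumed
        self._gen = self._make_gen()

    def _make_gen(self):
        # Generator expression that resumes from the recorded position.
        return (window for window in self.windows[self.position:])

    def next_window(self):
        window = next(self._gen)
        self.position += 1
        return window

    def prepare_dump(self):
        # Generator objects cannot be pickled, so drop the reference.
        self._gen = None

    def restore(self):
        # Rebuild the generator from the picklable state.
        self._gen = self._make_gen()


source = WindowSource(["w0", "w1", "w2"])
first = source.next_window()

try:
    pickle.dumps(source)
    pickled_with_generator = True
except TypeError:
    # Raised because the instance still holds a live generator.
    pickled_with_generator = False

source.prepare_dump()
clone = pickle.loads(pickle.dumps(source))
clone.restore()
second = clone.next_window()
print(first, second, pickled_with_generator)
```

Note that after prepare_dump() the original object also needs restore() before it can be used again, since its generator has been dropped.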


Setting to evaluate the algorithms on.


Start the streaming process

This method is called to start the streaming process. The method will prepare the evaluator for the streaming process. The method will reset the data generators, prepare the micro and macro accumulators, update the known user/item base, and cache the evaluation data.

The method will set the internal state has_started to True. The method can be called anytime after the evaluator is instantiated. However, once the method is called, the evaluator cannot register any new algorithms.


ValueError – If the stream has already started

submit_prediction(algo_id: UUID, X_pred: csr_matrix | InteractionMatrix) None

Submit the prediction of the algorithm


This method is called to submit the prediction of the algorithm. There are a few checks that are done before the prediction is evaluated by calling _evaluate().

Once the prediction is evaluated, the method will update the state of the algorithm to PREDICTED.

On each call, the method also checks whether the current evaluation step is the final one. If it is, the method updates the state of the algorithm to COMPLETED.


  1. If state is READY, evaluate the prediction

  2. If state is NEW, algorithm has not obtained data, raise warning

  3. If state is PREDICTED, algorithm has already predicted, raise warning

  4. All other state, raise warning that the algorithm has completed
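The state checks above, together with the final-step rule, can be summarized in a small function. This is a hedged sketch only: submit_action is a hypothetical helper, states are plain strings rather than AlgorithmStateEnum members, and the real method also evaluates the prediction via _evaluate(), which is reduced here to a boolean flag.

```python
def submit_action(state, is_final_step):
    """Return (next_state, evaluated, warning) for one submit_prediction() call."""
    if state == "READY":
        # The prediction is evaluated; the final step completes the algorithm.
        next_state = "COMPLETED" if is_final_step else "PREDICTED"
        return next_state, True, None
    if state == "NEW":
        return state, False, "algorithm has not obtained data"
    if state == "PREDICTED":
        return state, False, "algorithm has already predicted"
    return state, False, "algorithm has completed"


for state in ("NEW", "READY", "PREDICTED", "COMPLETED"):
    print(state, submit_action(state, is_final_step=False))
```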

  • algo_id (UUID) – The unique identifier of the algorithm

  • X_pred (Union[csr_matrix, InteractionMatrix]) – The prediction of the algorithm


ValueError – If X_pred is not an InteractionMatrix or csr_matrix