Source code for xcolumns.weighted_prediction

import os
from time import time
from typing import Optional, Tuple, Union

import numpy as np
from scipy.sparse import csr_matrix

from .numba_csr_functions import (
    numba_predict_macro_balanced_accuracy_csr,
    numba_predict_weighted_per_instance_csr,
)
from .types import TORCH_AVAILABLE, DenseMatrix, DType, Matrix
from .utils import unpack_csr_matrix, zeros_like


if TORCH_AVAILABLE:
    import torch


########################################################################################
# General functions for weighted prediction
########################################################################################


def _predict_weighted_per_instance_dense(
    y_proba: DenseMatrix,
    k: int,
    th: float = 0.0,
    a: Optional[DenseMatrix] = None,
    b: Optional[DenseMatrix] = None,
    keep_scores: bool = False,
    dtype: Optional[DType] = None,
) -> DenseMatrix:
    n, m = y_proba.shape
    y_pred = zeros_like(y_proba, dtype=dtype)

    gains = y_proba
    if a is not None:
        gains = gains * a
    if b is not None:
        gains = gains + b

    if k > 0:
        # Numpy implementation
        if isinstance(y_proba, np.ndarray):
            top_k = np.argpartition(-gains, k, axis=1)[:, :k]
            y_pred[np.arange(n)[:, None], top_k] = (
                gains[np.arange(n)[:, None], top_k] if keep_scores else 1
            )

        # Torch implementation
        elif TORCH_AVAILABLE and isinstance(y_proba, torch.Tensor):
            _, top_k = torch.topk(gains, k, axis=1)
            y_pred[torch.arange(n)[:, None], top_k] = (
                gains[torch.arange(n)[:, None], top_k] if keep_scores else 1
            )
    else:
        y_pred[gains >= th] = 1

    return y_pred


def _predict_weighted_per_instance_csr(
    y_proba: csr_matrix,
    k: int,
    th: float = 0.0,
    a: Optional[np.ndarray] = None,
    b: Optional[np.ndarray] = None,
    keep_scores: bool = False,
    dtype: Optional[np.dtype] = None,
) -> csr_matrix:
    if a is not None and a.dtype != y_proba.dtype:
        a = a.astype(y_proba.dtype)
    if b is not None and b.dtype != y_proba.dtype:
        b = b.astype(y_proba.dtype)

    n, m = y_proba.shape
    (
        y_pred_data,
        y_pred_indices,
        y_pred_indptr,
    ) = numba_predict_weighted_per_instance_csr(
        *unpack_csr_matrix(y_proba), k, th, a, b, keep_scores
    )

    return csr_matrix(
        (y_pred_data, y_pred_indices, y_pred_indptr), shape=(n, m), dtype=dtype
    )


[docs] def predict_weighted_per_instance( y_proba: Matrix, k: int, th: float = 0.0, a: Optional[DenseMatrix] = None, b: Optional[DenseMatrix] = None, dtype: Optional[DType] = None, keep_scores: bool = False, return_meta: bool = False, return_weights: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Returns the weighted prediction for each instance (row) in a provided matrix of conditional probabilities estimates of labels :math:`\boldsymbol{H}`, (**y_proba**), where each element :math:`\eta_{ij} = P(y_j|x_i)` is the probability of the label :math:`j` for the instance :math:`i`, The gains vector :math:`\boldsymbol{g}` is calculated for each instance :math:`i` as follows: .. math:: \boldsymbol{g} = \boldsymbol{a} \odot \boldsymbol{\eta}_i + \boldsymbol{b} If **k** is larger than 0, the top **k** labels with the highest gains are selected for the instance. If **k** is 0, then the labels with gains higher than **th** are selected for the instance. Args: y_proba: A 2D matrix of conditional probabilities for each label of shape (n, m). k: The number of labels to predict for each instance. th: The single number threshold or a vector of thresholds for the gains. Only used if **k** is 0. If a vector, it needs to be a size of number of columns of **y_proba** (m). a: The vector of slopes (coefficients) :math:`\boldsymbol{a}` used for calculating gains. It needs to be a size of number of columns of **y_proba** (m). If equal to None, then :math:`\boldsymbol{a} = \boldsymbol{1}`. b: The vector of intercepts (constants) :math:`\boldsymbol{b}` used for calculating gains. It needs to be a size of number of columns of **y_proba** (m). If equal to None, then :math:`\boldsymbol{b} = \boldsymbol{0}`. keep_scores: Whether to keep the scores in the output prediction matrix instead of 1. dtype: The data type for the output matrix. If equal to None, the data type of **y_proba** will be used. return_meta: Whether to return meta data. Returns: The binary prediction matrix: the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ # Arguments validation # Check y_proba if not isinstance(y_proba, Matrix): raise ValueError( "y_proba must be either np.ndarray, torch.Tensor, or csr_matrix" ) if len(y_proba.shape) == 1: y_proba = y_proba.reshape(1, -1) elif len(y_proba.shape) > 2: raise ValueError("y_proba must be 1d or 2d") # Check k and th if not isinstance(k, int): raise ValueError("k must be an integer") # Check a and b n, m = y_proba.shape if a is not None: if not isinstance(a, DenseMatrix): raise ValueError("a must be np.ndarray or torch.Tensor") if a.shape != (m,): raise ValueError("a must be of shape (y_proba[1],)") if b is not None: if not isinstance(b, DenseMatrix): raise ValueError("b must be np.ndarray or torch.Tensor") if b.shape != (m,): raise ValueError("b must be of shape (y_proba[1],)") # Initialize the meta data dictionary if return_meta: meta = {"iters": 1, "time": time()} # Invoke the specialized implementation if isinstance(y_proba, DenseMatrix): y_pred = _predict_weighted_per_instance_dense( y_proba, k, th=th, a=a, b=b, dtype=dtype, keep_scores=keep_scores ) elif isinstance(y_proba, csr_matrix): y_pred = _predict_weighted_per_instance_csr( y_proba, k, th=th, a=a, b=b, dtype=dtype, keep_scores=keep_scores ) if return_meta: meta["time"] = time() - meta["time"] if return_weights: meta["a"] = a meta["b"] = b return y_pred, meta else: return y_pred
######################################################################################## # Specialized functions for specific metrics ########################################################################################
[docs] def predict_top_k( y_proba: Matrix, k: int, dtype: Optional[DType] = None, keep_scores: bool = False, return_meta: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the top **k** labels for each instance (row) in a provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**). This is optimal inference strategy for precision at k and nDCG at k. It is equivalent to calling ``predict_weighted_per_instance(y_proba, k=k, a=None, b=None, return_meta=return_meta)``. Args: y_proba: A 2D matrix of conditional probabilities for each label. k: The number of labels to predict for each instance. keep_scores: Whether to keep the scores in the output prediction matrix instead of 1. dtype: The data type for the output matrix, if equal to None, the data type of **y_proba** will be used. return_meta: Whether to return metadata. Defaults to False. Returns: The binary prediction matrix: with exactly **k** labels in each row, the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ return predict_weighted_per_instance( y_proba, k=k, dtype=dtype, keep_scores=keep_scores, return_meta=return_meta )
[docs] def predict_optimizing_macro_recall( y_proba: Matrix, k: int, priors: DenseMatrix, epsilon: float = 1e-6, keep_scores: bool = False, dtype: Optional[DType] = None, return_meta: bool = False, return_weights: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the **k** labels for each instance (row) in a provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**), such that the prediction optimizes macro-averaged recall for the population with the given prior probabilities of labels (**priors**). It is equivalent to calling ``predict_weighted_per_instance(y_proba, k=k, a=1.0 / (priors + epsilon), return_meta=return_meta)``. Args: y_proba: A 2D matrix of conditional probabilities for each label. k: The number of labels to predict for each instance. priors: The prior probabilities for each label. epsilon: A small value to avoid division by zero when calculating inverse of priors. keep_scores: Whether to keep the scores in the output prediction matrix instead of 1. dtype: The data type for the output matrix, if equal to None, the data type of **y_proba** will be used. return_meta: Whether to return metadata. Returns: The binary prediction matrix: with exactly **k** labels in each row, the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ if priors.shape[0] != y_proba.shape[1]: raise ValueError("priors must be of shape (y_proba[1],)") weights = 1.0 / (priors + epsilon) return predict_weighted_per_instance( y_proba, k=k, a=weights, dtype=dtype, keep_scores=keep_scores, return_meta=return_meta, return_weights=return_weights, )
def _predict_optimizing_macro_balanced_accuracy_dense( y_proba: DenseMatrix, k: int, priors: DenseMatrix, epsilon: float = 1e-6, dtype: Optional[DType] = None, ) -> Matrix: n, m = y_proba.shape priors = priors + epsilon y_pred = zeros_like(y_proba, dtype=dtype) gains = y_proba / priors - (1 - y_proba) / (1 - priors) if k > 0: # Numpy implementation if isinstance(y_proba, np.ndarray): top_k = np.argpartition(-gains, k, axis=1)[:, :k] y_pred[np.arange(n)[:, None], top_k] = 1 # Torch implementation elif TORCH_AVAILABLE and isinstance(y_proba, torch.Tensor): _, top_k = torch.topk(gains, k, axis=1) y_pred[torch.arange(n)[:, None], top_k] = 1 else: y_pred[gains >= 0] = 1 return y_pred def _predict_optimizing_macro_balanced_accuracy_csr( y_proba: csr_matrix, k: int, priors: np.array, epsilon: float = 1e-6, dtype: Optional[DType] = None, ) -> csr_matrix: n, m = y_proba.shape priors = priors + epsilon data, indices, indptr = numba_predict_macro_balanced_accuracy_csr( *unpack_csr_matrix(y_proba), n, m, k, priors, ) return csr_matrix( (data, indices, indptr), shape=y_proba.shape, dtype=y_proba.dtype if dtype is None else dtype, )
[docs] def predict_optimizing_macro_balanced_accuracy( y_proba: Matrix, k: int, priors: DenseMatrix, epsilon: float = 1e-6, dtype: Optional[DType] = None, return_meta: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the **k** labels for each instance (row) in a provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**), such that the prediction at **k** optimizes macro-averaged balanced accuracy for the population with the given prior probabilities of labels (**priors**). Args: y_proba: A 2D matrix of conditional probabilities for each label. k: The number of labels to predict for each instance. priors: The prior probabilities for each label. epsilon: A small value to avoid division by zero when calculating inverse of priors. dtype: The data type for the output matrix, if equal to None, the data type of **y_proba** will be used. return_meta: Whether to return meta data. Returns: The binary prediction matrix: the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ if priors.shape[0] != y_proba.shape[1]: raise ValueError("priors must be of shape (y_proba[1],)") if return_meta: meta = {"iters": 1, "time": time()} if isinstance(y_proba, DenseMatrix): y_pred = _predict_optimizing_macro_balanced_accuracy_dense( y_proba, k, priors, epsilon=epsilon, dtype=dtype ) elif isinstance(y_proba, csr_matrix): y_pred = _predict_optimizing_macro_balanced_accuracy_csr( y_proba, k, priors, epsilon=epsilon, dtype=dtype ) else: raise ValueError( "y_proba must be either np.ndarray, torch.Tensor, or csr_matrix" ) if return_meta: meta["time"] = time() - meta["time"] return y_pred, meta else: return y_pred
[docs] def predict_log_weighted_per_instance( y_proba: Matrix, k: int, priors: DenseMatrix, epsilon: float = 1e-9, keep_scores: bool = False, dtype: Optional[DType] = None, return_meta: bool = False, return_weights: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the top **k** labels for each instance (row) in provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**) according to the log law weighting scheme: .. math:: a = -\log (\pi + \epsilon) where :math:`\pi` (**priors**) is the prior probability of each label, :math:`\beta` (beta) is power parameter and :math:`\epsilon` (**epsilon**) is a small value to avoid domain error. It is equivalent to calling ``predict_weighted_per_instance(y_proba, k=k, a=-log(priors + epsilon), return_meta=return_meta)``. Args: y_proba: A 2D matrix of conditional probabilities for each label. k: The number of labels to predict for each instance. priors: The prior probabilities for each label. epsilon: A small value to avoid domain error. keep_scores: Whether to keep the scores in the output prediction matrix instead of 1. dtype: The data type for the output matrix, if equal to None, the data type of **y_proba** will be used. return_meta: Whether to return meta data. Result The binary prediction matrix: with exactly **k** labels in each row, the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ if priors.shape[0] != y_proba.shape[1]: raise ValueError("priors must be of shape (y_proba[1],)") weights = -np.log(priors + epsilon) return predict_weighted_per_instance( y_proba, k=k, a=weights, keep_scores=keep_scores, dtype=dtype, return_meta=return_meta, return_weights=return_weights, )
[docs] def predict_power_law_weighted_per_instance( y_proba: Matrix, k: int, priors: DenseMatrix, beta: float, epsilon: float = 1e-9, keep_scores: bool = False, dtype: Optional[DType] = None, return_meta: bool = False, return_weights: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the top **k** labels for each instance (row) in provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**) according to the power law weighting scheme: .. math:: a = (\pi + \epsilon)^{-\beta} where :math:`\pi` (**priors**) is the prior probability of each label, :math:`\beta` (beta) is power parameter and :math:`\epsilon` (**epsilon**) is a small value to avoid division by zero. It is equivalent to calling ``predict_weighted_per_instance(y_proba, k=k, a=(priors + epsilon) ** -beta, return_meta=return_meta)``. Args: y_proba: A 2D matrix of conditional probabilities for each label. k: The number of labels to predict for each instance. priors: The prior probabilities for each label. beta: The power parameter. epsilon: A small value to avoid division by zero when calculating inverse of priors. keep_scores: Whether to keep the scores in the output prediction matrix instead of 1. dtype: The data type for the output matrix, if equal to None, the data type of **y_proba** will be used. return_meta: Whether to return meta data. Result The binary prediction matrix: with exactly **k** labels in each row, the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ if priors.shape[0] != y_proba.shape[1]: raise ValueError("priors must be of shape (y_proba[1],)") weights = (priors + epsilon) ** -beta return predict_weighted_per_instance( y_proba, k=k, a=weights, keep_scores=keep_scores, dtype=dtype, return_meta=return_meta, return_weights=return_weights, )
[docs] def predict_optimizing_instance_precision( y_proba: Matrix, k: int, keep_scores: bool = False, dtype: Optional[DType] = None, return_meta: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the top **k** labels for each instance (row) in a provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**). This is optimal inference strategy for precision at k and nDCG at k. Args: y_proba: A 2D matrix of conditional probabilities for each label. k: The number of labels to predict for each instance. priors: The prior probabilities for each label. beta: The power parameter. epsilon: A small value to avoid division by zero when calculating inverse of priors. keep_scores: Whether to keep the scores in the output prediction matrix instead of 1. dtype: The data type for the output matrix, if equal to None, the data type of **y_proba** will be used. return_meta: Whether to return meta data. Result The binary prediction matrix: with exactly **k** labels in each row, the shape and type of the matrix is the same as **y_proba**. If **return_meta** is True, additionally, a dictionary is returned, that contains the time taken to calculate the prediction. """ if k <= 0: raise ValueError("k must be > 0") return predict_top_k( y_proba, k=k, keep_scores=keep_scores, dtype=dtype, return_meta=return_meta )
[docs] def predict_optimizing_instance_propensity_scored_precision( y_proba: Matrix, k: int, inverse_propensities: Optional[DenseMatrix] = None, propensities: Optional[DenseMatrix] = None, keep_scores: bool = False, dtype: Optional[DType] = None, return_meta: bool = False, return_weights: bool = False, ) -> Union[Matrix, Tuple[Matrix, dict]]: r""" Predicts the top **k** labels for each instance (row) in provided matrix of conditional probabilities estimates of labels :math:`\eta` (**y_proba**) weighted by provided inverse propensity scores (**inverse_propensities**). """ if inverse_propensities is not None: if inverse_propensities.shape[0] != y_proba.shape[1]: raise ValueError("inverse_propensities must be of shape (y_proba[1],)") elif propensities is not None: if propensities.shape[0] != y_proba.shape[1]: raise ValueError("propensities must be of shape (y_proba[1],)") propensities[propensities == 0] = 1.0 inverse_propensities = 1.0 / propensities else: raise ValueError("either inverse_propensities or propensities must be provided") return predict_weighted_per_instance( y_proba, k=k, a=inverse_propensities, keep_scores=keep_scores, dtype=dtype, return_meta=return_meta, return_weights=return_weights, )