Source code for MRCpy.amrc

"""
Adaptive Minimax Risk Classification. Copyright (C) 2022 Veronica Alvarez

This program is free software: you can redistribute it and/or modify it under the terms of the 
GNU General Public License as published by the Free Software Foundation,
either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program.
If not, see https://www.gnu.org/licenses/.
"""

import itertools
import math

import numpy as np
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

# Import the AMRC super class
from MRCpy import BaseMRC
from MRCpy.phi import BasePhi, RandomFourierPhi


class AMRC(BaseMRC):
    '''
    Adaptive Minimax Risk Classifier

    The class AMRC implements the Adaptive Minimax Risk Classifiers
    (AMRCs) proposed in :ref:`[1] <ref1>`. It is designed for online
    learning with streaming data: training samples are fed sequentially and
    the classification rule is updated every time a new sample is provided.

    AMRC provides adaptation to concept drift (change in the underlying
    distribution of the data). Such concept drift is common in multiple
    applications including electricity price prediction, spam mail
    filtering, and credit card fraud detection. AMRC accounts for
    multidimensional time changes by means of a multivariate and high-order
    tracking of the time-varying underlying distribution. In addition,
    differently from conventional techniques, AMRCs can provide computable
    tight performance guarantees at learning time. It implements the 0-1
    loss function and can be used with linear and Random Fourier features.

    .. seealso:: For more information about AMRC, one can refer to the
        following paper:

        [1] Álvarez, V., Mazuelas, S., & Lozano, J. A. (2022).
        Minimax Classification under Concept Drift with Multidimensional
        Adaptation and Performance Guarantees.
        International Conference on Machine Learning (ICML) 2022.

            @InProceedings{AlvMazLoz22,
              title     = {Minimax Classification under Concept Drift with
                           Multidimensional Adaptation and Performance
                           Guarantees},
              author    = {{\'A}lvarez, Ver{\'o}nica and
                           Mazuelas, Santiago and
                           Lozano, Jose A},
              booktitle = {Proceedings of the 39th International Conference
                           on Machine Learning},
              pages     = {486--499},
              year      = {2022},
              volume    = {162},
              series    = {Proceedings of Machine Learning Research},
              month     = {Jul},
              publisher = {PMLR},
            }

    Parameters
    ----------
    n_classes : `int`
        Number of different possible labels for an instance.

    deterministic : `bool`, default = `True`
        Whether the prediction of the labels should be done in a
        deterministic way (given a fixed `random_state` in the case of
        using random Fourier or random ReLU features).

    loss : `str` {'0-1'}, default = '0-1'
        Type of loss function to use for the risk minimization.
        AMRC supports only the 0-1 loss, which quantifies the probability
        of classification error at a certain example for a certain rule.

    unidimensional : `bool`, default = `False`
        Whether to model change in the variables unidimensionally.
        Available for comparison purposes.

    delta : `float`, default = 0.05
        Significance of the upper bound on the accumulated mistakes.
        Lower values produce higher values of the bound.

    order : `int`, default = 1
        Order of the subgradients used in optimization.

    W : `int`, default = 200
        Window size. The model uses the last `W` samples for fitting.

    N : `int`, default = 100
        Number of subgradients used for optimization.

    max_iters : `int`, default = 2000
        Maximum number of iterations used to find the solution of the
        optimization with the subgradient approach.

    phi : `str` or `BasePhi` instance, default = 'linear'
        Type of feature mapping function to use for mapping the input data.
        The currently available feature mapping methods are
        'fourier', 'relu', 'threshold' and 'linear'.
        The users can also implement their own feature mapping object
        (should be a `BasePhi` instance) and pass it to this argument.
        Note that when using 'fourier' feature mapping,
        training and testing instances are expected to be normalized.
        To implement a feature mapping, please go through the
        :ref:`Feature Mapping` section.

        'linear'
            It uses the identity feature map referred to as Linear feature
            map. See class `BasePhi`.

        'fourier'
            It uses Random Fourier Feature map. See class
            `RandomFourierPhi`.

    random_state : `int`, RandomState instance, default = `None`
        Random seed used when using 'fourier' feature mappings
        to produce the random weights.

    fit_intercept : `bool`, default = `False`
        Whether to calculate the intercept for MRCs.
        If set to `False`, no intercept is used in calculations
        (i.e. data is expected to be already centered).

    **phi_kwargs : Additional parameters for feature mappings.
        Groups the multiple optional parameters
        for the corresponding feature mappings (`phi`).
        For example, in the case of fourier features,
        the number of features is given by the `n_components` parameter,
        which can be passed as argument:
        `AMRC(phi='fourier', n_components=500)`.
        The list of arguments for each feature mapping class
        can be found in the corresponding documentation.
    '''

    def __init__(self,
                 n_classes,
                 loss='0-1',
                 deterministic=True,
                 random_state=None,
                 phi='linear',
                 unidimensional=False,
                 delta=0.05,
                 order=1,
                 W=200,
                 N=100,
                 fit_intercept=False,
                 max_iters=2000,
                 **phi_kwargs):
        self.n_classes = n_classes
        self.unidimensional = unidimensional
        self.order = order
        self.delta = delta
        if self.unidimensional:
            self.order = 0
        self.W = W
        self.N = N
        self.max_iters = max_iters
        self.Y = np.zeros(self.W)
        self.p = np.zeros((self.n_classes, self.W))
        self.sample_counter = 0
        self.params_ = {}

        if 'one_hot' in phi_kwargs and not phi_kwargs['one_hot']:
            raise ValueError('AMRC does not support one_hot=False')
        else:
            phi_kwargs['one_hot'] = True

        if loss != '0-1':
            raise ValueError('AMRC only supports loss=\'0-1\'')

        super().__init__(loss=loss,
                         deterministic=deterministic,
                         random_state=random_state,
                         fit_intercept=fit_intercept,
                         phi=phi,
                         **phi_kwargs)
    def tracking(self, feature, y, p, s):
        '''
        Tracking of uncertainty sets.

        This function obtains mean vector estimates and confidence vectors.

        Parameters
        ----------
        feature :
            Feature vector of the new sample.
        y :
            New label.
        p : `array`-like of shape (`n_classes`,)
            Class probability estimates.
        s : `array`-like of shape (`n_classes`,)
            Standard deviations of the class probability estimates.

        Returns
        -------
        tau_ :
            Mean vector estimate.
        lambda_ :
            Confidence vector.
        params_ : `dict`
            Optimization parameters:

            - eta, Sigma: updated mean vector estimate and mean quadratic
              error matrix
            - eta0, Sigma0, epsilon: parameters required to update the
              variances of the noise processes
            - Q, R: variances of the noise processes
        '''
        Ht = self.params_['Ht']
        eta = self.params_['eta']
        Sigma = self.params_['Sigma']
        eta0 = self.params_['eta0']
        Sigma0 = self.params_['Sigma0']
        epsilon = self.params_['epsilon']
        Q = self.params_['Q']
        R = self.params_['R']

        e1 = np.zeros((1, self.order + 1))
        e1[0, 0] = 1

        m = len(feature[0])
        n_classes = len(p)
        d = m / n_classes
        alpha = 0.3

        tau_ = np.zeros((m, 1))
        lambda_ = np.zeros((m, 1))

        if self.unidimensional:
            # Compute one gain per component and average them into a
            # single gain K.
            KK = np.zeros((m, 1))
            for i in range(m):
                innovation = feature[0, i] - eta[0, i]
                aa = alpha * R[i, 0] + (1 - alpha) * \
                    (np.dot(epsilon[i], epsilon[i]) +
                     np.dot(np.dot(e1, Sigma[i, :, :]), np.transpose(e1)))
                R[i] = aa[0]
                a = (np.dot(Sigma[i, :, :], np.transpose(e1)))
                b = np.dot(np.dot(e1, Sigma[i, :, :]),
                           np.transpose(e1)) + R[i, :]
                KK[i] = a / b
            K = np.mean(KK)
            for i in range(m):
                eta0[:, i] = eta[:, i] + K * innovation
                Sigma0[i, :, :] = np.dot((np.identity(self.order + 1) -
                                          np.dot(K, e1)), Sigma[i, :, :])
                Q[i, :, :] = alpha * Q[i, :, :] + (1 - alpha) * \
                    np.dot(innovation * innovation * K, np.transpose(K))
                epsilon[i] = feature[0, i] - eta0[0, i]
                eta[:, i] = np.dot(Ht, eta0[:, i])
                Sigma[i, :, :] = np.dot(np.dot(Ht, Sigma0[i, :, :]),
                                        np.transpose(Ht)) + Q[i, :, :]
                tau_[i, 0] = (1 / n_classes) * eta[0, i]
                lmb_eta = np.sqrt(Sigma[i, 0, 0])
                lambda_[i, 0] = np.mean(lmb_eta)
        elif not self.unidimensional:
            for i in range(m):
                if i > y * d - 1 and i < (y + 1) * d + 1:
                    # Components associated with the observed label y:
                    # update the estimate with the new observation.
                    innovation = feature[0, i] - eta[0, i]
                    aa = alpha * R[i, 0] + (1 - alpha) * \
                        (np.dot(epsilon[i], epsilon[i]) +
                         np.dot(np.dot(e1, Sigma[i, :, :]),
                                np.transpose(e1)))
                    R[i] = aa[0]
                    a = (np.dot(Sigma[i, :, :], np.transpose(e1)))
                    b = np.dot(np.dot(e1, Sigma[i, :, :]),
                               np.transpose(e1)) + R[i, :]
                    K = a / b
                    eta0[:, i] = eta[:, i] + np.transpose(K[:] * innovation)
                    Sigma0[i, :, :] = np.dot((np.identity(self.order + 1) -
                                              np.dot(K, e1)),
                                             Sigma[i, :, :])
                    Q[i, :, :] = alpha * Q[i, :, :] + (1 - alpha) * \
                        np.dot(innovation * innovation * K,
                               np.transpose(K))
                    epsilon[i] = feature[0, i] - eta0[0, i]
                    eta[:, i] = np.dot(Ht, eta0[:, i])
                    Sigma[i, :, :] = np.dot(np.dot(Ht, Sigma0[i, :, :]),
                                            np.transpose(Ht)) + Q[i, :, :]
                    tau_[i, 0] = p[y] * eta[0, i]
                    lmb_eta = np.sqrt(Sigma[i, 0, 0])
                    lambda_[i, 0] = np.sqrt((lmb_eta ** 2 + eta[0, i] ** 2) *
                                            (s[y] ** 2 + p[y] ** 2) -
                                            ((eta[0, i]) ** 2) *
                                            (p[y] ** 2))
                else:
                    # Remaining components: only propagate the estimate and
                    # its error covariance through the transition matrix.
                    eta[:, i] = np.dot(Ht, eta0[:, i])
                    Sigma[i, :, :] = np.dot(np.dot(Ht, Sigma0[i, :, :]),
                                            np.transpose(Ht)) + Q[i, :, :]
                    tau_[i, 0] = (p[int(i / d)]) * eta[0, i]
                    lmb_eta = np.sqrt(Sigma[i, 0, 0])
                    lambda_[i, 0] = np.sqrt((lmb_eta ** 2 + eta[0, i] ** 2) *
                                            (s[int(i / d)] ** 2 +
                                             p[int(i / d)] ** 2) -
                                            ((eta[0, i]) ** 2) *
                                            (p[int(i / d)] ** 2))
        else:
            raise ValueError('The given value for parameter unidimensional '
                             'is not a boolean')

        params_ = {'Ht': Ht,
                   'eta': eta,
                   'Sigma': Sigma,
                   'eta0': eta0,
                   'Sigma0': Sigma0,
                   'epsilon': epsilon,
                   'Q': Q,
                   'R': R}

        return tau_, lambda_, params_

    def initialize_tracking(self, m):
        '''
        Initialize the tracking stage.

        This function initializes mean vector estimates and confidence
        vectors, and defines the matrices and vectors that are used to
        update the mean vector estimates and confidence vectors.

        Parameters
        ----------
        m :
            Length of the mean vector estimate.

        Returns
        -------
        params_ : `dict`
            Optimization parameters:

            - Ht: transition matrix
            - e1: vector with 1 in the first component and 0 in the
              remaining components
            - eta: state vectors
            - Sigma: mean squared error matrices
            - eta0, Sigma0, epsilon: parameters required to obtain the
              variances of the noise processes
            - Q, R: variances of the noise processes
        '''
        e1 = np.ones((1, self.order + 1))
        for i in range(1, self.order + 1):
            e1[0, i] = 0

        deltat = 1
        variance_init = 0.001

        # Transition matrix: Ht[i, j] = deltat ** (j - i) / (j - i)!
        Ht = np.identity(self.order + 1)
        for i in range(self.order):
            for j in range(i + 1, self.order + 1):
                Ht[i, j] = pow(deltat, j - i) / math.factorial(j - i)

        eta0 = np.zeros((self.order + 1, m))
        eta = np.zeros((self.order + 1, m))
        Sigma0 = np.zeros((m, self.order + 1, self.order + 1))
        Sigma = np.zeros((m, self.order + 1, self.order + 1))
        Q = np.zeros((m, self.order + 1, self.order + 1))
        R = np.zeros((m, 1))
        epsilon = np.zeros((m, 1))

        for i in range(m):
            for j in range(self.order + 1):
                Sigma0[i, j, j] = 1
                Q[i, j, j] = variance_init
            R[i] = variance_init
            epsilon[i] = - eta0[0, i]
            eta[:, i] = np.dot(Ht, eta0[:, i])
            Sigma[i, :, :] = np.dot(np.dot(Ht, Sigma0[i, :, :]),
                                    np.transpose(Ht)) + Q[i, :, :]

        params_ = {'Ht': Ht,
                   'eta': eta,
                   'Sigma': Sigma,
                   'eta0': eta0,
                   'Sigma0': Sigma0,
                   'epsilon': epsilon,
                   'Q': Q,
                   'R': R}

        return params_
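    # Illustration: with order = 1 and deltat = 1 the loop above builds the
    # transition matrix
    #
    #     Ht = [[1, 1],
    #           [0, 1]]
    #
    # so each mean estimate is propagated together with its first-order rate
    # of change; higher orders add the Taylor terms
    # deltat ** (j - i) / (j - i)!.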
    def minimax_risk(self, x, tau_, lambda_, n_classes):
        '''
        Learning.

        This function efficiently learns the classifier parameters.

        Parameters
        ----------
        x : `array`-like
            Training instance used for solving the minimax risk
            optimization problem.

        tau_ : `array`-like
            Mean estimates for the expectations of feature mappings.

        lambda_ : `array`-like
            Variance in the mean estimates for the expectations of the
            feature mappings.

        n_classes : `int`
            Number of labels in the dataset.

        Returns
        -------
        self :
            Fitted estimator.
        '''
        self.n_classes = n_classes
        self.tau_ = check_array(tau_, accept_sparse=True, ensure_2d=False)
        self.lambda_ = check_array(lambda_, accept_sparse=True,
                                   ensure_2d=False)

        F = self.params_['F']
        h = self.params_['h']

        theta = 1
        theta0 = 1
        muaux = self.mu
        R_Ut = 0

        # Feature mapping evaluated at x for every possible label
        M = np.zeros((n_classes, len(self.mu)))
        for j in range(n_classes):
            M[j, :] = self.phi.eval_xy(x.reshape((1, -1)), [j])

        # Add one affine lower bound on the varphi function for every
        # non-empty subset of labels
        for j in range(n_classes):
            aux = list(itertools.combinations([*range(n_classes)], j + 1))
            for k in range(np.size(aux, 0)):
                idx = np.zeros((1, n_classes))
                a = aux[k]
                for mm in range(len(a)):
                    idx[0, a[mm]] = 1
                a = (np.dot(idx, M)) / (j + 1)
                b = np.size(F, 0)
                F2 = np.zeros((b + 1, len(self.mu)))
                h2 = np.zeros((b + 1, 1))
                for mm in range(b):
                    for jj in range(len(self.mu)):
                        F2[mm, jj] = F[mm, jj]
                    h2[mm, :] = h[mm, :]
                F2[-1, :] = a
                b = -1 / (j + 1)
                h2[-1, 0] = b
                F = F2
                h = h2

        # Remove the all-zeros placeholder row used at initialization
        if self.sample_counter == 0:
            F = np.delete(F, 0, 0)
            h = np.delete(h, 0, 0)

        v = np.dot(F, muaux) + h
        varphi = max(v)[0]
        regularization = sum(self.lambda_ * abs(muaux))
        R_Ut_best_value = 1 - np.dot(np.transpose(self.tau_), muaux)[0] + \
            varphi + regularization

        F_count = np.zeros((len(F[:, 0]), 1))

        # Accelerated subgradient iterations
        for i in range(self.max_iters):
            muaux = self.params_['w'] + theta * ((1 / theta0) - 1) * \
                (self.params_['w'] - self.params_['w0'])
            v = np.dot(F, muaux) + h
            varphi = max(v)[0]
            idx_mv = np.where(v == varphi)
            if len(idx_mv[0]) > 1:
                fi = F[[idx_mv[0][0]], :]
                F_count[[idx_mv[0][0]]] = F_count[[idx_mv[0][0]]] + 1
            else:
                fi = F[idx_mv[0], :]
                F_count[idx_mv[0]] = F_count[idx_mv[0]] + 1

            subgradient_regularization = np.multiply(self.lambda_,
                                                     np.sign(muaux))
            regularization = np.sum(np.multiply(self.lambda_,
                                                np.abs(muaux)))
            g = - self.tau_ + np.transpose(fi) + subgradient_regularization
            theta0 = theta
            theta = 2 / (i + 2)
            alpha = 1 / ((i + 2) ** (3 / 2))
            self.params_['w0'] = self.params_['w']
            self.params_['w'] = muaux - np.multiply(alpha, g)

            R_Ut = 1 - np.dot(np.transpose(self.tau_), muaux)[0] + \
                varphi + regularization
            if R_Ut < R_Ut_best_value:
                R_Ut_best_value = R_Ut
                self.mu = muaux

        v = np.dot(F, muaux) + h
        varphi = max(v)[0]
        regularization = np.sum(np.multiply(self.lambda_,
                                            np.abs(self.params_['w'])))
        R_Ut = 1 - np.dot(np.transpose(self.tau_), self.params_['w'])[0] + \
            varphi + regularization
        if R_Ut < R_Ut_best_value:
            R_Ut_best_value = R_Ut
            self.mu = self.params_['w']

        # Drop affine lower bounds that were never active, keeping at most
        # N of them
        if len(F[:, 0]) > self.N:
            idx_F_count = np.where(F_count == 0)
            if len(idx_F_count) > len(F[:, 0]) - self.N:
                for j in range(len(idx_F_count[0]) - self.N):
                    t = len(idx_F_count[0]) - 1 - j
                    F = np.delete(F, idx_F_count[0][t], idx_F_count[1][t])
                    h = np.delete(h, idx_F_count[0][t], 0)
            else:
                for j in range(len(idx_F_count[0]) - len(F[:, 0]) + self.N):
                    t = len(idx_F_count[0]) - 1 - j
                    F = np.delete(F, idx_F_count[0][t], idx_F_count[1][t])
                    h = np.delete(h, idx_F_count[0][t], 0)

        self.is_fitted_ = True
        self.varphi = varphi
        self.params_['R_Ut'] = R_Ut
        self.params_['sum_R_Ut'] = self.params_['sum_R_Ut'] + R_Ut
        self.params_['F'] = F
        self.params_['h'] = h

        return self
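    # Note: the quantity R_Ut computed above is the per-sample upper bound
    #
    #     R_Ut = 1 - tau_' mu + varphi(mu) + lambda_' |mu|
    #
    # where varphi(mu) is approximated by the maximum of the affine
    # functions stored row-wise in F and h, i.e. max(F mu + h).
    # get_upper_bound returns the latest value and
    # get_upper_bound_accumulated turns the running sum sum_R_Ut into a
    # bound on the accumulated mistakes.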
    def fit(self, x, y, X_=None):
        '''
        Fit the AMRC model.

        Computes the parameters required for the minimax risk optimization
        and then calls the `minimax_risk` function to solve the
        optimization.

        Parameters
        ----------
        x : `array`-like of shape (`n_features`)
            Training instance used in

            - calculating the expectation estimates that constrain the
              uncertainty set for the minimax risk classification
            - solving the minimax risk optimization problem.

        y : `int`
            Label corresponding to the training instance, used only to
            compute the expectation estimates.

        X_ : None
            Unused in AMRC.

        Returns
        -------
        self :
            Fitted estimator.
        '''
        x = check_array(x, accept_sparse=True, ensure_2d=False)

        # Calculate the length m of the feature vector
        if self.sample_counter == 0:
            if self.phi == 'linear':
                self.phi = BasePhi(n_classes=self.n_classes,
                                   fit_intercept=self.fit_intercept,
                                   **self.phi_kwargs)
            elif self.phi == 'fourier':
                self.phi = RandomFourierPhi(n_classes=self.n_classes,
                                            fit_intercept=self.fit_intercept,
                                            random_state=self.random_state,
                                            **self.phi_kwargs)
            elif not isinstance(self.phi, BasePhi):
                raise ValueError('Unexpected feature mapping type ... ')

            # Each time we call phi.fit the random weights in random
            # fourier features change
            self.phi.fit(x.reshape((1, -1)), [y])
            m = self.phi.len_

            # Initialize classifier parameter, upper bounds vector, and
            # matrix and vector that are used to obtain local
            # approximations of the varphi function
            self.params_['F'] = np.zeros((1, m))
            self.params_['h'] = np.zeros((1, 1))
            self.mu = np.zeros((m, 1))
            self.params_['w'] = np.zeros((m, 1))
            self.params_['w0'] = np.zeros((m, 1))
            self.params_['R_Ut'] = 0
            self.params_['sum_R_Ut'] = 0

            # Initialize mean vector estimate
            params_ = self.initialize_tracking(m)
            self.params_ = {**self.params_, **params_}

        # Estimating probabilities
        sample_idx = self.sample_counter % self.W
        self.Y[sample_idx] = y
        end = min(self.sample_counter + 1, self.W)
        s = np.zeros(self.n_classes)
        for i in range(self.n_classes):
            self.p[i, sample_idx] = \
                np.mean(self.Y[:end] == i)
            if end > 1:
                s[i] = np.std(self.p[i, :end]) * \
                    np.sqrt(end) / np.sqrt(end - 1)
            else:
                s[i] = 0

        # Feature vector
        feature = self.phi.eval_xy(x.reshape((1, -1)), [y])

        # Update mean vector estimate and confidence vector
        tau_, lambda_, params_ = \
            self.tracking(feature, y, self.p[:, sample_idx], s)
        self.params_ = {**self.params_, **params_}

        # Update classifier parameter and obtain upper bound
        self.minimax_risk(x, tau_, lambda_, self.n_classes)

        self.sample_counter = self.sample_counter + 1
        self.is_fitted_ = True

        return self
    def predict(self, X):
        '''
        Predicts classes for new instances using a fitted model.

        Returns the predicted classes for the given instances in `X`
        using the probabilities given by the function `predict_proba`.

        Parameters
        ----------
        X : `array`-like of shape (`n_features`)
            Test instance to be predicted by the AMRC model.

        Returns
        -------
        y_pred : `int`
            Predicted label corresponding to the given instance.
        '''
        X = check_array(X, accept_sparse=True, ensure_2d=False)
        check_is_fitted(self, "is_fitted_")

        # Get the prediction probabilities for the classifier
        proba = self.predict_proba(X)

        if self.deterministic:
            y_pred = np.argmax(proba)
        else:
            y_pred = np.random.choice(self.n_classes, p=proba)

        # Check if the labels were provided for fitting
        # (labels might be omitted if fitting is done through minimax_risk)
        # Otherwise return the default labels i.e., from 0 to n_classes-1.
        if hasattr(self, 'classes_'):
            y_pred = np.asarray([self.classes_[label] for label in y_pred])

        return y_pred
    def predict_proba(self, x):
        '''
        Conditional probabilities corresponding to each class for each
        unlabeled input instance.

        Parameters
        ----------
        x : `array`-like of shape (`n_dimensions`)
            Testing instance for which the prediction probabilities are
            calculated for each class.

        Returns
        -------
        h : `ndarray` of shape (`n_classes`)
            Probabilities :math:`(p(y|x))` corresponding to the predictions
            for each class.
        '''
        M = np.zeros((self.n_classes, len(self.mu)))
        c = np.zeros(self.n_classes)
        for j in range(self.n_classes):
            M[j, :] = self.phi.eval_xy(x.reshape((1, -1)), [j])
        for j in range(self.n_classes):
            c[j] = max([np.dot(M[j, :], self.mu)[0] - self.varphi, 0])
        cx = sum(c)
        h = np.zeros(self.n_classes)
        for j in range(self.n_classes):
            if cx == 0:
                h[j] = 1 / self.n_classes
            else:
                h[j] = c[j] / cx
        return h
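    # Worked example: with n_classes = 2, if M @ mu = [0.9, 0.2] and
    # varphi = 0.1, then c = [0.8, 0.1] and the rule returns
    # h = [8 / 9, 1 / 9]; if every score falls below varphi, the rule falls
    # back to the uniform distribution over classes.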
    def get_upper_bound(self):
        '''
        Returns the upper bound on the expected loss for the fitted
        classifier.

        Returns
        -------
        upper_bound : `float`
            Upper bound of the expected loss for the fitted classifier.
        '''
        return self.params_['R_Ut']
    def get_upper_bound_accumulated(self):
        '''
        Returns the upper bound on the accumulated mistakes of the fitted
        classifier.

        Returns
        -------
        upper_bound_accumulated : `float`
            Upper bound of the accumulated mistakes for the fitted
            classifier.
        '''
        return ((self.params_['sum_R_Ut'] +
                 np.sqrt(2 * self.sample_counter *
                         np.log(1 / self.delta))) /
                self.sample_counter)
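

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; assumes MRCpy and NumPy are
# installed and uses a small synthetic binary stream with a drifting mean).
# Any real data stream can be plugged in instead.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    clf = AMRC(n_classes=2, phi='linear', deterministic=True)

    n_samples = 500
    mistakes = 0
    for t in range(n_samples):
        # Synthetic stream: the class-conditional mean drifts over time.
        y_t = int(rng.integers(0, 2))
        drift = t / n_samples
        x_t = rng.normal(loc=(2 * y_t - 1) * (1 - drift), scale=1.0, size=2)

        if t > 0:
            # Predict before updating, then count the mistake
            # (prequential evaluation).
            mistakes += int(clf.predict(x_t) != y_t)

        # Update the classification rule with the new sample.
        clf.fit(x_t, y_t)

    print('Mistake rate:', mistakes / (n_samples - 1))
    print('Upper bound on accumulated mistakes:',
          clf.get_upper_bound_accumulated())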