# coding: utf-8
# pylint: disable=no-member
"""Online evaluation metric module."""
from __future__ import absolute_import
import math
import numpy
from . import ndarray
def check_label_shapes(labels, preds, shape=0):
"""Check to see if the two arrays are the same size."""
if shape == 0:
label_shape, pred_shape = len(labels), len(preds)
else:
label_shape, pred_shape = labels.shape, preds.shape
if label_shape != pred_shape:
raise ValueError("Shape of labels {} does not match shape of "
"predictions {}".format(label_shape, pred_shape))
class EvalMetric(object):
"""Base class of all evaluation metrics."""
def __init__(self, name, num=None):
self.name = name
self.num = num
self.reset()
    def update(self, labels, preds):
        """Update the internal evaluation statistics with a batch of labels and predictions.

        Parameters
----------
labels : list of NDArray
The labels of the data.
preds : list of NDArray
Predicted values.
"""
raise NotImplementedError()
def reset(self):
"""Clear the internal statistics to initial state."""
if self.num is None:
self.num_inst = 0
self.sum_metric = 0.0
else:
self.num_inst = [0] * self.num
self.sum_metric = [0.0] * self.num
def get(self):
"""Get the current evaluation result.
Returns
-------
name : str
Name of the metric.
value : float
Value of the evaluation.
"""
if self.num is None:
if self.num_inst == 0:
return (self.name, float('nan'))
else:
return (self.name, self.sum_metric / self.num_inst)
else:
names = ['%s_%d'%(self.name, i) for i in range(self.num)]
values = [x / y if y != 0 else float('nan') \
for x, y in zip(self.sum_metric, self.num_inst)]
return (names, values)
def get_name_value(self):
"""Get zipped name and value pairs."""
name, value = self.get()
if not isinstance(name, list):
name = [name]
if not isinstance(value, list):
value = [value]
        # Return a list rather than an iterator so the result can be reused on Python 3.
        return list(zip(name, value))
def __str__(self):
return "EvalMetric: {}".format(dict(self.get_name_value()))
class CompositeEvalMetric(EvalMetric):
"""Manage multiple evaluation metrics."""
def __init__(self, **kwargs):
super(CompositeEvalMetric, self).__init__('composite')
        self.metrics = kwargs.get('metrics', [])
def add(self, metric):
"""Add a child metric."""
self.metrics.append(metric)
def get_metric(self, index):
"""Get a child metric."""
try:
return self.metrics[index]
except IndexError:
            raise ValueError("Metric index {} is out of range [0, {})".format(
                index, len(self.metrics)))
def update(self, labels, preds):
for metric in self.metrics:
metric.update(labels, preds)
def reset(self):
try:
for metric in self.metrics:
metric.reset()
except AttributeError:
pass
def get(self):
names = []
results = []
for metric in self.metrics:
result = metric.get()
names.append(result[0])
results.append(result[1])
return (names, results)
########################
# CLASSIFICATION METRICS
########################
class Accuracy(EvalMetric):
"""Calculate accuracy."""
def __init__(self):
super(Accuracy, self).__init__('accuracy')
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred_label in zip(labels, preds):
if pred_label.shape != label.shape:
pred_label = ndarray.argmax_channel(pred_label)
pred_label = pred_label.asnumpy().astype('int32')
label = label.asnumpy().astype('int32')
check_label_shapes(label, pred_label)
self.sum_metric += (pred_label.flat == label.flat).sum()
self.num_inst += len(pred_label.flat)
class TopKAccuracy(EvalMetric):
"""Calculate top k predictions accuracy."""
def __init__(self, **kwargs):
super(TopKAccuracy, self).__init__('top_k_accuracy')
        self.top_k = kwargs.get('top_k', 1)
assert(self.top_k > 1), 'Please use Accuracy if top_k is no more than 1'
self.name += '_%d' % self.top_k
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred_label in zip(labels, preds):
assert(len(pred_label.shape) <= 2), 'Predictions should be no more than 2 dims'
pred_label = numpy.argsort(pred_label.asnumpy().astype('float32'), axis=1)
label = label.asnumpy().astype('int32')
check_label_shapes(label, pred_label)
num_samples = pred_label.shape[0]
num_dims = len(pred_label.shape)
if num_dims == 1:
self.sum_metric += (pred_label.flat == label.flat).sum()
elif num_dims == 2:
num_classes = pred_label.shape[1]
top_k = min(num_classes, self.top_k)
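                # argsort returns class indices in ascending score order, so the
                # top-k predictions are the last k columns of pred_label.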
for j in range(top_k):
self.sum_metric += (pred_label[:, num_classes - 1 - j].flat == label.flat).sum()
self.num_inst += num_samples
class F1(EvalMetric):
"""Calculate the F1 score of a binary classification problem."""
def __init__(self):
super(F1, self).__init__('f1')
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred in zip(labels, preds):
pred = pred.asnumpy()
label = label.asnumpy().astype('int32')
pred_label = numpy.argmax(pred, axis=1)
check_label_shapes(label, pred)
if len(numpy.unique(label)) > 2:
raise ValueError("F1 currently only supports binary classification.")
true_positives, false_positives, false_negatives = 0., 0., 0.
for y_pred, y_true in zip(pred_label, label):
if y_pred == 1 and y_true == 1:
true_positives += 1.
elif y_pred == 1 and y_true == 0:
false_positives += 1.
elif y_pred == 0 and y_true == 1:
false_negatives += 1.
if true_positives + false_positives > 0:
precision = true_positives / (true_positives + false_positives)
else:
precision = 0.
if true_positives + false_negatives > 0:
recall = true_positives / (true_positives + false_negatives)
else:
recall = 0.
if precision + recall > 0:
f1_score = 2 * precision * recall / (precision + recall)
else:
f1_score = 0.
self.sum_metric += f1_score
self.num_inst += 1
class Perplexity(EvalMetric):
"""Calculate perplexity.
Parameters
----------
ignore_label : int or None
Index of invalid label to ignore when
counting. Usually should be -1. Include
all entries if None.
axis : int (default -1)
The axis from prediction that was used to
compute softmax. By default use the last
axis.
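
    Examples
    --------
    A minimal usage sketch; the labels and predictions below are illustrative:

    >>> import mxnet as mx
    >>> ppl = Perplexity(ignore_label=None)
    >>> ppl.update(labels=[mx.nd.array([0, 1])],
    ...            preds=[mx.nd.array([[0.5, 0.5], [0.5, 0.5]])])
    >>> name, value = ppl.get()   # exp(-mean(log 0.5)), i.e. roughly 2.0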
"""
def __init__(self, ignore_label, axis=-1):
super(Perplexity, self).__init__('Perplexity')
self.ignore_label = ignore_label
self.axis = axis
def update(self, labels, preds):
assert len(labels) == len(preds)
loss = 0.
num = 0
for label, pred in zip(labels, preds):
            assert label.size == pred.size // pred.shape[-1], \
                "shape mismatch: %s vs. %s" % (label.shape, pred.shape)
label = label.as_in_context(pred.context).reshape((label.size,))
pred = ndarray.pick(pred, label.astype(dtype='int32'), axis=self.axis)
if self.ignore_label is not None:
ignore = label == self.ignore_label
num -= ndarray.sum(ignore).asscalar()
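                # Replace ignored entries with probability 1 so their log term is 0.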
pred = pred*(1-ignore) + ignore
loss -= ndarray.sum(ndarray.log(ndarray.maximum(1e-10, pred))).asscalar()
num += pred.size
self.sum_metric += loss
self.num_inst += num
def get(self):
return (self.name, math.exp(self.sum_metric/self.num_inst))
####################
# REGRESSION METRICS
####################
class MAE(EvalMetric):
"""Calculate Mean Absolute Error (MAE) loss."""
def __init__(self):
super(MAE, self).__init__('mae')
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred in zip(labels, preds):
label = label.asnumpy()
pred = pred.asnumpy()
            # Reshape 1-d arrays into column vectors so the subtraction below does
            # not broadcast an (n, 1) label against an (n,) prediction into (n, n).
            if len(label.shape) == 1:
                label = label.reshape(label.shape[0], 1)
            if len(pred.shape) == 1:
                pred = pred.reshape(pred.shape[0], 1)
self.sum_metric += numpy.abs(label - pred).mean()
self.num_inst += 1 # numpy.prod(label.shape)
class MSE(EvalMetric):
"""Calculate Mean Squared Error (MSE) loss."""
def __init__(self):
super(MSE, self).__init__('mse')
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred in zip(labels, preds):
label = label.asnumpy()
pred = pred.asnumpy()
            if len(label.shape) == 1:
                label = label.reshape(label.shape[0], 1)
            if len(pred.shape) == 1:
                pred = pred.reshape(pred.shape[0], 1)
self.sum_metric += ((label - pred)**2.0).mean()
self.num_inst += 1 # numpy.prod(label.shape)
class RMSE(EvalMetric):
"""Calculate Root Mean Squred Error (RMSE) loss."""
def __init__(self):
super(RMSE, self).__init__('rmse')
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred in zip(labels, preds):
label = label.asnumpy()
pred = pred.asnumpy()
            if len(label.shape) == 1:
                label = label.reshape(label.shape[0], 1)
            if len(pred.shape) == 1:
                pred = pred.reshape(pred.shape[0], 1)
self.sum_metric += numpy.sqrt(((label - pred)**2.0).mean())
self.num_inst += 1
class CrossEntropy(EvalMetric):
"""Calculate Cross Entropy loss."""
def __init__(self, eps=1e-8):
super(CrossEntropy, self).__init__('cross-entropy')
self.eps = eps
def update(self, labels, preds):
check_label_shapes(labels, preds)
for label, pred in zip(labels, preds):
label = label.asnumpy()
pred = pred.asnumpy()
label = label.ravel()
assert label.shape[0] == pred.shape[0]
prob = pred[numpy.arange(label.shape[0]), numpy.int64(label)]
self.sum_metric += (-numpy.log(prob + self.eps)).sum()
self.num_inst += label.shape[0]
class Torch(EvalMetric):
"""Dummy metric for torch criterions."""
def __init__(self, name='torch'):
super(Torch, self).__init__(name)
def update(self, _, preds):
for pred in preds:
self.sum_metric += pred.asnumpy().mean()
self.num_inst += 1
class Caffe(Torch):
"""Dummy metric for caffe criterions"""
def __init__(self):
super(Caffe, self).__init__('caffe')
class CustomMetric(EvalMetric):
"""Custom evaluation metric that takes a NDArray function.
Parameters
----------
feval : callable(label, pred)
Customized evaluation function.
name : str, optional
The name of the metric.
    allow_extra_outputs : bool
        If true, predictions are allowed to contain extra outputs that are
        ignored when shapes are checked. This is useful for RNNs, where the
        states are also produced as outputs so that they can be forwarded.
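
    Examples
    --------
    A minimal usage sketch; ``classification_error`` and the arrays below are illustrative:

    >>> import mxnet as mx
    >>> def classification_error(label, pred):
    ...     return (pred.argmax(axis=1) != label).mean()
    >>> err = CustomMetric(classification_error)
    >>> err.update(labels=[mx.nd.array([0, 1, 1])],
    ...            preds=[mx.nd.array([[0.7, 0.3], [0.3, 0.7], [0.6, 0.4]])])
    >>> name, value = err.get()   # value is 1.0/3: one of the three predictions is wrong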
"""
def __init__(self, feval, name=None, allow_extra_outputs=False):
if name is None:
name = feval.__name__
if name.find('<') != -1:
name = 'custom(%s)' % name
super(CustomMetric, self).__init__(name)
self._feval = feval
self._allow_extra_outputs = allow_extra_outputs
def update(self, labels, preds):
if not self._allow_extra_outputs:
check_label_shapes(labels, preds)
for pred, label in zip(preds, labels):
label = label.asnumpy()
pred = pred.asnumpy()
reval = self._feval(label, pred)
if isinstance(reval, tuple):
(sum_metric, num_inst) = reval
self.sum_metric += sum_metric
self.num_inst += num_inst
else:
self.sum_metric += reval
self.num_inst += 1
# pylint: disable=invalid-name
def np(numpy_feval, name=None, allow_extra_outputs=False):
"""Create a customized metric from numpy function.
Parameters
----------
numpy_feval : callable(label, pred)
Customized evaluation function.
This will get called with the labels and predictions
for a minibatch, each as NumPy arrays. This function
should return a single float.
name : str, optional
The name of the metric.
    allow_extra_outputs : bool
        If true, predictions are allowed to contain extra outputs that are
        ignored when shapes are checked. This is useful for RNNs, where the
        states are also produced as outputs so that they can be forwarded.
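
    Examples
    --------
    A minimal usage sketch; ``zero_one_error`` is an illustrative NumPy function:

    >>> def zero_one_error(label, pred):
    ...     return (pred.argmax(axis=1) != label).mean()
    >>> error_metric = np(zero_one_error)   # wraps the function in a CustomMetric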
"""
def feval(label, pred):
"""Internal eval function."""
return numpy_feval(label, pred)
feval.__name__ = numpy_feval.__name__
return CustomMetric(feval, name, allow_extra_outputs)
# pylint: enable=invalid-name
def create(metric, **kwargs):
"""Create an evaluation metric.
Parameters
----------
    metric : str, callable, list, or EvalMetric
        The name of a built-in metric, a function computing the metric from
        ``label`` and ``pred`` NDArrays, an existing ``EvalMetric`` instance,
        or a list of any of these (combined into a ``CompositeEvalMetric``).
    **kwargs
        Additional arguments passed to the metric constructor, e.g. ``top_k``.
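
    Examples
    --------
    A minimal usage sketch of the supported inputs:

    >>> acc = create('acc')
    >>> top5 = create('top_k_accuracy', top_k=5)
    >>> combined = create(['acc', 'f1'])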
"""
if callable(metric):
return CustomMetric(metric)
elif isinstance(metric, EvalMetric):
return metric
elif isinstance(metric, list):
composite_metric = CompositeEvalMetric()
for child_metric in metric:
composite_metric.add(create(child_metric, **kwargs))
return composite_metric
metrics = {
'acc': Accuracy,
'accuracy': Accuracy,
'ce': CrossEntropy,
'f1': F1,
'mae': MAE,
'mse': MSE,
'rmse': RMSE,
'top_k_accuracy': TopKAccuracy
}
    try:
        return metrics[metric.lower()](**kwargs)
    except (KeyError, AttributeError):
        raise ValueError("Metric must be either callable or one of {}".format(
            sorted(metrics)))