blob: 1d7bdc6e98939003cd9fcb3557a3fbe260802303 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.metron.statistics;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.tdunning.math.stats.AVLTreeDigest;
import com.tdunning.math.stats.TDigest;
import org.apache.commons.math3.util.FastMath;
import java.nio.ByteBuffer;
* A (near) constant memory implementation of a statistics provider.
* For first order statistics, simple terms are stored and composed
* to return the statistics results. This is intended to provide a
* mergeable implementation for a statistics provider.
public class OnlineStatisticsProvider implements StatisticsProvider, KryoSerializable {
* A sensible default for compression to use in the T-Digest.
* As per
* 100 is a sensible default and the number of centroids retained (to construct the sketch)
* is usually a smallish (usually less than 10) multiple of the compression. However, we have found through some
* testing that 150 gives a bit finer granularity for smaller numbers of data points.
public static final int COMPRESSION = 150;
* A distributional sketch that uses a variant of 1-D k-means to construct a tree of ranges
* that sketches the distribution. See for
* more detail.
private TDigest digest;
private long n = 0;
private double sum = 0;
private double sumOfSquares = 0;
private double sumOfLogs = 0;
private Double min = null;
private Double max = null;
//\mu_1, E[X]
private double M1 = 0;
//\mu_2: E[(X - \mu)^2]
private double M2 = 0;
//\mu_3: E[(X - \mu)^3]
private double M3 = 0;
//\mu_4: E[(X - \mu)^4]
private double M4 = 0;
public OnlineStatisticsProvider() {
digest = new AVLTreeDigest(COMPRESSION);
* Add a value.
* NOTE: This does not store the point, but only updates internal state.
* NOTE: This is NOT threadsafe.
* @param value
public void addValue(double value) {
long n1 = n;
min = min == null?value:Math.min(min, value);
max = max == null?value:Math.max(max, value);
sum += value;
sumOfLogs += Math.log(value);
sumOfSquares += value*value;
double delta, delta_n, delta_n2, term1;
//delta between the value and the mean
delta = value - M1;
//(x - E[x])/n
delta_n = delta / n;
delta_n2 = delta_n * delta_n;
term1 = delta * delta_n * n1;
// Adjusting expected value: See Knuth TAOCP vol 2, 3rd edition, page 232
M1 += delta_n;
// Adjusting the \mu_i, see
M4 += term1 * delta_n2 * (n*n - 3*n + 3) + 6 * delta_n2 * M2 - 4 * delta_n * M3;
M3 += term1 * delta_n * (n - 2) - 3 * delta_n * M2;
M2 += term1;
checkFlowError(sumOfSquares, sum, sumOfSquares, M1, M2, M3, M4);
private void checkFlowError(double sumOfSquares, double sum, double... vals) {
for(double val : vals) {
if(Double.isInfinite(val)) {
throw new IllegalStateException("Double overflow!");
//underflow. It is sufficient to check sumOfSquares because sumOfSquares is going to converge to 0 faster than sum
//in the situation where we're looking at an underflow.
if(sumOfSquares == 0.0 && sum > 0) {
throw new IllegalStateException("Double overflow!");
public long getCount() {
return n;
public double getMin() {
return min == null?Double.NaN:min;
public double getMax() {
return max == null?Double.NaN:max;
public double getMean() {
return getSum()/getCount();
public double getSum() {
return sum;
public double getVariance() {
return M2/(n - 1.0);
public double getStandardDeviation() {
return FastMath.sqrt(getVariance());
public double getGeometricMean() {
throw new UnsupportedOperationException("Unwilling to compute the geometric mean.");
public double getPopulationVariance() {
throw new UnsupportedOperationException("Unwilling to compute the geometric mean.");
public double getQuadraticMean() {
return FastMath.sqrt(sumOfSquares/n);
public double getSumLogs() {
return sumOfLogs;
public double getSumSquares() {
return sumOfSquares;
* Unbiased kurtosis.
* See
* @return unbiased kurtosis
public double getKurtosis() {
//kurtosis = { [n(n+1) / (n -1)(n - 2)(n-3)] \mu_4 / std^4 } - [3(n-1)^2 / (n-2)(n-3)]
if(n < 4) {
return Double.NaN;
double std = getStandardDeviation();
double t1 = (1.0*n)*(n+1)/((n-1)*(n-2)*(n-3));
double t3 = 3.0*((n-1)*(n-1))/((n-2)*(n-3));
return t1*(M4/FastMath.pow(std, 4))-t3;
* Unbiased skewness.
* See
* @return unbiased skewness
public double getSkewness() {
// skewness = [n / (n -1) (n - 2)] sum[(x_i - mean)^3] / std^3
if(n < 3) {
return Double.NaN;
double t1 = (1.0*n)/((n - 1)*(n-2));
double std = getStandardDeviation();
return t1*M3/FastMath.pow(std, 3);
* This returns an approximate percentile based on a t-digest.
* @param p
* @return an approximate percentile based on a t-digest
public double getPercentile(double p) {
return digest.quantile(p/100.0);
public StatisticsProvider merge(StatisticsProvider provider) {
OnlineStatisticsProvider combined = new OnlineStatisticsProvider();
OnlineStatisticsProvider a = this;
OnlineStatisticsProvider b = (OnlineStatisticsProvider)provider;
//Combining the simple terms that obviously form a semigroup
combined.n = a.n + b.n;
combined.sum = a.sum + b.sum;
if(a.min != null && b.min != null) {
combined.min = Math.min(a.min, b.min);
combined.max = Math.max(a.max, b.max);
else {
combined.min = a.min;
combined.max = a.max;
combined.sumOfSquares = a.sumOfSquares + b.sumOfSquares;
combined.sumOfLogs = a.sumOfLogs+ b.sumOfLogs;
// Adjusting the standardized moments, see
double delta = b.M1 - a.M1;
double delta2 = delta*delta;
double delta3 = delta*delta2;
double delta4 = delta2*delta2;
combined.M1 = (a.n*a.M1 + b.n*b.M1) / combined.n;
combined.M2 = a.M2 + b.M2 +
delta2 * a.n * b.n / combined.n;
combined.M3 = a.M3 + b.M3 +
delta3 * a.n * b.n * (a.n - b.n)/(combined.n*combined.n);
combined.M3 += 3.0*delta * (a.n*b.M2 - b.n*a.M2) / combined.n;
combined.M4 = a.M4 + b.M4 + delta4*a.n*b.n * (a.n*a.n - a.n*b.n + b.n*b.n) /
combined.M4 += 6.0*delta2 * (a.n*a.n*b.M2 + b.n*b.n*a.M2)/(combined.n*combined.n) +
4.0*delta*(a.n*b.M3 - b.n*a.M3) / combined.n;
//Merging the distributional sketches
checkFlowError(combined.sumOfSquares, sum, combined.sumOfSquares, combined.M1, combined.M2, combined.M3, combined.M4);
return combined;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OnlineStatisticsProvider that = (OnlineStatisticsProvider) o;
if (n != that.n) return false;
if (, sum) != 0) return false;
if (, sumOfSquares) != 0) return false;
if (, sumOfLogs) != 0) return false;
if (, M1) != 0) return false;
if (, M2) != 0) return false;
if (, M3) != 0) return false;
if (, M4) != 0) return false;
if (digest != null ? !digest.equals(that.digest) : that.digest != null) return false;
if (min != null ? !min.equals(that.min) : that.min != null) return false;
return max != null ? max.equals(that.max) : that.max == null;
public int hashCode() {
int result;
long temp;
result = digest != null ? digest.hashCode() : 0;
result = 31 * result + (int) (n ^ (n >>> 32));
temp = Double.doubleToLongBits(sum);
result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(sumOfSquares);
result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(sumOfLogs);
result = 31 * result + (int) (temp ^ (temp >>> 32));
result = 31 * result + (min != null ? min.hashCode() : 0);
result = 31 * result + (max != null ? max.hashCode() : 0);
temp = Double.doubleToLongBits(M1);
result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(M2);
result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(M3);
result = 31 * result + (int) (temp ^ (temp >>> 32));
temp = Double.doubleToLongBits(M4);
result = 31 * result + (int) (temp ^ (temp >>> 32));
return result;
public void write(Kryo kryo, Output output) {
//storing tdigest
ByteBuffer outBuffer = ByteBuffer.allocate(digest.byteSize());
byte[] tdigestSerialized = outBuffer.array();
public void read(Kryo kryo, Input input) {
int digestSize = input.readInt();
byte[] digestBytes = input.readBytes(digestSize);
ByteBuffer digestBuff = ByteBuffer.wrap(digestBytes);
digest = AVLTreeDigest.fromBytes(digestBuff);
n = input.readLong();
sum = input.readDouble();
sumOfSquares = input.readDouble();
sumOfLogs = input.readDouble();
min = input.readDouble();
max = input.readDouble();
M1 = input.readDouble();
M2 = input.readDouble();
M3 = input.readDouble();
M4 = input.readDouble();