/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.microprofile.metrics.common;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toMap;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.json.bind.annotation.JsonbTransient;
import org.eclipse.microprofile.metrics.Histogram;
import org.eclipse.microprofile.metrics.Snapshot;
// impl adapted from apache sirona
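/**
 * Exponentially decaying reservoir histogram: each recorded value is stored with a
 * forward-decay weight of exp(ALPHA * (nowSec() - startTime)) and a random priority of
 * weight / Math.random(); only the BUCKET_SIZE entries with the highest priorities are
 * kept, which biases the retained sample towards recent measurements.
 *
 * <p>Minimal usage sketch (the unit string is free-form):</p>
 * <pre>{@code
 * final HistogramImpl histogram = new HistogramImpl("milliseconds");
 * histogram.update(42L);
 * final Snapshot snapshot = histogram.getSnapshot();
 * final double p95 = snapshot.get95thPercentile();
 * }</pre>
 */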
public class HistogramImpl implements Histogram {
// potential config (overridable through system properties)
// ALPHA: decay factor of the exponentially weighted reservoir, higher values favor recent samples more strongly
private static final double ALPHA = Double.parseDouble(System.getProperty("geronimo.metrics.storage.alpha", "0.015"));
// BUCKET_SIZE: maximum number of samples kept in the reservoir
private static final int BUCKET_SIZE = Integer.getInteger("geronimo.metrics.storage.size", 1024);
// REFRESH_INTERVAL: how often stored weights/priorities are rescaled to keep them within double range
private static final long REFRESH_INTERVAL = TimeUnit.HOURS.toNanos(1);
private static final Value[] EMPTY_ARRAY = new Value[0];
private final String unit;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AtomicLong count = new AtomicLong();
private final ConcurrentSkipListMap<Double, Value> bucket = new ConcurrentSkipListMap<>();
private final AtomicLong nextRefreshTime = new AtomicLong(System.nanoTime() + REFRESH_INTERVAL);
private volatile long startTime = nowSec();
public HistogramImpl(final String unit) {
this.unit = unit;
}
@Override
public void update(final int value) {
update((long) value);
}
@Override
public synchronized void update(final long value) {
add(value);
}
@Override
public long getCount() {
return count.get();
}
@Override
@JsonbTransient
public Snapshot getSnapshot() {
return snapshot();
}
public String getUnit() {
return unit;
}
public double getP50() {
return getSnapshot().getMedian();
}
public double getP75() {
return getSnapshot().get75thPercentile();
}
public double getP95() {
return getSnapshot().get95thPercentile();
}
public double getP98() {
return getSnapshot().get98thPercentile();
}
public double getP99() {
return getSnapshot().get99thPercentile();
}
public double getP999() {
return getSnapshot().get999thPercentile();
}
public long getMax() {
return getSnapshot().getMax();
}
public double getMean() {
return getSnapshot().getMean();
}
public long getMin() {
return getSnapshot().getMin();
}
public double getStddev() {
return getSnapshot().getStdDev();
}
private long nowSec() {
return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
}
public void add(final long value) {
ensureUpToDate();
final Lock lock = this.lock.readLock();
lock.lock();
try {
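// forward decay: newer samples get an exponentially larger weight, and the random
// priority decides which samples survive once the reservoir is full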
final Value sample = new Value(value, Math.exp(ALPHA * (nowSec() - startTime)));
final double priority = sample.weight / Math.random();
final long size = count.incrementAndGet();
if (size <= BUCKET_SIZE) {
bucket.put(priority, sample);
} else { // reservoir is full: keep the sample only if its priority beats the current minimum, then evict the lowest entry to free the slot
double first = bucket.firstKey();
if (first < priority && bucket.putIfAbsent(priority, sample) == null) {
while (bucket.remove(first) == null) {
first = bucket.firstKey();
}
}
}
} finally {
lock.unlock();
}
}
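// rescales stored weights/priorities periodically so exp(ALPHA * (now - startTime)) stays
// within double range; called before every update and before taking a snapshot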
private void ensureUpToDate() {
final long next = nextRefreshTime.get();
final long now = System.nanoTime();
if (now < next) {
return;
}
final Lock lock = this.lock.writeLock();
lock.lock();
try {
if (nextRefreshTime.compareAndSet(next, now + REFRESH_INTERVAL)) {
final long oldStartTime = startTime;
startTime = nowSec();
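// shrink every stored priority and weight by exp(-ALPHA * elapsed) so they remain
// comparable with samples recorded against the new startTime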
final double updateFactor = Math.exp(-ALPHA * (startTime - oldStartTime));
if (updateFactor != 0.) {
bucket.putAll(new ArrayList<>(bucket.keySet()).stream()
.collect(toMap(k -> k * updateFactor, k -> {
final Value previous = bucket.remove(k);
return new Value(previous.value, previous.weight * updateFactor);
})));
count.set(bucket.size()); // rescaled keys can collide, so recompute the count from the actual bucket size
} else {
bucket.clear();
count.set(0);
}
}
} finally {
lock.unlock();
}
}
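// copies the current reservoir values under the read lock; percentile math happens lazily in SnapshotImpl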
public Snapshot snapshot() {
ensureUpToDate();
final Lock lock = this.lock.readLock();
lock.lock();
try {
return new SnapshotImpl(bucket.values().toArray(EMPTY_ARRAY));
} finally {
lock.unlock();
}
}
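// a sampled value paired with its forward-decay weight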
private static final class Value {
private final long value;
private final double weight;
private Value(final long value, final double weight) {
this.value = value;
this.weight = weight;
}
}
private static class SnapshotImpl extends Snapshot {
private final Value[] values;
private volatile Value[] sorted; // volatile: lazily initialized via double-checked locking in sorted()
private SnapshotImpl(final Value[] values) {
this.values = values;
// no heavy computation here: this runs under the reservoir lock, sorting is deferred until a percentile is actually requested
}
@Override
public int size() {
return values.length;
}
@Override
public long[] getValues() {
return values(sorted()).toArray();
}
@Override
public long getMax() {
if (values.length == 0) {
return 0;
}
if (sorted != null) {
return sorted[sorted.length - 1].value;
}
return values(values).max().orElse(0);
}
@Override
public long getMin() {
if (values.length == 0) {
return 0;
}
if (sorted != null) {
return sorted[0].value;
}
return values(values).min().orElse(0);
}
@Override
public double getMean() {
if (values.length == 0) {
return 0;
}
return values(values).sum() * 1. / values.length;
}
@Override
public double getStdDev() {
if (values.length <= 1) {
return 0;
}
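// weighted standard deviation: each sample contributes proportionally to its decay weight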
final double mean = getMean();
final double sumWeight = Stream.of(values).mapToDouble(i -> i.weight).sum();
return Math.sqrt(Stream.of(values)
.mapToDouble(v -> Math.pow(v.value - mean, 2) * (v.weight / sumWeight))
.sum());
}
@Override
public void dump(final OutputStream output) {
values(sorted()).forEach(v -> {
try {
output.write((v + "\n").getBytes(StandardCharsets.UTF_8));
} catch (final IOException e) {
throw new IllegalStateException(e);
}
});
}
@Override
public double getValue(final double quantile) {
if (quantile < 0. || quantile > 1.) {
throw new IllegalArgumentException("Quantile " + quantile + " is invalid");
}
if (values.length == 0) {
return 0;
}
if (values.length == 1) {
return values[0].value;
}
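// nearest-rank selection: floor((n - 1) * quantile) indexes the value-sorted samples, weights are not used here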
final int idx = (int) Math.floor((values.length - 1) * quantile);
return sorted()[idx].value;
}
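// sorts the sampled values once, on demand, using double-checked locking on the volatile field above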
private Value[] sorted() {
if (sorted == null) {
synchronized (this) {
if (sorted == null) {
sorted = new Value[values.length];
System.arraycopy(values, 0, sorted, 0, values.length);
Arrays.sort(sorted, comparing(i -> i.value));
}
}
}
return sorted;
}
private LongStream values(final Value[] values) {
return Stream.of(values).mapToLong(i -> i.value);
}
}
}