upgrading tck which implied moving to EMA for meter and updating a bit histogram impl to be more accurate (still some refinements to do)
diff --git a/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/HistogramImpl.java b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/HistogramImpl.java
index b0f93d7..58fcff2 100644
--- a/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/HistogramImpl.java
+++ b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/HistogramImpl.java
@@ -16,15 +16,20 @@
*/
package org.apache.geronimo.microprofile.metrics.common;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toMap;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -33,15 +38,22 @@
import org.eclipse.microprofile.metrics.Histogram;
import org.eclipse.microprofile.metrics.Snapshot;
-// todo? rework it to use the classical exponential decay impl?
+// impl adapted from apache sirona
public class HistogramImpl implements Histogram {
- private static final long INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
- private static final Value[] EMPTY_VALUES_ARRAY = new Value[0];
+ // potential config
+ private static final double ALPHA = Double.parseDouble(System.getProperty("geronimo.metrics.storage.alpha", "0.015"));
+ private static final int BUCKET_SIZE = Integer.getInteger("geronimo.metrics.storage.size", 1024);
+ private static final long REFRESH_INTERVAL = TimeUnit.HOURS.toNanos(1);
+
+ private static final Value[] EMPTY_ARRAY = new Value[0];
private final String unit;
- private final LongAdder count = new LongAdder();
- private final Collection<Value> values = new CopyOnWriteArrayList<>();
- private final AtomicLong lastCleanUp = new AtomicLong(System.nanoTime());
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final AtomicLong count = new AtomicLong();
+ private final ConcurrentSkipListMap<Double, Value> bucket = new ConcurrentSkipListMap<>();
+ private final AtomicLong nextRefreshTime = new AtomicLong(System.nanoTime() + REFRESH_INTERVAL);
+ private volatile long startTime = nowSec();
public HistogramImpl(final String unit) {
this.unit = unit;
@@ -54,21 +66,18 @@
@Override
public synchronized void update(final long value) {
- refresh();
- count.increment();
- values.add(new Value(System.nanoTime(), value));
+ add(value);
}
@Override
public long getCount() {
- return count.sum();
+ return count.get();
}
@Override
@JsonbTransient
public Snapshot getSnapshot() {
- refresh();
- return new SnapshotImpl(values.toArray(EMPTY_VALUES_ARRAY));
+ return snapshot();
}
public String getUnit() {
@@ -115,47 +124,94 @@
return getSnapshot().getStdDev();
}
- // cheap way to avoid to explode the mem for nothing
- private void refresh() {
+ private long nowSec() {
+ return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ }
+
+ public void add(final long value) {
+ ensureUpToDate();
+
+ final Lock lock = this.lock.readLock();
+ lock.lock();
+ try {
+ final Value sample = new Value(value, Math.exp(ALPHA * (nowSec() - startTime)));
+ final double priority = sample.weight / Math.random();
+
+ final long size = count.incrementAndGet();
+ if (size <= BUCKET_SIZE) {
+ bucket.put(priority, sample);
+ } else { // iterate through the bucket until we need removing low priority entries to get a new space
+ double first = bucket.firstKey();
+ if (first < priority && bucket.putIfAbsent(priority, sample) == null) {
+ while (bucket.remove(first) == null) {
+ first = bucket.firstKey();
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void ensureUpToDate() {
+ final long next = nextRefreshTime.get();
final long now = System.nanoTime();
- final long lastUpdateNs = lastCleanUp.get();
- final long elaspsedTime = now - lastUpdateNs;
- if (elaspsedTime > INTERVAL_NS && lastCleanUp.compareAndSet(lastUpdateNs, now)) {
- final long cleanFrom = now - INTERVAL_NS;
- values.removeIf(it -> it.timestamp > cleanFrom);
+ if (now < next) {
+ return;
+ }
+
+ final Lock lock = this.lock.writeLock();
+ lock.lock();
+ try {
+ if (nextRefreshTime.compareAndSet(next, now + REFRESH_INTERVAL)) {
+ final long oldStartTime = startTime;
+ startTime = nowSec();
+ final double updateFactor = Math.exp(-ALPHA * (startTime - oldStartTime));
+ if (updateFactor != 0.) {
+ bucket.putAll(new ArrayList<>(bucket.keySet()).stream()
+ .collect(toMap(k -> k * updateFactor, k -> {
+ final Value previous = bucket.remove(k);
+ return new Value(previous.value, previous.weight * updateFactor);
+ })));
+ count.set(bucket.size()); // N keys can lead to the same key so we must update it
+ } else {
+ bucket.clear();
+ count.set(0);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Snapshot snapshot() {
+ ensureUpToDate();
+ final Lock lock = this.lock.readLock();
+ lock.lock();
+ try {
+ return new SnapshotImpl(bucket.values().toArray(EMPTY_ARRAY));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private static final class Value {
+ private final long value;
+ private final double weight;
+
+ private Value(final long value, final double weight) {
+ this.value = value;
+ this.weight= weight;
}
}
private static class SnapshotImpl extends Snapshot {
private final Value[] values;
- private volatile long[] sorted;
+ private Value[] sorted;
private SnapshotImpl(final Value[] values) {
this.values = values;
- }
-
- @Override
- public double getValue(final double quantile) {
- if (values.length == 0) {
- return 0;
- }
- if (values.length == 1) {
- return values[0].value;
- }
- if (sorted == null) {
- synchronized (this) {
- if (sorted == null) {
- sorted = getValues();
- Arrays.sort(sorted);
- }
- }
- }
- return sorted[(int) Math.floor((sorted.length - 1) * quantile)];
- }
-
- @Override
- public long[] getValues() {
- return longs().toArray();
+ // no high computation here, we are under lock + all methods are not called in general
}
@Override
@@ -164,18 +220,38 @@
}
@Override
- public long getMax() {
- return longs().max().orElse(0);
+ public long[] getValues() {
+ return values(sorted()).toArray();
}
@Override
- public double getMean() {
- return longs().sum() / (double) values.length;
+ public long getMax() {
+ if (values.length == 0) {
+ return 0;
+ }
+ if (sorted != null) {
+ return sorted[sorted.length - 1].value;
+ }
+ return values(values).max().orElse(0);
}
@Override
public long getMin() {
- return longs().min().orElse(0);
+ if (values.length == 0) {
+ return 0;
+ }
+ if (sorted != null) {
+ return sorted[0].value;
+ }
+ return values(values).min().orElse(0);
+ }
+
+ @Override
+ public double getMean() {
+ if (values.length == 0) {
+ return 0;
+ }
+ return values(values).sum() * 1. / values.length;
}
@Override
@@ -184,12 +260,15 @@
return 0;
}
final double mean = getMean();
- return Math.sqrt(longs().map(v -> (long) Math.pow(v - mean, 2)).sum() / values.length - 1);
+ final double sumWeight = Stream.of(values).mapToDouble(i -> i.weight).sum();
+ return Math.sqrt(Stream.of(values)
+ .mapToDouble(v -> Math.pow(v.value - mean, 2) * (v.weight / sumWeight))
+ .sum());
}
@Override
public void dump(final OutputStream output) {
- longs().forEach(v -> {
+ values(sorted()).forEach(v -> {
try {
output.write((v + "\n").getBytes(StandardCharsets.UTF_8));
} catch (final IOException e) {
@@ -198,18 +277,36 @@
});
}
- private LongStream longs() {
- return Stream.of(values).mapToLong(i -> i.value);
+ @Override
+ public double getValue(final double quantile) {
+ if (!(quantile >= 0 || quantile <= 1)) {
+ throw new IllegalArgumentException("Quantile " + quantile + " is invalid");
+ }
+ if (values.length == 0) {
+ return 0;
+ }
+ if (values.length == 1) {
+ return values[0].value;
+ }
+ final int idx = (int) Math.floor((values.length - 1) * quantile);
+ return sorted()[idx].value;
}
- }
- private static final class Value {
- private final long timestamp;
- private final long value;
+ private Value[] sorted() {
+ if (sorted == null) {
+ synchronized (this) {
+ if (sorted == null) {
+ sorted = new Value[values.length];
+ System.arraycopy(values, 0, sorted, 0, values.length);
+ Arrays.sort(sorted, comparing(i -> i.value));
+ }
+ }
+ }
+ return sorted;
+ }
- private Value(final long timestamp, final long value) {
- this.timestamp = timestamp;
- this.value = value;
+ private LongStream values(final Value[] values) {
+ return Stream.of(values).mapToLong(i -> i.value);
}
}
}
diff --git a/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/MeterImpl.java b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/MeterImpl.java
index 82faf25..6895dbb 100644
--- a/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/MeterImpl.java
+++ b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/MeterImpl.java
@@ -16,33 +16,27 @@
*/
package org.apache.geronimo.microprofile.metrics.common;
+import static org.apache.geronimo.microprofile.metrics.common.expdecay.ExponentialMovingAverage.forMinutes;
+
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Stream;
import javax.json.bind.annotation.JsonbProperty;
+import org.apache.geronimo.microprofile.metrics.common.expdecay.ExponentialMovingAverage;
import org.eclipse.microprofile.metrics.Meter;
public class MeterImpl implements Meter {
- private static final long INTERVAL_NS;
- private static final double ALPHA_MN;
- private static final double ALPHA_5MN;
- private static final double ALPHA_15MN;
- static {
- final double interval = 5; // this is not needed at runtime so no need of a constant
- INTERVAL_NS = TimeUnit.SECONDS.toNanos((int) interval * 60);
- ALPHA_MN = 1 - Math.exp(-interval / 60.);
- ALPHA_5MN = 1 - Math.exp(-interval / 12.);
- ALPHA_15MN = 1 - Math.exp(-interval / 4.);
- }
+ private static final long REFRESH_INTERVAL = TimeUnit.SECONDS.toNanos(5);
+ private final AtomicLong lastRefresh = new AtomicLong(System.nanoTime());
private final LongAdder count = new LongAdder();
- private final Rate rate15 = new Rate(ALPHA_15MN, INTERVAL_NS);
- private final Rate rate5 = new Rate(ALPHA_5MN, INTERVAL_NS);
- private final Rate rate1 = new Rate(ALPHA_MN, INTERVAL_NS);
+ private final ExponentialMovingAverage rate15 = forMinutes(15);
+ private final ExponentialMovingAverage rate5 = forMinutes(5);
+ private final ExponentialMovingAverage rate1 = forMinutes(1);
private final long initNs = System.nanoTime();
- private final AtomicLong lastUpdate = new AtomicLong(System.nanoTime());
private final String unit;
public MeterImpl(final String unit) {
@@ -58,13 +52,13 @@
mark(1);
}
- @Override // this is not the most beautiful piece but locking here would be a perf killer
+ @Override
public void mark(final long n) {
- doRefresh();
+ updateIfNeeded();
count.add(n);
- rate1.update(n);
- rate5.update(n);
- rate15.update(n);
+ rate1.add(n);
+ rate5.add(n);
+ rate15.add(n);
}
@Override
@@ -75,22 +69,22 @@
@Override
@JsonbProperty("fifteenMinRate")
public double getFifteenMinuteRate() {
- doRefresh();
- return rate15.value;
+ updateIfNeeded();
+ return rate15.rate();
}
@Override
@JsonbProperty("fiveMinRate")
public double getFiveMinuteRate() {
- doRefresh();
- return rate5.value;
+ updateIfNeeded();
+ return rate5.rate();
}
@Override
@JsonbProperty("oneMinRate")
public double getOneMinuteRate() {
- doRefresh();
- return rate1.value;
+ updateIfNeeded();
+ return rate1.rate();
}
@Override
@@ -107,53 +101,15 @@
if (seconds == 0) {
return 0;
}
- return count / seconds;
+ return count * 1. / seconds;
}
- private void doRefresh() {
+ private void updateIfNeeded() {
final long now = System.nanoTime();
- final long lastUpdateNs = lastUpdate.get();
- final long elaspsedTime = now - lastUpdateNs;
- if (elaspsedTime > INTERVAL_NS && lastUpdate.compareAndSet(lastUpdateNs, now)) {
- final long diff = elaspsedTime / INTERVAL_NS;
- for (long it = 0; it < diff; it++) { // simulate time, avoids a background thread
- rate1.refresh();
- rate5.refresh();
- rate15.refresh();
- }
- }
- }
-
- private static class Rate {
- private volatile double value = 0;
-
- private final double alpha;
- private final double interval;
- private final LongAdder updates = new LongAdder();
- private volatile boolean initialized = false;
-
- private Rate(final double alpha, final long interval) {
- this.interval = interval;
- this.alpha = alpha;
- }
-
- private void update(final long n) {
- updates.add(n);
- }
-
- private void refresh() {
- final long count = updates.sumThenReset();
- final double val = count / interval;
- if (!initialized) {
- synchronized (this) {
- value = val;
- }
- initialized = true;
- return;
- }
- synchronized (this) {
- value += (val - value) * alpha;
- }
+ final long previousRefresh = lastRefresh.get();
+ if (now - previousRefresh >= REFRESH_INTERVAL && lastRefresh.compareAndSet(previousRefresh, now)) {
+ lastRefresh.set(now);
+ Stream.of(rate1, rate5, rate15).forEach(ExponentialMovingAverage::refresh);
}
}
}
diff --git a/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/expdecay/ExponentialMovingAverage.java b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/expdecay/ExponentialMovingAverage.java
new file mode 100644
index 0000000..b9bfe37
--- /dev/null
+++ b/geronimo-metrics-common/src/main/java/org/apache/geronimo/microprofile/metrics/common/expdecay/ExponentialMovingAverage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.microprofile.metrics.common.expdecay;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+public class ExponentialMovingAverage {
+ private static final double INTERVAL = SECONDS.toNanos(5);
+ private static final double RATE_RATIO = TimeUnit.SECONDS.toNanos(1);
+
+ private final LongAdder accumulator = new LongAdder();
+ private final double alpha;
+
+ private volatile double rate = 0.0;
+
+ private ExponentialMovingAverage(final double alpha) {
+ this.alpha = alpha;
+ }
+
+ public double rate() {
+ return rate * RATE_RATIO;
+ }
+
+ public void add(final long n) {
+ accumulator.add(n);
+ }
+
+ public void refresh() {
+ final long count = accumulator.sumThenReset();
+ final double instantRate = count / INTERVAL;
+ final double newRate = rate == 0. ? instantRate : nextRate(instantRate);
+ this.rate = newRate;
+ }
+
+ private double nextRate(final double instantRate) {
+ return rate + (alpha * (instantRate - rate));
+ }
+
+ public static ExponentialMovingAverage forMinutes(final int minutes) {
+ return new ExponentialMovingAverage(Math.exp(-5/*INTERVAL in sec*/ / 60. / minutes));
+ }
+}
diff --git a/pom.xml b/pom.xml
index d735d9b..832fc69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,7 @@
</scm>
<properties>
- <spec.version>1.1</spec.version>
+ <spec.version>1.1.1</spec.version>
<arquillian.version>1.1.8.Final</arquillian.version>
<meecrowave.version>1.2.3</meecrowave.version>
</properties>