| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.geronimo.microprofile.reporter.storage.data; |
| |
| import static java.util.stream.Collectors.toMap; |
| |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| // copy of metrics Histogram impl |
| public class InMemoryDatabase<T> { |
| private static final long REFRESH_INTERVAL = TimeUnit.HOURS.toNanos(1); |
| |
| private final String unit; |
| |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| private final AtomicLong count = new AtomicLong(); |
| |
| private final AtomicLong nextRefreshTime = new AtomicLong(System.nanoTime() + REFRESH_INTERVAL); |
| |
| private final double alpha; |
| private final int bucketSize; |
| |
| private volatile long startTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); |
| |
| private final ConcurrentSkipListMap<Double, Value<T>> bucket = new ConcurrentSkipListMap<>(); |
| |
| public InMemoryDatabase(final double alpha, final int bucketSize, final String unit) { |
| this.unit = unit; |
| this.alpha = alpha; |
| this.bucketSize = bucketSize; |
| } |
| |
| public String getUnit() { |
| return unit; |
| } |
| |
| public LinkedList<Value<T>> snapshot() { |
| ensureUpToDate(); |
| final Lock lock = this.lock.readLock(); |
| lock.lock(); |
| try { |
| return new LinkedList<>(bucket.values()); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public void add(final T value) { |
| ensureUpToDate(); |
| |
| final long now = System.currentTimeMillis(); |
| final Lock lock = this.lock.readLock(); |
| lock.lock(); |
| try { |
| final Value<T> sample = new Value<>(value, now, Math.exp(alpha * (TimeUnit.MILLISECONDS.toSeconds(now) - startTime))); |
| final double priority = sample.weight / Math.random(); |
| |
| final long size = count.incrementAndGet(); |
| if (size <= bucketSize) { |
| bucket.put(priority, sample); |
| } else { // iterate through the bucket until we need removing low priority entries to get a new space |
| double first = bucket.firstKey(); |
| if (first < priority && bucket.putIfAbsent(priority, sample) == null) { |
| while (bucket.remove(first) == null) { |
| first = bucket.firstKey(); |
| } |
| } |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void ensureUpToDate() { |
| final long next = nextRefreshTime.get(); |
| final long now = System.nanoTime(); |
| if (now < next) { |
| return; |
| } |
| |
| final Lock lock = this.lock.writeLock(); |
| lock.lock(); |
| try { |
| if (nextRefreshTime.compareAndSet(next, now + REFRESH_INTERVAL)) { |
| final long oldStartTime = startTime; |
| startTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); |
| final double updateFactor = Math.exp(-alpha * (startTime - oldStartTime)); |
| if (updateFactor != 0.) { |
| bucket.putAll(new ArrayList<>(bucket.keySet()).stream().collect(toMap(k -> k * updateFactor, k -> { |
| final Value<T> previous = bucket.remove(k); |
| return new Value<>(previous.value, previous.timestamp, previous.weight * updateFactor); |
| }))); |
| count.set(bucket.size()); // N keys can lead to the same key so we must update it |
| } else { |
| bucket.clear(); |
| count.set(0); |
| } |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public static final class Value<T> { |
| |
| private final T value; |
| |
| private final long timestamp; |
| |
| private final double weight; |
| |
| private Value(final T value, final long timestamp, final double weight) { |
| this.value = value; |
| this.weight = weight; |
| this.timestamp = timestamp; |
| } |
| |
| public T getValue() { |
| return value; |
| } |
| |
| public long getTimestamp() { |
| return timestamp; |
| } |
| } |
| } |