blob: 950c8d76859f9c2e4c1011ec3fa79f3674912293 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.concurrent;
import static java.util.Objects.requireNonNull;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
/**
* A concurrent hash map that stores values along with an expiry.
*
* Values are stored in the map until their expiry is reached, after which they will no longer be available and will
* appear as if removed. The actual removal is done lazily whenever the map is accessed, or when the
* {@link #purgeExpired()} method is invoked.
*
* @param <K> The key type.
* @param <V> The value type.
*/
public final class ExpiringMap<K, V> implements Map<K, V> {
// Uses object equality, to ensure uniqueness as a value in the storage map
private static final class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
private K key;
private V value;
private long expiry;
@Nullable
private BiConsumer<K, V> expiryListener;
ExpiringEntry(K key, V value, long expiry, @Nullable BiConsumer<K, V> expiryListener) {
this.key = key;
this.value = value;
this.expiry = expiry;
this.expiryListener = expiryListener;
}
@Override
public int compareTo(ExpiringEntry<K, V> o) {
return Long.compare(expiry, o.expiry);
}
}
private final ConcurrentHashMap<K, ExpiringEntry<K, V>> storage = new ConcurrentHashMap<>();
private final PriorityBlockingQueue<ExpiringEntry<K, V>> expiryQueue = new PriorityBlockingQueue<>();
private final LongSupplier currentTimeSupplier;
/**
* Construct an empty map.
*/
public ExpiringMap() {
this(System::currentTimeMillis);
}
@VisibleForTesting
ExpiringMap(LongSupplier currentTimeSupplier) {
this.currentTimeSupplier = currentTimeSupplier;
}
@Nullable
@Override
public V get(Object key) {
requireNonNull(key);
purgeExpired();
ExpiringEntry<K, V> entry = storage.get(key);
return (entry == null) ? null : entry.value;
}
@Override
public V getOrDefault(Object key, V defaultValue) {
V v;
return (((v = get(key)) != null)) ? v : defaultValue;
}
@Override
public boolean containsKey(Object key) {
requireNonNull(key);
purgeExpired();
return storage.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
requireNonNull(value);
purgeExpired();
return storage.values().stream().anyMatch(e -> e.value.equals(value));
}
@Override
public int size() {
purgeExpired();
return storage.size();
}
@Override
public boolean isEmpty() {
purgeExpired();
return storage.isEmpty();
}
@Nullable
@Override
public V put(K key, V value) {
requireNonNull(key);
requireNonNull(value);
purgeExpired();
ExpiringEntry<K, V> oldEntry = storage.put(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null));
return (oldEntry == null) ? null : oldEntry.value;
}
/**
* Associates the specified value with the specified key in this map, and expires the entry when the specified expiry
* time is reached. If the map previously contained a mapping for the key, the old value is replaced by the specified
* value.
*
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
*/
@Nullable
public V put(K key, V value, long expiry) {
return put(key, value, expiry, null);
}
/**
* Associates the specified value with the specified key in this map, and expires the entry when the specified expiry
* time is reached. If the map previously contained a mapping for the key, the old value is replaced by the specified
* value.
*
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @param expiryListener A listener that will be invoked when the entry expires.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
*/
@Nullable
public V put(K key, V value, long expiry, @Nullable BiConsumer<K, V> expiryListener) {
requireNonNull(key);
requireNonNull(value);
if (expiry >= Long.MAX_VALUE) {
return put(key, value);
}
long now = currentTimeSupplier.getAsLong();
purgeExpired(now);
if (expiry <= now) {
V previous = remove(key);
if (expiryListener != null) {
expiryListener.accept(key, value);
}
return previous;
}
ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
ExpiringEntry<K, V> oldEntry = storage.put(key, newEntry);
expiryQueue.offer(newEntry);
if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
return (oldEntry == null) ? null : oldEntry.value;
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
requireNonNull(m);
purgeExpired();
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), Long.MAX_VALUE, null));
}
}
@Nullable
@Override
public V putIfAbsent(K key, V value) {
requireNonNull(key);
requireNonNull(value);
purgeExpired();
ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null));
return (oldEntry == null) ? null : oldEntry.value;
}
/**
* If the specified key is not already associated with a value, associates the specified value with the specified key
* in this map, and expires the entry when the specified expiry time is reached.
*
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
*/
@Nullable
public V putIfAbsent(K key, V value, long expiry) {
return putIfAbsent(key, value, expiry, null);
}
/**
* If the specified key is not already associated with a value, associates the specified value with the specified key
* in this map, and expires the entry when the specified expiry time is reached.
*
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @param expiryListener A listener that will be invoked when the entry expires.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
*/
@Nullable
public V putIfAbsent(K key, V value, long expiry, @Nullable BiConsumer<K, V> expiryListener) {
requireNonNull(key);
requireNonNull(value);
if (expiry >= Long.MAX_VALUE) {
return put(key, value);
}
long now = currentTimeSupplier.getAsLong();
purgeExpired(now);
if (expiry <= now) {
V previous = remove(key);
if (expiryListener != null) {
expiryListener.accept(key, value);
}
return previous;
}
ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, newEntry);
if (oldEntry == null) {
expiryQueue.offer(newEntry);
return null;
}
return oldEntry.value;
}
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
ExpiringEntry<K, V> newEntry = storage.compute(key, (k, oldEntry) -> {
if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
V oldValue = (oldEntry == null) ? null : oldEntry.value;
V newValue = remappingFunction.apply(k, oldValue);
return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
});
return (newEntry == null) ? null : newEntry.value;
}
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
ExpiringEntry<K, V> newEntry = storage.computeIfAbsent(key, k -> {
V newValue = mappingFunction.apply(k);
return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
});
return (newEntry == null) ? null : newEntry.value;
}
@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
ExpiringEntry<K, V> newEntry = storage.computeIfPresent(key, (k, oldEntry) -> {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
V newValue = remappingFunction.apply(k, oldEntry.value);
return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
});
return (newEntry == null) ? null : newEntry.value;
}
@Override
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
ExpiringEntry<K, V> entry =
storage.merge(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null), (oldEntry, newEntry) -> {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
V newValue = remappingFunction.apply(oldEntry.value, newEntry.value);
return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, Long.MAX_VALUE, null);
});
return (entry == null) ? null : entry.value;
}
@Override
public V replace(K key, V value) {
ExpiringEntry<K, V> oldEntry = storage.replace(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null));
if (oldEntry != null) {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
return oldEntry.value;
}
return null;
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
requireNonNull(oldValue);
requireNonNull(newValue);
ExpiringEntry<K, V> entry = storage.computeIfPresent(key, (k, oldEntry) -> {
if (oldEntry.value.equals(oldValue)) {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
return new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
}
return oldEntry;
});
return (entry != null) && entry.value.equals(newValue);
}
@Override
public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
storage.replaceAll((k, oldEntry) -> {
if (oldEntry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(oldEntry);
}
return new ExpiringEntry<>(k, requireNonNull(function.apply(k, oldEntry.value)), Long.MAX_VALUE, null);
});
}
@Override
public V remove(Object key) {
requireNonNull(key);
purgeExpired();
ExpiringEntry<K, V> entry = storage.remove(key);
if (entry == null) {
return null;
}
if (entry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(entry);
}
return entry.value;
}
@Override
public boolean remove(Object key, Object value) {
requireNonNull(key);
requireNonNull(value);
purgeExpired();
ExpiringEntry<K, V> entry = storage.get(key);
if (entry == null || !value.equals(entry.value)) {
return false;
}
if (!storage.remove(key, entry)) {
return false;
}
if (entry.expiry < Long.MAX_VALUE) {
expiryQueue.remove(entry);
}
return true;
}
@Override
public void clear() {
expiryQueue.clear();
storage.clear();
}
@Override
public Set<K> keySet() {
purgeExpired();
return storage.keySet();
}
@Override
public Collection<V> values() {
purgeExpired();
return storage.values().stream().map(e -> e.value).collect(Collectors.toList());
}
@Override
public Set<Map.Entry<K, V>> entrySet() {
purgeExpired();
return storage.entrySet().stream().map(e -> new Map.Entry<K, V>() {
@Override
public K getKey() {
return e.getKey();
}
@Override
public V getValue() {
return e.getValue().value;
}
@Override
public V setValue(V value) {
throw new UnsupportedOperationException();
}
}).collect(Collectors.toSet());
}
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
storage.forEach((k, v) -> action.accept(k, v.value));
}
/**
* Force immediate expiration of any key/value pairs that have reached their expiry.
*
* @return The earliest expiry time for the current entries in the map (in milliseconds since the epoch), or
* {@code Long.MAX_VALUE} if there are no entries due to expire.
*/
public long purgeExpired() {
return purgeExpired(currentTimeSupplier.getAsLong());
}
private long purgeExpired(long oldest) {
ExpiringEntry<K, V> entry;
while ((entry = expiryQueue.peek()) != null && entry.expiry <= oldest) {
if (!expiryQueue.remove(entry) || !storage.remove(entry.key, entry)) {
continue;
}
if (entry.expiryListener != null) {
entry.expiryListener.accept(entry.key, entry.value);
}
}
return entry == null ? Long.MAX_VALUE : entry.expiry;
}
@SuppressWarnings("rawtypes")
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (!(obj instanceof ExpiringMap)) {
return false;
}
ExpiringMap other = (ExpiringMap) obj;
return storage.equals(other.storage);
}
@Override
public int hashCode() {
return storage.hashCode();
}
}