// blob: 950c8d76859f9c2e4c1011ec3fa79f3674912293 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.tuweni.concurrent;
import static java.util.Objects.requireNonNull;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
* A concurrent hash map that stores values along with an expiry.
* Values are stored in the map until their expiry is reached, after which they will no longer be available and will
* appear as if removed. The actual removal is done lazily whenever the map is accessed, or when the
* {@link #purgeExpired()} method is invoked.
* @param <K> The key type.
* @param <V> The value type.
public final class ExpiringMap<K, V> implements Map<K, V> {
// Uses object equality, to ensure uniqueness as a value in the storage map
private static final class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
private K key;
private V value;
private long expiry;
private BiConsumer<K, V> expiryListener;
ExpiringEntry(K key, V value, long expiry, @Nullable BiConsumer<K, V> expiryListener) {
this.key = key;
this.value = value;
this.expiry = expiry;
this.expiryListener = expiryListener;
public int compareTo(ExpiringEntry<K, V> o) {
return, o.expiry);
private final ConcurrentHashMap<K, ExpiringEntry<K, V>> storage = new ConcurrentHashMap<>();
private final PriorityBlockingQueue<ExpiringEntry<K, V>> expiryQueue = new PriorityBlockingQueue<>();
private final LongSupplier currentTimeSupplier;
* Construct an empty map.
public ExpiringMap() {
ExpiringMap(LongSupplier currentTimeSupplier) {
this.currentTimeSupplier = currentTimeSupplier;
public V get(Object key) {
ExpiringEntry<K, V> entry = storage.get(key);
return (entry == null) ? null : entry.value;
public V getOrDefault(Object key, V defaultValue) {
V v;
return (((v = get(key)) != null)) ? v : defaultValue;
public boolean containsKey(Object key) {
return storage.containsKey(key);
public boolean containsValue(Object value) {
return storage.values().stream().anyMatch(e -> e.value.equals(value));
public int size() {
return storage.size();
public boolean isEmpty() {
return storage.isEmpty();
public V put(K key, V value) {
ExpiringEntry<K, V> oldEntry = storage.put(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null));
return (oldEntry == null) ? null : oldEntry.value;
* Associates the specified value with the specified key in this map, and expires the entry when the specified expiry
* time is reached. If the map previously contained a mapping for the key, the old value is replaced by the specified
* value.
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
public V put(K key, V value, long expiry) {
return put(key, value, expiry, null);
* Associates the specified value with the specified key in this map, and expires the entry when the specified expiry
* time is reached. If the map previously contained a mapping for the key, the old value is replaced by the specified
* value.
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @param expiryListener A listener that will be invoked when the entry expires.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
public V put(K key, V value, long expiry, @Nullable BiConsumer<K, V> expiryListener) {
if (expiry >= Long.MAX_VALUE) {
return put(key, value);
long now = currentTimeSupplier.getAsLong();
if (expiry <= now) {
V previous = remove(key);
if (expiryListener != null) {
expiryListener.accept(key, value);
return previous;
ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
ExpiringEntry<K, V> oldEntry = storage.put(key, newEntry);
if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
return (oldEntry == null) ? null : oldEntry.value;
public void putAll(Map<? extends K, ? extends V> m) {
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), Long.MAX_VALUE, null));
public V putIfAbsent(K key, V value) {
ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null));
return (oldEntry == null) ? null : oldEntry.value;
* If the specified key is not already associated with a value, associates the specified value with the specified key
* in this map, and expires the entry when the specified expiry time is reached.
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
public V putIfAbsent(K key, V value, long expiry) {
return putIfAbsent(key, value, expiry, null);
* If the specified key is not already associated with a value, associates the specified value with the specified key
* in this map, and expires the entry when the specified expiry time is reached.
* @param key The key with which the specified value is to be associated.
* @param value The value to be associated with the specified key.
* @param expiry The expiry time for the value, in milliseconds since the epoch.
* @param expiryListener A listener that will be invoked when the entry expires.
* @return The previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}.
public V putIfAbsent(K key, V value, long expiry, @Nullable BiConsumer<K, V> expiryListener) {
if (expiry >= Long.MAX_VALUE) {
return put(key, value);
long now = currentTimeSupplier.getAsLong();
if (expiry <= now) {
V previous = remove(key);
if (expiryListener != null) {
expiryListener.accept(key, value);
return previous;
ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener);
ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, newEntry);
if (oldEntry == null) {
return null;
return oldEntry.value;
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
ExpiringEntry<K, V> newEntry = storage.compute(key, (k, oldEntry) -> {
if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) {
V oldValue = (oldEntry == null) ? null : oldEntry.value;
V newValue = remappingFunction.apply(k, oldValue);
return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
return (newEntry == null) ? null : newEntry.value;
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
ExpiringEntry<K, V> newEntry = storage.computeIfAbsent(key, k -> {
V newValue = mappingFunction.apply(k);
return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
return (newEntry == null) ? null : newEntry.value;
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
ExpiringEntry<K, V> newEntry = storage.computeIfPresent(key, (k, oldEntry) -> {
if (oldEntry.expiry < Long.MAX_VALUE) {
V newValue = remappingFunction.apply(k, oldEntry.value);
return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
return (newEntry == null) ? null : newEntry.value;
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
ExpiringEntry<K, V> entry =
storage.merge(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null), (oldEntry, newEntry) -> {
if (oldEntry.expiry < Long.MAX_VALUE) {
V newValue = remappingFunction.apply(oldEntry.value, newEntry.value);
return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, Long.MAX_VALUE, null);
return (entry == null) ? null : entry.value;
public V replace(K key, V value) {
ExpiringEntry<K, V> oldEntry = storage.replace(key, new ExpiringEntry<>(key, value, Long.MAX_VALUE, null));
if (oldEntry != null) {
if (oldEntry.expiry < Long.MAX_VALUE) {
return oldEntry.value;
return null;
public boolean replace(K key, V oldValue, V newValue) {
ExpiringEntry<K, V> entry = storage.computeIfPresent(key, (k, oldEntry) -> {
if (oldEntry.value.equals(oldValue)) {
if (oldEntry.expiry < Long.MAX_VALUE) {
return new ExpiringEntry<>(k, newValue, Long.MAX_VALUE, null);
return oldEntry;
return (entry != null) && entry.value.equals(newValue);
public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
storage.replaceAll((k, oldEntry) -> {
if (oldEntry.expiry < Long.MAX_VALUE) {
return new ExpiringEntry<>(k, requireNonNull(function.apply(k, oldEntry.value)), Long.MAX_VALUE, null);
public V remove(Object key) {
ExpiringEntry<K, V> entry = storage.remove(key);
if (entry == null) {
return null;
if (entry.expiry < Long.MAX_VALUE) {
return entry.value;
public boolean remove(Object key, Object value) {
ExpiringEntry<K, V> entry = storage.get(key);
if (entry == null || !value.equals(entry.value)) {
return false;
if (!storage.remove(key, entry)) {
return false;
if (entry.expiry < Long.MAX_VALUE) {
return true;
public void clear() {
public Set<K> keySet() {
return storage.keySet();
public Collection<V> values() {
return storage.values().stream().map(e -> e.value).collect(Collectors.toList());
public Set<Map.Entry<K, V>> entrySet() {
return storage.entrySet().stream().map(e -> new Map.Entry<K, V>() {
public K getKey() {
return e.getKey();
public V getValue() {
return e.getValue().value;
public V setValue(V value) {
throw new UnsupportedOperationException();
public void forEach(BiConsumer<? super K, ? super V> action) {
storage.forEach((k, v) -> action.accept(k, v.value));
* Force immediate expiration of any key/value pairs that have reached their expiry.
* @return The earliest expiry time for the current entries in the map (in milliseconds since the epoch), or
* {@code Long.MAX_VALUE} if there are no entries due to expire.
public long purgeExpired() {
return purgeExpired(currentTimeSupplier.getAsLong());
private long purgeExpired(long oldest) {
ExpiringEntry<K, V> entry;
while ((entry = expiryQueue.peek()) != null && entry.expiry <= oldest) {
if (!expiryQueue.remove(entry) || !storage.remove(entry.key, entry)) {
if (entry.expiryListener != null) {
entry.expiryListener.accept(entry.key, entry.value);
return entry == null ? Long.MAX_VALUE : entry.expiry;
public boolean equals(Object obj) {
if (obj == null) {
return false;
if (obj == this) {
return true;
if (!(obj instanceof ExpiringMap)) {
return false;
ExpiringMap other = (ExpiringMap) obj;
return storage.equals(;
public int hashCode() {
return storage.hashCode();