// blob: 399eb0e3b5695df541097105806f745721a9fc75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
 * A wrapper around {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
 * <p>
 * The main purpose of this class is to provide the functionality of a concurrent hash map which may perform operations like
 * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
 * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
 * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
 * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
 * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
 * does not guarantee at-most-once semantics of running the mapping function for a single key.
 *
 * @param <K> the type of the keys
 * @param <V> the type of the values which are loaded and unloaded
 */
public class LoadingMap<K, V>
{
    /** Backing map; every key is associated with a future rather than with the value itself. */
    private final NonBlockingHashMap<K, Future<V>> internalMap = new NonBlockingHashMap<>();

    /**
     * Returns a promise for the given key or null if there is nothing associated with the key.
     * <p>if the promise is not fulfilled, it means that there is a loading process associated with the key
     * <p>if the promise is fulfilled with {@code null} value, it means that there is an unloading process associated with the key
     * <p>if the promise is fulfilled with a failure, it means that a loading process associated with the key failed
     * but the exception was not propagated yet (a failed promise is eventually removed from the map)
     *
     * @param key the key to look up
     * @return the future currently stored for the key, or {@code null} if there is none
     */
    @VisibleForTesting
    Future<V> get(K key)
    {
        return internalMap.get(key);
    }

    /**
     * Get a value for a given key. Returns a non-null object only if there is a successfully initialized value
     * associated with the provided key. It returns {@code null} if there is no value for the key, or the value is
     * being initialized or removed. It does not throw if the last attempt to initialize the value failed.
     *
     * @param key the key to look up
     * @return the loaded value, or {@code null} if no ready value is associated with the key
     */
    public V getIfReady(K key)
    {
        Future<V> future = internalMap.get(key);
        return future != null ? future.getNow() : null;
    }

    /**
     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
     * It is guaranteed that the loading and unloading a value for a single key are executed serially. It is also
     * guaranteed that the load function is executed exactly once to load a value into the map (regardless of the concurrent attempts).
     * <p>
     * In case there is a concurrent attempt to load a value for this key, this attempt waits until the concurrent attempt
     * is done and returns its result (if succeeded). If the concurrent attempt fails, this attempt is retried. However,
     * if this attempt fails, it is not retried and the exception is rethrown. In case there is a concurrent attempt
     * to unload a value for this key, this attempt waits until the concurrent attempt is done and retries loading.
     * <p>
     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
     * throws exception, it is rethrown by this method. In both cases nothing gets added to the map.
     * <p>
     * It is allowed to nest loading for a different key, though nested loading for the same key results in a deadlock.
     *
     * @param key          the key to load a value for
     * @param loadFunction the function producing the value; it is invoked only by the thread which managed to
     *                     register a new entry for the key
     * @return the value associated with the key, never {@code null}
     */
    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
    {
        while (true)
        {
            Future<V> future = internalMap.get(key);
            boolean attemptedInThisThread = false;
            if (future == null)
            {
                AsyncPromise<V> newEntry = new AsyncPromise<>();
                future = internalMap.putIfAbsent(key, newEntry);
                if (future == null)
                {
                    // We managed to create an entry for the value. Now initialize it.
                    attemptedInThisThread = true;
                    future = newEntry;
                    try
                    {
                        V v = loadFunction.get();
                        if (v == null)
                            throw new NullPointerException("The mapping function returned null");
                        else
                            newEntry.setSuccess(v);
                    }
                    catch (Throwable t)
                    {
                        newEntry.setFailure(t);
                        // Remove future so that construction can be retried later; the removal is conditional so
                        // that an entry installed by somebody else in the meantime is left untouched
                        internalMap.remove(key, future);
                    }
                }

                // Else some other thread beat us to it, but we now have the reference to the future which we can wait for.
            }

            V v = future.awaitUninterruptibly().getNow();

            if (v != null) // implies success
                return v;

            if (attemptedInThisThread)
                // Rethrow if the failing attempt was initiated by us (failed and attemptedInThisThread)
                future.rethrowIfFailed();

            // Retry in other cases, that is, if blockingUnloadIfPresent was called in the meantime
            // (success and getNow == null) hoping that unloading gets finished soon, and if the concurrent attempt
            // to load entry fails
            Thread.yield();
        }
    }

    /**
     * If a value for the given key is present, unload function is run and the value is removed from the map.
     * Similarly to {@link #blockingLoadIfAbsent(Object, Supplier)} at-most-once semantics is guaranteed for unload
     * function.
     * <p>
     * When unload function fails, the value is removed from the map anyway and the failure is rethrown.
     * <p>
     * When the key was not found, the method returns {@code null}.
     *
     * @param key            the key whose value should be unloaded
     * @param unloadFunction the function invoked with the value being removed
     * @return the unloaded value, or {@code null} if there was nothing to unload
     * @throws UnloadExecutionException when the unloading failed to complete - this is checked exception because
     *                                  the value is removed from the map regardless of the result of unloading;
     *                                  therefore if the unloading failed, the caller is responsible for handling that
     */
    public V blockingUnloadIfPresent(K key, Consumer<? super V> unloadFunction) throws UnloadExecutionException
    {
        // A promise already fulfilled with null marks the key as "being unloaded" while it stays in the map
        Promise<V> droppedFuture = new AsyncPromise<V>().setSuccess(null);

        Future<V> existingFuture;
        do
        {
            existingFuture = internalMap.get(key);
            // Nothing to do if the key is missing, or if the entry is already done without a value
            if (existingFuture == null || existingFuture.isDone() && existingFuture.getNow() == null)
                return null;
        } while (!internalMap.replace(key, existingFuture, droppedFuture));

        // The entry we have just replaced may still be loading - wait for it to complete
        V v = existingFuture.awaitUninterruptibly().getNow();
        try
        {
            if (v == null)
                // which means that either the value failed to load or a concurrent attempt to unload already did the work
                return null;

            unloadFunction.accept(v);
            return v;
        }
        catch (Throwable t)
        {
            throw new UnloadExecutionException(v, t);
        }
        finally
        {
            // Remove our own marker only: the conditional removal never drops a mapping this call did not install,
            // even when assertions are disabled and the check below is skipped
            boolean removed = internalMap.remove(key, droppedFuture);
            assert removed;
        }
    }

    /**
     * Thrown when unloading a value failed. It encapsulates the value which was failed to unload.
     */
    public static class UnloadExecutionException extends ExecutionException
    {
        private final Object value;

        /**
         * @param value the value whose unloading failed
         * @param cause the failure raised while unloading the value
         */
        public UnloadExecutionException(Object value, Throwable cause)
        {
            super(cause);
            this.value = value;
        }

        /**
         * Returns the value which failed to unload, cast to the type expected by the caller.
         * <p>
         * The cast is unchecked: choosing a type which does not match the stored value results in
         * a {@link ClassCastException} at the call site rather than in this method.
         */
        @SuppressWarnings("unchecked") // the value is stored untyped; the caller is responsible for the target type
        public <T> T value()
        {
            return (T) value;
        }
    }
}