| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.causality; |
| |
| import static java.util.concurrent.CompletableFuture.completedFuture; |
| import static java.util.concurrent.CompletableFuture.failedFuture; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentNavigableMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.BiFunction; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import org.apache.ignite.lang.IgniteInternalException; |
| import org.apache.ignite.lang.IgniteStringFormatter; |
| import org.apache.ignite.lang.IgniteTriConsumer; |
| |
| /** |
| * Parametrized type to store several versions of the value. |
| * A value can be available through the causality token, which is represented by long. |
| * |
| * @param <T> Type of real value. |
| */ |
| public class VersionedValue<T> { |
| /** Token until the value is initialized. */ |
| private static final long NOT_INITIALIZED = -1L; |
| |
| /** Default history size. */ |
| private static final int DEFAULT_HISTORY_SIZE = 2; |
| |
| /** Size of stored history. */ |
| private final int historySize; |
| |
| /** List of completion listeners, see {@link #whenComplete(IgniteTriConsumer)}. */ |
| private final List<IgniteTriConsumer<Long, T, Throwable>> completionListeners = new CopyOnWriteArrayList<>(); |
| |
| /** Versioned value storage. */ |
| private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history = new ConcurrentSkipListMap<>(); |
| |
| /** |
| * This lock guarantees that the history is not trimming {@link #trimToSize(long)} during getting a value from versioned storage {@link |
| * #get(long)}. |
| */ |
| private final ReadWriteLock trimHistoryLock = new ReentrantReadWriteLock(); |
| |
| /** Initial future. The future will be completed when {@link VersionedValue} sets a first value. */ |
| private final CompletableFuture<T> initFut = new CompletableFuture<>(); |
| |
| /** The supplier may provide a value which will used as a default. */ |
| private final Supplier<T> defaultValSupplier; |
| |
| /** Update mutex. */ |
| private final Object updateMutex = new Object(); |
| |
| /** Value that can be used as default. */ |
| private final AtomicReference<T> defaultValRef; |
| |
| /** Last applied causality token. */ |
| private volatile long actualToken = NOT_INITIALIZED; |
| |
| /** |
| * Future that will be completed after all updates over the value in context of current causality token will be performed. |
| * This {@code updaterFuture} is {@code null} if no updates in context of current causality token have been initiated. |
| * See {@link #update(long, BiFunction)}. |
| */ |
| private volatile CompletableFuture<T> updaterFuture = null; |
| |
| /** |
| * Constructor. |
| * |
| * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this |
| * VersionedValue should be able to listen to, for receiving storage revision updates. |
| * This closure is called once on a construction of this VersionedValue and accepts a |
| * {@code Function<Long, CompletableFuture<?>>} that should be called on every update of |
| * storage revision as a listener. IMPORTANT: Revision update shouldn't happen |
| * concurrently with {@link #complete(long, T)} operations. |
| * @param historySize Size of the history of changes to store, including last applied token. |
| * @param defaultVal Supplier of the default value, that is used on {@link #update(long, BiFunction)} to |
| * evaluate the default value if the value is not initialized yet. It is not guaranteed to |
| * execute only once. |
| */ |
| public VersionedValue( |
| Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater, |
| int historySize, |
| Supplier<T> defaultVal |
| ) { |
| this.historySize = historySize; |
| |
| this.defaultValSupplier = defaultVal; |
| |
| this.defaultValRef = defaultValSupplier == null ? null : new AtomicReference<>(); |
| |
| if (observableRevisionUpdater != null) { |
| observableRevisionUpdater.accept(this::completeOnRevision); |
| } |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this |
| * VersionedValue should be able to listen to, for receiving storage revision updates. |
| * This closure is called once on a construction of this VersionedValue and accepts a |
| * {@code Function<Long, CompletableFuture<?>>} that should be called on every update of |
| * storage revision as a listener. IMPORTANT: Revision update shouldn't happen |
| * concurrently with {@link #complete(long, T)} operations. |
| * @param defaultVal Supplier of the default value, that is used on {@link #update(long, BiFunction)} to |
| * evaluate the default value if the value is not initialized yet. It is not guaranteed to |
| * execute only once. |
| */ |
| public VersionedValue( |
| Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater, |
| Supplier<T> defaultVal |
| ) { |
| this(observableRevisionUpdater, DEFAULT_HISTORY_SIZE, defaultVal); |
| } |
| |
| /** |
| * Constructor with default history size that equals 2. See {@link #VersionedValue(Consumer, int, Supplier)}. |
| * |
| * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this |
| * VersionedValue should be able to listen to, for receiving storage revision updates. |
| * This closure is called once on a construction of this VersionedValue and accepts a |
| * {@code Function<Long, CompletableFuture<?>>} that should be called on every update of |
| * storage revision as a listener. IMPORTANT: Revision update shouldn't happen |
| * concurrently with {@link #complete(long, T)} operations. |
| */ |
| public VersionedValue(Consumer<Function<Long, CompletableFuture<?>>> observableRevisionUpdater) { |
| this(observableRevisionUpdater, DEFAULT_HISTORY_SIZE, null); |
| } |
| |
| /** |
| * Creates a future for this value and causality token, or returns it if it already exists. |
| * |
| * <p>The returned future is associated with an update having the given causality token and completes when this update is finished |
| * applying. |
| * |
| * @param causalityToken Causality token. Let's assume that the update associated with token N is already applied to this value. Then, |
| * if token N is given as an argument, a completed future will be returned. If token N - 1 is given, this method |
| * returns the result in the state that is actual for the given token. If the token is strongly outdated, {@link |
| * OutdatedTokenException} is thrown. If token N + 1 is given, this method will return a future that will be |
| * completed when the update associated with token N + 1 will have been applied. Tokens that greater than N by |
| * more than 1 should never be passed. |
| * @return The future. |
| * @throws OutdatedTokenException If outdated token is passed as an argument. |
| */ |
| public CompletableFuture<T> get(long causalityToken) { |
| if (initFut.isDone()) { |
| return getInternal(causalityToken); |
| } |
| |
| return initFut.thenCompose(o -> getInternal(causalityToken)); |
| } |
| |
| /** |
| * Gets a future corresponding the token when the {@link VersionedValue} is already initiated. |
| * |
| * @param causalityToken Causality token. |
| * @return The future. |
| */ |
| private CompletableFuture<T> getInternal(long causalityToken) { |
| long actualToken0 = this.actualToken; |
| |
| if (history.floorEntry(causalityToken) == null) { |
| throw new OutdatedTokenException(causalityToken, actualToken0, historySize); |
| } |
| |
| if (causalityToken <= actualToken0) { |
| return getValueForPreviousToken(causalityToken); |
| } |
| |
| trimHistoryLock.readLock().lock(); |
| |
| try { |
| if (causalityToken <= actualToken0) { |
| return getValueForPreviousToken(causalityToken); |
| } |
| |
| var fut = new CompletableFuture<T>(); |
| |
| CompletableFuture<T> previousFut = history.putIfAbsent(causalityToken, fut); |
| |
| return previousFut == null ? fut : previousFut; |
| } finally { |
| trimHistoryLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Gets the latest value of completed future. |
| */ |
| public T latest() { |
| for (CompletableFuture<T> fut : history.descendingMap().values()) { |
| if (fut.isDone()) { |
| return fut.join(); |
| } |
| } |
| |
| return getDefault(); |
| } |
| |
| /** |
| * Creates (if needed) and returns a default value. |
| * |
| * @return The value. |
| */ |
| private T getDefault() { |
| if (defaultValSupplier != null && defaultValRef.get() == null) { |
| T defaultVal = defaultValSupplier.get(); |
| |
| assert defaultVal != null : "Default value can't be null."; |
| |
| defaultValRef.compareAndSet(null, defaultVal); |
| } |
| |
| return defaultValRef == null ? null : defaultValRef.get(); |
| } |
| |
| /** |
| * Gets a value for less or equal token than the actual {@link #actualToken}. |
| * |
| * @param causalityToken Causality token. |
| * @return A completed future that contains a value. |
| * @throws OutdatedTokenException If outdated token is passed as an argument. |
| */ |
| private CompletableFuture<T> getValueForPreviousToken(long causalityToken) { |
| Entry<Long, CompletableFuture<T>> histEntry = history.floorEntry(causalityToken); |
| |
| if (histEntry == null) { |
| throw new OutdatedTokenException(causalityToken, actualToken, historySize); |
| } |
| |
| return histEntry.getValue(); |
| } |
| |
| /** |
| * Save the version of the value associated with the given causality token. If someone has got a future to await the value associated |
| * with the given causality token (see {@link #get(long)}, then the future will be completed. |
| * |
| * @param causalityToken Causality token. |
| */ |
| public void complete(long causalityToken) { |
| completeOnRevision(causalityToken); |
| } |
| |
| /** |
| * Save the version of the value associated with the given causality token. If someone has got a future to await the value associated |
| * with the given causality token (see {@link #get(long)}, then the future will be completed. |
| * |
| * @param causalityToken Causality token. |
| * @param value Current value. |
| */ |
| public void complete(long causalityToken, T value) { |
| long actualToken0 = actualToken; |
| |
| if (actualToken0 == NOT_INITIALIZED) { |
| history.put(causalityToken, initFut); |
| } |
| |
| checkToken(actualToken0, causalityToken); |
| |
| completeInternal(causalityToken, value, null); |
| |
| completeOnRevision(causalityToken); |
| } |
| |
| /** |
| * Save the exception associated with the given causality token. If someone has got a future to await the value associated |
| * with the given causality token (see {@link #get(long)}, then the future will be completed. |
| * |
| * @param causalityToken Causality token. |
| * @param throwable An exception. |
| */ |
| public void completeExceptionally(long causalityToken, Throwable throwable) { |
| long actualToken0 = actualToken; |
| |
| if (actualToken0 == NOT_INITIALIZED) { |
| history.put(causalityToken, initFut); |
| } |
| |
| checkToken(actualToken0, causalityToken); |
| |
| completeInternal(causalityToken, null, throwable); |
| |
| completeOnRevision(causalityToken); |
| } |
| |
| /** |
| * This internal method assigns either value or exception according to specific token. |
| * |
| * @param causalityToken Causality token. |
| * @param value Value to set. |
| * @param throwable An exception. |
| */ |
| private void completeInternal(long causalityToken, T value, Throwable throwable) { |
| CompletableFuture<T> res = history.putIfAbsent( |
| causalityToken, |
| throwable == null ? completedFuture(value) : failedFuture(throwable) |
| ); |
| |
| if (res == null) { |
| notifyCompletionListeners(causalityToken, value, throwable); |
| |
| return; |
| } |
| |
| assert !res.isDone() : completeInternalConflictErrorMessage(res, causalityToken, value, throwable); |
| |
| if (throwable == null) { |
| res.complete(value); |
| } else { |
| res.completeExceptionally(throwable); |
| } |
| |
| notifyCompletionListeners(causalityToken, value, throwable); |
| } |
| |
| /** |
| * Builds an error message for the case when there is a conflict between history and a value or exception that is going to be |
| * saved. |
| * |
| * @param future Future. |
| * @param token Token. |
| * @param value Value. |
| * @param throwable Throwable. |
| * @return Error message. |
| */ |
| private String completeInternalConflictErrorMessage(CompletableFuture<T> future, long token, T value, Throwable throwable) { |
| return future.handle( |
| (prevValue, prevThrowable) -> |
| IgniteStringFormatter.format( |
| "Different values associated with the token [token={}, value={}, exception={}, prevValue={}, prevException={}]", |
| token, |
| value, |
| throwable, |
| prevValue, |
| prevThrowable |
| ) |
| ) |
| .join(); |
| } |
| |
| /** |
| * Updates the value using the given updater. The updater receives the value on previous token, or default value |
| * (see constructor) if the value isn't initialized, or current intermediate value, if this method has been already |
| * called for the same token; and returns a new value.<br> |
| * The updater will be called after updaters that had been passed to previous calls of this method complete. |
| * If an exception ({@link CancellationException} or {@link CompletionException}) was thrown when calculating the value for previous |
| * token, then updater is used to process the exception and calculate a new value.<br> |
| * This method can be called multiple times for the same token, and doesn't complete the future created for this token. |
| * The future is supposed to be completed by storage revision update or a call of {@link #complete(long)} in this case. |
| * If this method has been called at least once on the given token, the updater will receive a value that was evaluated |
| * by updater on previous call, as intermediate result.<br> |
| * As the order of multiple calls of this method on the same token is unknown, operations done by the updater must be |
| * commutative. For example: |
| * <ul> |
| * <li>this method was called for token N-1 and updater evaluated the value V1;</li> |
| * <li>a storage revision update happened;</li> |
| * <li>this method is called for token N, updater receives V1 and evaluates V2;</li> |
| * <li>this method is called once again for token N, then the updater receives V2 as intermediate result and evaluates V3;</li> |
| * <li>storage revision update happens and the future for token N completes with value V3.</li> |
| * </ul> |
| * Regardless of order in which this method's calls are made, V3 should be the final result. |
| * <br> |
| * The method should return a future that will be completed when {@code updater} completes. |
| * |
| * @param causalityToken Causality token. |
| * @param updater The binary function that accepts previous value and exception, if present, and update it to compute |
| * the new value. |
| * @return Future for updated value. |
| */ |
| public CompletableFuture<T> update( |
| long causalityToken, |
| BiFunction<T, Throwable, CompletableFuture<T>> updater |
| ) { |
| long actualToken0 = this.actualToken; |
| |
| checkToken(actualToken0, causalityToken); |
| |
| synchronized (updateMutex) { |
| CompletableFuture<T> updaterFuture = this.updaterFuture; |
| |
| CompletableFuture<T> future = updaterFuture == null ? previousOrDefaultValueFuture(actualToken0) : updaterFuture; |
| |
| CompletableFuture<CompletableFuture<T>> f0 = future |
| .handle(updater::apply) |
| .handle((fut, e) -> e == null ? fut : failedFuture(e)); |
| |
| updaterFuture = f0.thenCompose(Function.identity()); |
| |
| this.updaterFuture = updaterFuture; |
| |
| return updaterFuture; |
| } |
| } |
| |
| /** |
| * Add listener for completions of this versioned value on every token. It will be called on every {@link #complete(long)}, |
| * {@link #complete(long, Object)}, {@link #completeExceptionally(long, Throwable)} and also, if none of mentioned methods was |
| * called explicitly, on storage revision update, |
| * |
| * @param action Action to perform. |
| */ |
| public void whenComplete(IgniteTriConsumer<Long, T, Throwable> action) { |
| completionListeners.add(action); |
| } |
| |
| /** |
| * Removes a completion listener, see {@link #whenComplete(IgniteTriConsumer)}. |
| * |
| * @param action Action to remove. |
| */ |
| public void removeWhenComplete(IgniteTriConsumer<Long, T, Throwable> action) { |
| completionListeners.remove(action); |
| } |
| |
| /** |
| * Notify completion listeners. |
| * |
| * @param causalityToken Token. |
| * @param value Value. |
| * @param throwable Throwable. |
| */ |
| private void notifyCompletionListeners(long causalityToken, T value, Throwable throwable) { |
| Throwable unpackedThrowable = throwable instanceof CompletionException ? throwable.getCause() : throwable; |
| |
| List<Exception> exceptions = new ArrayList<>(); |
| |
| for (IgniteTriConsumer<Long, T, Throwable> listener : completionListeners) { |
| try { |
| listener.accept(causalityToken, value, unpackedThrowable); |
| } catch (Exception e) { |
| exceptions.add(e); |
| } |
| } |
| |
| if (!exceptions.isEmpty()) { |
| IgniteInternalException ex = new IgniteInternalException(); |
| |
| exceptions.forEach(ex::addSuppressed); |
| |
| throw ex; |
| } |
| } |
| |
| /** |
| * Complete because of explicit token update. This also triggers completion of a future created for the given causality token. |
| * This future completes after all updaters are complete (see {@link #update(long, BiFunction)}). |
| * |
| * @param causalityToken Token. |
| * @return Future. |
| */ |
| private CompletableFuture<?> completeOnRevision(long causalityToken) { |
| long actualToken0 = actualToken; |
| |
| assert causalityToken > actualToken0 : IgniteStringFormatter.format( |
| "New token should be greater than current [current={}, new={}]", actualToken0, causalityToken); |
| |
| if (actualToken0 == NOT_INITIALIZED) { |
| history.put(causalityToken, initFut); |
| } |
| |
| synchronized (updateMutex) { |
| CompletableFuture<T> updaterFuture0 = updaterFuture; |
| |
| CompletableFuture<?> completeUpdatesFuture = updaterFuture0 == null |
| ? completedFuture(null) |
| : updaterFuture0.whenComplete((v, t) -> completeInternal(causalityToken, v, t)); |
| |
| updaterFuture = null; |
| |
| actualToken = causalityToken; |
| |
| return completeUpdatesFuture.thenRun(() -> { |
| completeRelatedFuture(causalityToken); |
| |
| if (history.size() > 1 && causalityToken - history.firstKey() >= historySize) { |
| trimToSize(causalityToken); |
| } |
| |
| Entry<Long, CompletableFuture<T>> entry = history.floorEntry(causalityToken); |
| |
| assert entry != null && entry.getValue().isDone() : IgniteStringFormatter.format( |
| "Future for the token is not completed [token={}]", causalityToken); |
| }); |
| } |
| } |
| |
| /** |
| * Completes a future related with a specific causality token. It is called only on storage revision update. |
| * |
| * @param causalityToken The token which is becoming an actual. |
| */ |
| private void completeRelatedFuture(long causalityToken) { |
| Entry<Long, CompletableFuture<T>> entry = history.floorEntry(causalityToken); |
| |
| CompletableFuture<T> future = entry.getValue(); |
| |
| if (!future.isDone()) { |
| Entry<Long, CompletableFuture<T>> entryBefore = history.headMap(causalityToken).lastEntry(); |
| |
| CompletableFuture<T> previousFuture = entryBefore == null ? completedFuture(getDefault()) : entryBefore.getValue(); |
| |
| assert previousFuture.isDone() : IgniteStringFormatter.format("No future for token [token={}]", causalityToken); |
| |
| previousFuture.whenComplete((t, throwable) -> { |
| if (throwable != null) { |
| future.completeExceptionally(throwable); |
| |
| notifyCompletionListeners(causalityToken, null, throwable); |
| } else { |
| future.complete(t); |
| |
| notifyCompletionListeners(causalityToken, t, null); |
| } |
| }); |
| } else if (entry.getKey() < causalityToken) { |
| // Notifying listeners when there were neither updates nor explicit completions. |
| // This future is previous, it is always done. |
| future.whenComplete((v, e) -> notifyCompletionListeners(causalityToken, v, e)); |
| } |
| } |
| |
| /** |
| * Return a future for previous or default value. |
| * |
| * @param actualToken Token. |
| * @return Future. |
| */ |
| private CompletableFuture<T> previousOrDefaultValueFuture(long actualToken) { |
| Entry<Long, CompletableFuture<T>> histEntry = history.floorEntry(actualToken); |
| |
| if (histEntry == null) { |
| return completedFuture(getDefault()); |
| } else { |
| CompletableFuture<T> prevFuture = histEntry.getValue(); |
| |
| assert prevFuture.isDone() : "Previous value should be ready."; |
| |
| return prevFuture; |
| } |
| } |
| |
| /** |
| * Trims the storage to history size. |
| * |
| * @param causalityToken Last token which is being applied. |
| */ |
| private void trimToSize(long causalityToken) { |
| Long lastToken = history.lastKey(); |
| |
| trimHistoryLock.writeLock().lock(); |
| |
| try { |
| for (Long token : history.keySet()) { |
| if (!token.equals(lastToken) && causalityToken - token >= historySize) { |
| history.remove(token); |
| } |
| } |
| } finally { |
| trimHistoryLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Check that the given causality token os correct according to the actual token. |
| * |
| * @param actualToken Actual token. |
| * @param candidateToken Candidate token. |
| */ |
| private static void checkToken(long actualToken, long candidateToken) { |
| assert actualToken == NOT_INITIALIZED || actualToken < candidateToken : IgniteStringFormatter.format( |
| "Token must be greater than actual [token={}, actual={}]", candidateToken, actualToken); |
| } |
| } |