blob: 3753b9334df596d6c718b1622fc2dc724498be2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hints;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.service.PreserveTimestamp;
import org.apache.cassandra.service.accord.IAccordService.IAccordResult;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper;
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutation;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.Condition;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.FAILURE;
import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.INTERRUPTED;
import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.RETRY_DIFFERENT_SYSTEM;
import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.SUCCESS;
import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.TIMEOUT;
import static org.apache.cassandra.hints.HintsService.RETRY_ON_DIFFERENT_SYSTEM_UUID;
import static org.apache.cassandra.metrics.HintsServiceMetrics.ACCORD_HINT_ENDPOINT;
import static org.apache.cassandra.metrics.HintsServiceMetrics.updateDelayMetrics;
import static org.apache.cassandra.net.Verb.HINT_REQ;
import static org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
/**
 * Dispatches a single hints file to a specified node in a batched manner.
 *
 * Uses either {@link HintMessage.Encoded} - when dispatching hints into a node with the same messaging version as the hints file,
 * or {@link HintMessage}, when conversion is required.
 *
 * Hints are always decoded (never sent pre-encoded) when they come from the batchlog (host ID is
 * {@code RETRY_ON_DIFFERENT_SYSTEM_UUID} and there is no target address) or when part of a page is being re-sent after
 * a retry-on-different-system error. Decoded hints are split so that the Accord-managed part of the mutation is applied
 * through an Accord transaction and the remainder is delivered as a normal hint.
 */
final class HintsDispatcher implements AutoCloseable
{
    private static final Logger logger = LoggerFactory.getLogger(HintsDispatcher.class);
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);

    private enum Action { CONTINUE, ABORT }

    private final HintsReader reader;
    final UUID hostId;
    // Null exactly when hostId is RETRY_ON_DIFFERENT_SYSTEM_UUID (enforced by the constructor)
    @Nullable
    final InetAddressAndPort address;
    private final int messagingVersion;
    private final BooleanSupplier abortRequested;

    private InputPosition currentPagePosition;

    // Hints from the batch log that were attempted on Accord don't have a list of hosts that need hinting
    // since Accord doesn't expose that on failure. If Accord no longer manages the range for this hint then we need
    // to send the hint to all replicas after the page succeeds
    private final Queue<Hint> hintsNeedingRehinting = new LinkedList<>();

    private HintsDispatcher(HintsReader reader, UUID hostId, @Nullable InetAddressAndPort address, int messagingVersion, BooleanSupplier abortRequested)
    {
        // Template form avoids building the message on every (successful) construction
        checkArgument(address != null ^ hostId.equals(RETRY_ON_DIFFERENT_SYSTEM_UUID), "address must be nonnull or hostId must be %s", RETRY_ON_DIFFERENT_SYSTEM_UUID);
        currentPagePosition = null;
        this.reader = reader;
        this.hostId = hostId;
        this.address = address;
        this.messagingVersion = messagingVersion;
        this.abortRequested = abortRequested;
    }

    /**
     * Opens {@code file} and creates a dispatcher for it.
     *
     * @param address the target node, or null when dispatching batchlog hints ({@code hostId} must then be
     *                {@code RETRY_ON_DIFFERENT_SYSTEM_UUID}); null addresses use the current messaging version
     */
    static HintsDispatcher create(File file, RateLimiter rateLimiter, @Nullable InetAddressAndPort address, UUID hostId, BooleanSupplier abortRequested)
    {
        int messagingVersion = address == null ? MessagingService.current_version : MessagingService.instance().versions.get(address);
        HintsDispatcher dispatcher = new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
        HintDiagnostics.dispatcherCreated(dispatcher);
        return dispatcher;
    }

    @Override
    public void close()
    {
        try
        {
            HintDiagnostics.dispatcherClosed(this);
        }
        finally
        {
            // Always release the reader, even if the diagnostics hook throws
            reader.close();
        }
    }

    /**
     * @return a human readable name for the dispatch target
     */
    String destination()
    {
        return address == null ? "RETRY_ON_DIFFERENT_SYSTEM" : address.toString();
    }

    void seek(InputPosition position)
    {
        reader.seek(position);
    }

    /**
     * @return whether or not dispatch completed entirely and successfully
     */
    boolean dispatch()
    {
        for (HintsReader.Page page : reader)
        {
            currentPagePosition = page.position;
            if (dispatch(page) != Action.CONTINUE)
                return false;
        }
        return true;
    }

    /**
     * @return offset of the first non-delivered page
     */
    InputPosition dispatchPosition()
    {
        return currentPagePosition;
    }

    // retry in case of a timeout; stop in case of a failure, host going down, or delivery paused
    private Action dispatch(HintsReader.Page page)
    {
        HintDiagnostics.dispatchPage(this);
        return sendHintsAndAwait(page);
    }

    private Action sendHintsAndAwait(HintsReader.Page page)
    {
        try
        {
            return doSendHintsAndAwait(page, null);
        }
        finally
        {
            // Anything not rehinted by a successful page must not leak into the next page
            hintsNeedingRehinting.clear();
        }
    }

    /**
     * Sends every hint of {@code page} (or only those selected by {@code hintsFilter}) and waits for all outcomes.
     *
     * @param hintsFilter when non-null, only hints whose index within the page is set are sent; used to retry
     *                    hints that were rejected with a retry-on-different-system error
     * @return CONTINUE if the page was fully delivered, ABORT otherwise
     */
    private Action doSendHintsAndAwait(HintsReader.Page page, @Nullable BitSet hintsFilter)
    {
        List<Callback> callbacks = new ArrayList<>();

        /*
         * If hints file messaging version matches the version of the target host, we'll use the optimised path -
         * skipping the redundant decoding/encoding cycle of the already encoded hint.
         *
         * If that is not the case, we'll need to perform conversion to a newer (or an older) format, and decoding the hint
         * is an unavoidable intermediate step.
         *
         * If these hints are from the batchlog and were originally attempted on Accord then
         * we also need to decode so we can route the Hint contents appropriately.
         *
         * If filtering of hints is requested, because this is retrying a page that had some retry on different system
         * errors, then also don't go down the sendEncodedHints path since it won't re-route the mutation and will trigger
         * the same retry on different system error.
         */
        boolean isBatchLogHints = hostId.equals(RETRY_ON_DIFFERENT_SYSTEM_UUID);
        boolean sendEncodedHints = reader.descriptor().messagingVersion() == messagingVersion && !isBatchLogHints && hintsFilter == null;
        // If the hints filter is set then splitting the hints is needed and encoded hints can't do that
        checkState(!sendEncodedHints || hintsFilter == null, "Should not send encoded hints if hints filter is set");
        Action action = sendEncodedHints
                        ? sendHints(page.buffersIterator(), null, callbacks, this::sendEncodedHint)
                        : sendHints(page.hintsIterator(), hintsFilter, callbacks, this::sendHint);

        if (action == Action.ABORT)
            return action;

        // Indexed by callback position, which equals hint position in the page when hintsFilter is null
        BitSet retryDifferentSystemHints = new BitSet(callbacks.size());
        long success = 0, failures = 0, timeouts = 0, retryDifferentSystem = 0;
        for (int i = 0; i < callbacks.size(); i++)
        {
            Callback.Outcome outcome = callbacks.get(i).await();
            switch (outcome)
            {
                case SUCCESS:
                    success++;
                    break;
                case FAILURE:
                    failures++;
                    break;
                case INTERRUPTED:
                    // The wait was interrupted so delivery is unknown; treat it as a failure so the page is
                    // aborted and replayed later instead of blowing up the dispatch
                    failures++;
                    break;
                case TIMEOUT:
                    timeouts++;
                    break;
                case RETRY_DIFFERENT_SYSTEM:
                    retryDifferentSystemHints.set(i);
                    retryDifferentSystem++;
                    break;
                default:
                    throw new IllegalStateException("Unhandled outcome: " + outcome);
            }
        }

        updateMetrics(success, failures, timeouts, retryDifferentSystem);

        // If the only errors were retryDifferentSystem and we aren't already filtering the hints then retry
        // immediately otherwise we will repeat the page later including any successful hints we may have already delivered
        // Hints for the batch log can hit RETRY_DIFFERENT_SYSTEM but don't need to be retried here and it could result
        // in the same hint ending up in hintsNeedingRehinting twice
        boolean failedRetryDifferentSystem = false;
        if (retryDifferentSystem > 0 && failures < 1 && timeouts < 1 && hintsFilter == null && !isBatchLogHints)
        {
            reader.seek(currentPagePosition);
            Action retryResult = doSendHintsAndAwait(page, retryDifferentSystemHints);
            if (retryResult != Action.CONTINUE)
                failedRetryDifferentSystem = true;
        }

        // The batchlog Accord hints need to return abort if any hint needs to be retried and retry the whole page
        // since we don't want hints to ping pong back and forth via hintsNeedingRehinting
        if (failures > 0 || timeouts > 0 || failedRetryDifferentSystem || (isBatchLogHints && retryDifferentSystem > 0))
        {
            HintDiagnostics.pageFailureResult(this, success, failures, timeouts, retryDifferentSystem);
            return Action.ABORT;
        }
        else
        {
            HintDiagnostics.pageSuccessResult(this, success, failures, timeouts, retryDifferentSystem);
            rehintHintsNeedingRehinting();
            return Action.CONTINUE;
        }
    }

    /**
     * Drains {@link #hintsNeedingRehinting}, writing each hint for all replicas and, when this node is a write
     * replica for the mutation's token, also applying the mutation locally on the MUTATION stage.
     */
    private void rehintHintsNeedingRehinting()
    {
        ClusterMetadata cm = ClusterMetadata.current();
        Hint hint;
        while ((hint = hintsNeedingRehinting.poll()) != null)
        {
            HintsService.instance.writeForAllReplicas(hint);
            Mutation mutation = hint.mutation;
            // Also may need to apply locally because it's possible this is from the batchlog
            // and we never applied it locally
            // TODO (review): Additional error handling necessary? Hints are lossy
            DataPlacement dataPlacement = cm.placements.get(cm.schema.getKeyspace(mutation.getKeyspaceName()).getMetadata().params.replication);
            VersionedEndpoints.ForToken forToken = dataPlacement.writes.forToken(mutation.key().getToken());
            Replica self = forToken.get().selfIfPresent();
            if (self != null)
            {
                Stage.MUTATION.maybeExecuteImmediately(new RunnableDebuggableTask()
                {
                    private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
                    private volatile long approxStartTimeNanos;

                    @Override
                    public void run()
                    {
                        approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
                        mutation.apply();
                    }

                    @Override
                    public long creationTimeNanos()
                    {
                        return approxCreationTimeNanos;
                    }

                    @Override
                    public long startTimeNanos()
                    {
                        return approxStartTimeNanos;
                    }

                    @Override
                    public String description()
                    {
                        return "HintsService rehinting Accord txn";
                    }
                });
            }
        }
    }

    private void updateMetrics(long success, long failures, long timeouts, long retryDifferentSystem)
    {
        HintsServiceMetrics.hintsSucceeded.mark(success);
        HintsServiceMetrics.hintsFailed.mark(failures);
        HintsServiceMetrics.hintsTimedOut.mark(timeouts);
        HintsServiceMetrics.hintsRetryDifferentSystem.mark(retryDifferentSystem);
    }

    /*
     * Sends every hint produced by the iterator (skipping those not selected by hintsFilter, if set) with
     * sendFunction, collecting the resulting callbacks. Used by both the encoded and the decoded paths.
     * Returns ABORT as soon as an abort is requested, leaving already-sent hints un-awaited.
     */
    private <T> Action sendHints(Iterator<T> hints, @Nullable BitSet hintsFilter, Collection<Callback> callbacks, Function<T, Callback> sendFunction)
    {
        int hintIndex = -1;
        while (hints.hasNext())
        {
            if (abortRequested.getAsBoolean())
            {
                HintDiagnostics.abortRequested(this);
                return Action.ABORT;
            }
            T hint = hints.next();
            hintIndex++;
            if (hintsFilter != null && !hintsFilter.get(hintIndex))
                continue;
            callbacks.add(sendFunction.apply(hint));
        }
        return Action.CONTINUE;
    }

    /*
     * Sending hints in compatibility (decoded) mode: the hint is split into an Accord-managed part, applied through
     * an Accord transaction, and a normal part, delivered as a HintMessage (or queued for rehinting for batchlog hints).
     */
    private Callback sendHint(Hint hint)
    {
        ClusterMetadata cm = ClusterMetadata.current();
        SplitHint splitHint = splitHintIntoAccordAndNormal(cm, hint);

        IAccordResult<TxnResult> accordTxnResult = null;
        if (splitHint.accordMutation != null)
        {
            Dispatcher.RequestTime requestTime = Dispatcher.RequestTime.forImmediateExecution();
            accordTxnResult = ConsensusMigrationMutationHelper.instance().mutateWithAccordAsync(cm, splitHint.accordMutation, null, requestTime, PreserveTimestamp.yes);
        }

        Hint normalHint = splitHint.normalHint;
        Callback callback = new Callback(address, hint.creationTime, accordTxnResult);
        if (normalHint != null)
        {
            // We had a hint that was supposed to be done on Accord for the batch log (otherwise address would be non-null),
            // but Accord no longer manages that table/range and now we don't know which nodes (if any) are missing the Mutation.
            // Convert them to per replica hints *after* all the hints in this page have been applied so we can be reasonably sure
            // this page isn't going to be played again thus avoiding any further amplification from the same hint being
            // replayed and repeatedly converted to per replica hints
            if (address == null)
            {
                checkState(hostId.equals(RETRY_ON_DIFFERENT_SYSTEM_UUID), "If there is no address to send the hint to then the host ID should be RETRY_ON_DIFFERENT_SYSTEM_UUID");
                callback.onResponse(null);
                hintsNeedingRehinting.add(normalHint);
            }
            else
            {
                Message<?> message = Message.out(HINT_REQ, new HintMessage(hostId, normalHint));
                MessagingService.instance().sendWithCallback(message, address, callback);
            }
        }
        else
        {
            // Don't wait for a normal response that will never come since no hints were sent
            callback.onResponse(null);
        }
        return callback;
    }

    /**
     * Result of splitting a hint across Accord and non-transactional boundaries.
     * Either part may be null, but not both.
     */
    private static final class SplitHint
    {
        private final Mutation accordMutation;
        private final Hint normalHint;

        private SplitHint(Mutation accordMutation, Hint normalHint)
        {
            this.accordMutation = accordMutation;
            this.normalHint = normalHint;
        }

        @Override
        public String toString()
        {
            return "SplitHint{" +
                   "accordMutation=" + accordMutation +
                   ", normalHint=" + normalHint +
                   '}';
        }
    }

    /**
     * Splits the hint's mutation into the part managed by Accord and the remainder; reuses the original hint when
     * nothing is Accord-managed, and otherwise builds a new normal hint keeping the original creation time.
     */
    private SplitHint splitHintIntoAccordAndNormal(ClusterMetadata cm, Hint hint)
    {
        SplitMutation<Mutation> splitMutation = ConsensusMigrationMutationHelper.instance().splitMutationIntoAccordAndNormal(hint.mutation, cm);
        if (splitMutation.accordMutation == null)
            return new SplitHint(null, hint);
        if (splitMutation.normalMutation == null)
            return new SplitHint(splitMutation.accordMutation, null);
        Hint normalHint = Hint.create(splitMutation.normalMutation, hint.creationTime, splitMutation.normalMutation.smallestGCGS());
        return new SplitHint(splitMutation.accordMutation, normalHint);
    }

    /*
     * Sending hints in raw mode.
     */
    private Callback sendEncodedHint(ByteBuffer hint)
    {
        HintMessage.Encoded message = new HintMessage.Encoded(hostId, hint, messagingVersion);
        Callback callback = new Callback(address, message.getHintCreationTime());
        MessagingService.instance().sendWithCallback(Message.out(HINT_REQ, message), address, callback);
        return callback;
    }

    /**
     * Tracks the delivery of one hint, which may consist of a normal (messaging) part and an Accord part.
     * Outcomes are reported from messaging and Accord callback threads; all reads and writes of the outcome
     * state are guarded by this object's monitor.
     */
    static final class Callback implements RequestCallback, BiConsumer<TxnResult, Throwable>
    {
        enum Outcome { SUCCESS, TIMEOUT, FAILURE, INTERRUPTED, RETRY_DIFFERENT_SYSTEM }

        private final long start = approxTime.now();
        private final Condition condition = newOneTimeCondition();

        // Guarded by "this"
        private Outcome normalOutcome;
        // Guarded by "this"
        private Outcome accordOutcome;
        // Guarded by "this"; makes sure delay metrics are recorded and the condition signalled only once
        private boolean signalled;

        // Never null: a null target is replaced by ACCORD_HINT_ENDPOINT for metrics
        private final InetAddressAndPort to;
        private final long hintCreationNanoTime;

        private Callback(@Nonnull InetAddressAndPort to, long hintCreationTimeMillisSinceEpoch)
        {
            this(to, hintCreationTimeMillisSinceEpoch, null);
        }

        private Callback(@Nullable InetAddressAndPort to, long hintCreationTimeMillisSinceEpoch, @Nullable IAccordResult<TxnResult> accordTxnResult)
        {
            this.to = to != null ? to : ACCORD_HINT_ENDPOINT;
            this.hintCreationNanoTime = approxTime.translate().fromMillisSinceEpoch(hintCreationTimeMillisSinceEpoch);
            if (accordTxnResult != null)
                accordTxnResult.addCallback(this);
            else
                accordOutcome = SUCCESS;
        }

        /**
         * Waits until the HINT_REQ deadline for the delivery to complete.
         *
         * @return the combined outcome, TIMEOUT if the deadline passed first, or INTERRUPTED if the wait was interrupted
         */
        Outcome await()
        {
            boolean timedOut;
            try
            {
                timedOut = !condition.awaitUntil(HINT_REQ.expiresAtNanos(start));
            }
            catch (InterruptedException e)
            {
                logger.warn("Hint dispatch was interrupted", e);
                return INTERRUPTED;
            }

            // Callbacks may still be racing with us after a timeout, so evaluate under the monitor
            synchronized (this)
            {
                if (timedOut)
                    normalOutcome = TIMEOUT;
                return outcome();
            }
        }

        // Must be called holding the monitor
        private Outcome outcome()
        {
            // We only get here after being signalled (both outcomes known, or one side FAILED early) or after a timeout
            checkState((normalOutcome != null && accordOutcome != null)
                       || normalOutcome == FAILURE || accordOutcome == FAILURE || normalOutcome == TIMEOUT,
                       "Outcome for both normal and accord hint delivery should be known");
            if (normalOutcome == RETRY_DIFFERENT_SYSTEM || accordOutcome == RETRY_DIFFERENT_SYSTEM)
                return RETRY_DIFFERENT_SYSTEM;
            if (normalOutcome == TIMEOUT || accordOutcome == TIMEOUT)
                return TIMEOUT;
            if (normalOutcome == FAILURE || accordOutcome == FAILURE)
                return FAILURE;
            checkState(normalOutcome == SUCCESS && accordOutcome == SUCCESS, "Hint delivery should have been successful");
            return SUCCESS;
        }

        private synchronized void completeNormal(Outcome outcome)
        {
            normalOutcome = outcome;
            maybeSignal();
        }

        private synchronized void completeAccord(Outcome outcome)
        {
            accordOutcome = outcome;
            maybeSignal();
        }

        /**
         * Wakes the waiter once both outcomes are known, or early when either side failed. Only the first
         * qualifying call records delay metrics, so a late second outcome does not double count.
         */
        private synchronized void maybeSignal()
        {
            if (signalled)
                return;

            if ((normalOutcome != null && accordOutcome != null) || normalOutcome == FAILURE || accordOutcome == FAILURE)
            {
                signalled = true;
                updateDelayMetrics(to, approxTime.now() - this.hintCreationNanoTime);
                condition.signalAll();
            }
        }

        @Override
        public boolean invokeOnFailure()
        {
            return true;
        }

        @Override
        public void onFailure(InetAddressAndPort from, RequestFailure failureMessage)
        {
            completeNormal(failureMessage.reason == RequestFailureReason.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM
                           ? RETRY_DIFFERENT_SYSTEM
                           : FAILURE);
        }

        @Override
        public void onResponse(Message msg)
        {
            completeNormal(SUCCESS);
        }

        @Override
        public void accept(TxnResult success, Throwable fail)
        {
            completeAccord(accordOutcomeOf(success, fail));
        }

        /**
         * Maps the result of the Accord hint transaction to an outcome: retry-on-different-system and other
         * request execution errors are retriable (the latter reported as TIMEOUT); anything else is a FAILURE.
         */
        private static Outcome accordOutcomeOf(TxnResult success, Throwable fail)
        {
            if (fail == null)
                return success.kind() == retry_new_protocol ? RETRY_DIFFERENT_SYSTEM : SUCCESS;

            if (fail instanceof RequestExecutionException || fail instanceof RetryOnDifferentSystemException)
            {
                logRateLimited("Accord hint delivery transaction failed retriably", fail);
                return fail instanceof RetryOnDifferentSystemException ? RETRY_DIFFERENT_SYSTEM : TIMEOUT;
            }

            logRateLimited("Accord hint delivery transaction failed permanently", fail);
            return FAILURE;
        }

        // Logs the failure with its stack trace at most once per NoSpamLogger interval for each message
        private static void logRateLimited(String msg, Throwable fail)
        {
            if (noSpamLogger.getStatement(msg).shouldLog(Clock.Global.nanoTime()))
                logger.error(msg, fail);
        }
    }
}