/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.transform.DuplicateRowChecker;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import static com.google.common.collect.Iterables.all;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
*
* Optionally, may perform additional requests to provide redundancy against replica failure:
* AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
* SpeculatingReadExecutor will wait until it looks like the original request is in danger
* of timing out before performing extra reads.
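 *
 * A rough sketch of the lifecycle, as driven by the caller (see StorageProxy); the ordering below is
 * illustrative only, but each method is defined in this class:
 * <pre>{@code
 * AbstractReadExecutor executor = AbstractReadExecutor.getReadExecutor(command, consistencyLevel, queryStartNanoTime);
 * executor.executeAsync();                    // send the initial data/digest requests
 * executor.maybeTryAdditionalReplicas();      // possibly speculate; may block briefly
 * executor.awaitResponses();                  // block for the CL; starts read repair on digest mismatch
 * executor.maybeSendAdditionalDataRequests(); // let an in-flight read repair speculate too
 * executor.awaitReadRepair();                 // block until any repair completes
 * PartitionIterator result = executor.getResult();
 * }</pre>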
*/
public abstract class AbstractReadExecutor
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
protected final ReadCommand command;
private final ReplicaPlan.SharedForTokenRead replicaPlan;
protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> readRepair;
protected final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver;
protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler;
protected final TraceState traceState;
protected final ColumnFamilyStore cfs;
protected final long queryStartNanoTime;
private final int initialDataRequestCount;
protected volatile PartitionIterator result = null;
AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long queryStartNanoTime)
{
this.command = command;
this.replicaPlan = ReplicaPlan.shared(replicaPlan);
this.initialDataRequestCount = initialDataRequestCount;
        // the ReadRepair and DigestResolver both need to see our updated replica plan (hence the shared wrapper above)
this.readRepair = ReadRepair.create(command, this.replicaPlan, queryStartNanoTime);
this.digestResolver = new DigestResolver<>(command, this.replicaPlan, queryStartNanoTime);
this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, queryStartNanoTime);
this.cfs = cfs;
this.traceState = Tracing.instance.get();
this.queryStartNanoTime = queryStartNanoTime;
        // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas, since newer nodes
        // know how to produce older digests but the reverse is not true.
        // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
        // we stop being compatible with pre-3.0 nodes.
int digestVersion = MessagingService.current_version;
for (Replica replica : replicaPlan.contacts())
digestVersion = Math.min(digestVersion, MessagingService.instance().versions.get(replica.endpoint()));
command.setDigestVersion(digestVersion);
}
public DecoratedKey getKey()
{
Preconditions.checkState(command instanceof SinglePartitionReadCommand,
"Can only get keys for SinglePartitionReadCommand");
return ((SinglePartitionReadCommand) command).partitionKey();
}
public ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> getReadRepair()
{
return readRepair;
}
protected void makeFullDataRequests(ReplicaCollection<?> replicas)
{
assert all(replicas, Replica::isFull);
makeRequests(command, replicas);
}
protected void makeTransientDataRequests(Iterable<Replica> replicas)
{
makeRequests(command.copyAsTransientQuery(replicas), replicas);
}
protected void makeDigestRequests(Iterable<Replica> replicas)
{
assert all(replicas, Replica::isFull);
// only send digest requests to full replicas, send data requests instead to the transient replicas
makeRequests(command.copyAsDigestQuery(replicas), replicas);
}
private void makeRequests(ReadCommand readCommand, Iterable<Replica> replicas)
{
boolean hasLocalEndpoint = false;
Message<ReadCommand> message = null;
for (Replica replica: replicas)
{
assert replica.isFull() || readCommand.acceptsTransient();
InetAddressAndPort endpoint = replica.endpoint();
if (replica.isSelf())
{
hasLocalEndpoint = true;
continue;
}
if (traceState != null)
traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
if (null == message)
message = readCommand.createMessage(false);
MessagingService.instance().sendWithCallback(message, endpoint, handler);
}
// We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
if (hasLocalEndpoint)
{
logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
Stage.READ.maybeExecuteImmediately(new LocalReadRunnable(readCommand, handler));
}
}
/**
* Perform additional requests if it looks like the original will time out. May block while it waits
* to see if the original requests are answered first.
*/
public abstract void maybeTryAdditionalReplicas();
    /**
     * Send the initial set of requests.
     */
public void executeAsync()
{
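        // Split the contacted replicas three ways: full data reads to the first initialDataRequestCount
        // full replicas, (transient) data reads to every transient replica, and digest reads to the
        // remaining full replicas.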
EndpointsForToken selected = replicaPlan().contacts();
EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
makeFullDataRequests(fullDataRequests);
makeTransientDataRequests(selected.filterLazily(Replica::isTransient));
makeDigestRequests(selected.filterLazily(r -> r.isFull() && !fullDataRequests.contains(r)));
}
/**
* @return an executor appropriate for the configured speculative read policy
*/
public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
{
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
// Speculative retry is disabled *OR*
// 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
// There are simply no extra replicas to speculate.
// Handle this separately so it can record failed attempts to speculate due to lack of replicas
if (replicaPlan.contacts().size() == replicaPlan.candidates().size())
{
boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
}
if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
return new AlwaysSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
else // PERCENTILE or CUSTOM.
return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
}
    /**
     * Returns true if speculation should occur; if it should, this may block until it is time to
     * send the speculative reads.
     */
boolean shouldSpeculateAndMaybeWait()
{
        // no latency information, or we're overloaded: a speculative read sent after the sampled
        // latency could not beat the query timeout anyway
if (cfs.sampleReadLatencyNanos > command.getTimeout(NANOSECONDS))
return false;
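        // otherwise, wait out the sampled latency; speculate only if the requests are still unanswered by then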
return !handler.await(cfs.sampleReadLatencyNanos, NANOSECONDS);
}
ReplicaPlan.ForTokenRead replicaPlan()
{
return replicaPlan.get();
}
void onReadTimeout() {}
public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
{
        /**
         * If we never speculate because there are no extra replicas to read from, record each
         * speculation that should have happened, but couldn't, as a failure.
         */
private final boolean logFailedSpeculation;
public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean logFailedSpeculation)
{
super(cfs, command, replicaPlan, 1, queryStartNanoTime);
this.logFailedSpeculation = logFailedSpeculation;
}
public void maybeTryAdditionalReplicas()
{
if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
{
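                // a speculative read would have been attempted here, but there are no extra replicas to read from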
cfs.metric.speculativeInsufficientReplicas.inc();
}
}
}
static class SpeculatingReadExecutor extends AbstractReadExecutor
{
private volatile boolean speculated = false;
public SpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
ReplicaPlan.ForTokenRead replicaPlan,
long queryStartNanoTime)
{
            // We're contacting more replicas than the consistency level requires, presumably as extra
            // targets for read repair. Since our "extra" replica is the least-preferred by the snitch,
            // we do an extra data read to start with against a replica more likely to respond; better
            // to let the read repair fail than the entire query.
super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
{
if (shouldSpeculateAndMaybeWait())
{
                // Handle speculation stats first, in case the callback fires immediately
cfs.metric.speculativeRetries.inc();
speculated = true;
ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
ReadCommand retryCommand;
Replica extraReplica;
if (handler.resolver.isDataPresent())
{
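                    // A data response has already arrived, so the extra request only needs to confirm it:
                    // a digest read against a full replica, or a (data-bearing) transient read otherwise.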
extraReplica = replicaPlan.firstUncontactedCandidate(replica -> true);
// we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
assert extraReplica != null;
retryCommand = extraReplica.isTransient()
? command.copyAsTransientQuery(extraReplica)
: command.copyAsDigestQuery(extraReplica);
}
else
{
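                    // No data response yet, so the speculative read must fetch full data and can
                    // therefore only target a full replica.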
extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull);
retryCommand = command;
if (extraReplica == null)
{
cfs.metric.speculativeInsufficientReplicas.inc();
                        // We cannot safely speculate a new data request without more work: requests are
                        // assumed to be unique per endpoint, and we have no full replicas left to
                        // speculate against.
return;
}
}
// we must update the plan to include this new node, else when we come to read-repair, we may not include this
// speculated response in the data requests we make again, and we will not be able to 'speculate' an extra repair read,
// nor would we be able to speculate a new 'write' if the repair writes are insufficient
super.replicaPlan.addToContacts(extraReplica);
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
MessagingService.instance().sendWithCallback(retryCommand.createMessage(false), extraReplica.endpoint(), handler);
}
}
@Override
void onReadTimeout()
{
            // It shouldn't be possible to get here without first attempting to speculate,
            // even if the timing is bad
assert speculated;
cfs.metric.speculativeFailedRetries.inc();
}
}
private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
{
public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
ReplicaPlan.ForTokenRead replicaPlan,
long queryStartNanoTime)
{
// presumably, we speculate an extra data request here in case it is our data request that fails to respond,
// and there are no more nodes to consult
super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
{
// no-op
}
@Override
public void executeAsync()
{
super.executeAsync();
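            // record the upfront extra request in the speculative retry metrics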
cfs.metric.speculativeRetries.inc();
}
@Override
void onReadTimeout()
{
cfs.metric.speculativeFailedRetries.inc();
}
}
public void setResult(PartitionIterator result)
{
Preconditions.checkState(this.result == null, "Result can only be set once");
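        // wrap the result so that duplicate rows produced during resolution are detected as it is consumed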
this.result = DuplicateRowChecker.duringRead(result, this.replicaPlan.get().candidates().endpointList());
}
/**
* Wait for the CL to be satisfied by responses
*/
public void awaitResponses() throws ReadTimeoutException
{
try
{
handler.awaitResults();
assert digestResolver.isDataPresent() : "awaitResults returned with no data present.";
}
catch (ReadTimeoutException e)
{
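            // let the executor react to the timeout (e.g. record a failed speculation), but make sure
            // the original exception is rethrown even if onReadTimeout itself throws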
try
{
onReadTimeout();
}
finally
{
throw e;
}
}
// return immediately, or begin a read repair
if (digestResolver.responsesMatch())
{
setResult(digestResolver.getData());
}
else
{
Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
readRepair.startRepair(digestResolver, this::setResult);
}
}
public void awaitReadRepair() throws ReadTimeoutException
{
try
{
readRepair.awaitReads();
}
catch (ReadTimeoutException e)
{
if (Tracing.isTracing())
Tracing.trace("Timed out waiting on digest mismatch repair requests");
else
logger.trace("Timed out waiting on digest mismatch repair requests");
// the caught exception here will have CL.ALL from the repair command,
// not whatever CL the initial command was at (CASSANDRA-7947)
throw new ReadTimeoutException(replicaPlan().consistencyLevel(), handler.blockFor - 1, handler.blockFor, true);
}
}
boolean isDone()
{
return result != null;
}
public void maybeSendAdditionalDataRequests()
{
if (isDone())
return;
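        // an in-flight read repair may need additional data reads of its own to complete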
readRepair.maybeSendAdditionalReads();
}
public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutException
{
Preconditions.checkState(result != null, "Result must be set first");
return result;
}
}