blob: 91d9370f8f06c2e9f37c023575f179743fcf6faf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements RequestCallback<ReadResponse>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
public final ResponseResolver<E, P> resolver;
final SimpleCondition condition = new SimpleCondition();
private final long queryStartNanoTime;
final int blockFor; // TODO: move to replica plan as well?
// this uses a plain reference, but is initialised before handoff to any other threads; the later updates
// may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
final ReplicaPlan.Shared<E, P> replicaPlan;
private final ReadCommand command;
private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
this.command = command;
this.resolver = resolver;
this.queryStartNanoTime = queryStartNanoTime;
this.replicaPlan = replicaPlan;
this.blockFor = replicaPlan.get().blockFor();
this.failureReasonByEndpoint = new ConcurrentHashMap<>();
// we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
assert !(command instanceof PartitionRangeReadCommand) || blockFor >= replicaPlan().contacts().size();
if (logger.isTraceEnabled())
logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan);
}
protected P replicaPlan()
{
return replicaPlan.get();
}
public boolean await(long timePastStart, TimeUnit unit)
{
long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
try
{
return condition.await(time, TimeUnit.NANOSECONDS);
}
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}
}
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
/**
* Here we are checking isDataPresent in addition to the responses size because there is a possibility
* that an asynchronous speculative execution request could be returning after a local failure already
* signaled. Responses may have been set while the data reference is not yet.
* See {@link DigestResolver#preprocess(Message)}
* CASSANDRA-16097
*/
int received = resolver.responses.size();
boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
if (signaled && !failed)
return;
if (Tracing.isTracing())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
}
else if (logger.isDebugEnabled())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
}
// Same as for writes, see AbstractWriteResponseHandler
throw failed
? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
: new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent());
}
public int blockFor()
{
return blockFor;
}
@Override
public void onResponse(Message<ReadResponse> message)
{
assertWaitingFor(message.from());
resolver.preprocess(message);
/*
* Ensure that data is present and the response accumulator has properly published the
* responses it has received. This may result in not signaling immediately when we receive
* the minimum number of required results, but it guarantees at least the minimum will
* be accessible when we do signal. (see CASSANDRA-16807)
*/
if (resolver.isDataPresent() && resolver.responses.size() >= blockFor)
condition.signalAll();
}
public void response(ReadResponse result)
{
Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
Message<ReadResponse> message = Message.internalResponse(kind, result);
onResponse(message);
}
@Override
public boolean trackLatencyForSnitch()
{
return true;
}
@Override
public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
{
assertWaitingFor(from);
failureReasonByEndpoint.put(from, failureReason);
if (blockFor + failuresUpdater.incrementAndGet(this) > replicaPlan().contacts().size())
condition.signalAll();
}
@Override
public boolean invokeOnFailure()
{
return true;
}
/**
* Verify that a message doesn't come from an unexpected replica.
*/
private void assertWaitingFor(InetAddressAndPort from)
{
assert !replicaPlan().consistencyLevel().isDatacenterLocal()
|| DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
: "Received read response from unexpected replica: " + from;
}
}