blob: 19aff7428a02b0eb5bfee2d2ba6c44d68c64cfc7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.service.paxos.Commit.Agreed;
import org.apache.cassandra.service.paxos.Commit.Committed;
import org.apache.cassandra.tracing.Tracing;
import static org.apache.cassandra.exceptions.RequestFailureReason.TIMEOUT;
import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REFRESH_REQ;
import static org.apache.cassandra.service.paxos.Commit.isAfter;
import static org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExecuteOnSelf;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
/**
 * Nodes that have promised in response to our prepare may be missing the latestCommit, meaning we cannot be sure the
 * prior round has been committed to the necessary quorum of participants, so that it will be visible to future quorums.
 *
 * To resolve this problem, we submit the latest commit we have seen, and wait for confirmation before continuing
 * (verifying that we are still promised in the process).
 *
 * The outcome for each contacted participant is reported through the {@link Callbacks} supplied at construction.
 */
public class PaxosPrepareRefresh implements RequestCallbackWithFailure<PaxosPrepareRefresh.Response>
{
    private static final Logger logger = LoggerFactory.getLogger(PaxosPrepareRefresh.class);

    /** Shared handler instance for incoming refresh requests. */
    public static final RequestHandler requestHandler = new RequestHandler();
    /** Shared serializer instance for {@link Request}. */
    public static final RequestSerializer requestSerializer = new RequestSerializer();
    /** Shared serializer instance for {@link Response}. */
    public static final ResponseSerializer responseSerializer = new ResponseSerializer();

    /**
     * Receiver of per-participant refresh outcomes.
     */
    interface Callbacks
    {
        /**
         * Invoked when the refresh of {@code from} failed.
         *
         * @param from   the participant whose refresh failed
         * @param reason why the refresh failed
         */
        void onRefreshFailure(InetAddressAndPort from, RequestFailureReason reason);

        /**
         * Invoked when {@code from} answered the refresh.
         *
         * @param isSupersededBy the later ballot reported by the participant, or {@code null} if the participant
         *                       did not report one (i.e. it confirmed the promise)
         * @param from           the participant that answered
         */
        void onRefreshSuccess(Ballot isSupersededBy, InetAddressAndPort from);
    }

    /** The single outgoing message, built once and reused for every destination. */
    private final Message<Request> send;
    private final Callbacks callbacks;

    /**
     * @param prepared        the ballot we believe we have been promised
     * @param participants    accepted for interface compatibility; not read by this constructor
     * @param latestCommitted the latest commit we have seen, to be applied by participants that are missing it
     * @param callbacks       receiver of the per-participant outcomes
     */
    public PaxosPrepareRefresh(Ballot prepared, Paxos.Participants participants, Committed latestCommitted, Callbacks callbacks)
    {
        this.callbacks = callbacks;
        this.send = Message.out(PAXOS2_PREPARE_REFRESH_REQ, new Request(prepared, latestCommitted));
    }

    /**
     * Submit the refresh to every endpoint in {@code refresh}.
     *
     * Remote endpoints are sent the message with this object registered as the callback. If any endpoint satisfies
     * {@code shouldExecuteOnSelf}, the request is instead executed locally, once, on the verb's stage after all
     * remote messages have been submitted.
     *
     * @param refresh the endpoints to refresh
     */
    void refresh(List<InetAddressAndPort> refresh)
    {
        boolean executeOnSelf = false;
        for (int i = 0, size = refresh.size(); i < size ; ++i)
        {
            InetAddressAndPort destination = refresh.get(i);

            if (logger.isTraceEnabled())
                logger.trace("Refresh {} and Confirm {} to {}", send.payload.missingCommit, Ballot.toString(send.payload.promised, "Promise"), destination);

            if (Tracing.isTracing())
                Tracing.trace("Refresh {} and Confirm {} to {}", send.payload.missingCommit.ballot, send.payload.promised, destination);

            if (shouldExecuteOnSelf(destination))
                executeOnSelf = true;
            else
                MessagingService.instance().sendWithCallback(send, destination, this);
        }

        if (executeOnSelf)
            PAXOS2_PREPARE_REFRESH_REQ.stage.execute(this::executeOnSelf);
    }

    /**
     * Forwards a failure for {@code from} to {@link Callbacks#onRefreshFailure}.
     */
    @Override
    public void onFailure(InetAddressAndPort from, RequestFailureReason reason)
    {
        callbacks.onRefreshFailure(from, reason);
    }

    /**
     * Forwards a received response, together with its sender, to {@link Callbacks#onRefreshSuccess}.
     */
    @Override
    public void onResponse(Message<Response> message)
    {
        onResponse(message.payload, message.from());
    }

    /**
     * Execute the refresh against the local node and report the outcome through the same callbacks
     * that a remote reply would use.
     *
     * A {@link WriteTimeoutException} is reported as {@code TIMEOUT}; any other exception is logged and reported as
     * {@code UNKNOWN}. A {@code null} result from the handler (the request was declined) is reported as
     * {@code UNKNOWN}, matching what {@link RequestHandler#doVerb} replies to a remote sender in the same situation.
     */
    private void executeOnSelf()
    {
        final InetAddressAndPort self = getBroadcastAddressAndPort();

        Response response;
        try
        {
            response = RequestHandler.execute(send.payload, self);
        }
        catch (Exception ex)
        {
            RequestFailureReason reason = UNKNOWN;
            if (ex instanceof WriteTimeoutException) reason = TIMEOUT;
            else logger.error("Failed to apply paxos refresh-prepare locally", ex);

            onFailure(self, reason);
            return;
        }

        // Checked outside the try block so that an exception thrown by the callback itself
        // cannot be caught above and produce a second failure notification.
        if (response == null)
        {
            // Previously this case returned silently, so no callback was ever invoked for the local node.
            onFailure(self, UNKNOWN);
            return;
        }

        onResponse(response, self);
    }

    /**
     * Common success path for both remote and local responses.
     */
    private void onResponse(Response response, InetAddressAndPort from)
    {
        callbacks.onRefreshSuccess(response.isSupersededBy, from);
    }

    /**
     * Payload of a refresh request: the promise to confirm and the commit the recipient should apply.
     */
    private static class Request
    {
        /** The ballot the sender believes it has been promised. */
        final Ballot promised;
        /** The commit the recipient is asked to apply. */
        final Committed missingCommit;

        Request(Ballot promised, Committed missingCommit)
        {
            this.promised = promised;
            this.missingCommit = missingCommit;
        }
    }

    /**
     * Payload of a refresh response.
     */
    static class Response
    {
        /** A ballot later than the requested promise, or {@code null} if the promise was confirmed. */
        final Ballot isSupersededBy;

        Response(Ballot isSupersededBy)
        {
            this.isSupersededBy = isSupersededBy;
        }
    }

    /**
     * Handles refresh requests: applies the supplied commit and reports whether the promise still stands.
     */
    public static class RequestHandler implements IVerbHandler<Request>
    {
        /**
         * Executes the request and replies to the sender; a {@code null} result from
         * {@link #execute} is answered with a failure of reason {@code UNKNOWN}.
         */
        @Override
        public void doVerb(Message<Request> message)
        {
            Response response = execute(message.payload, message.from());
            if (response == null)
                MessagingService.instance().respondWithFailure(UNKNOWN, message);
            else
                MessagingService.instance().respond(response, message);
        }

        /**
         * Apply the request's commit to the local paxos state and check the requested promise against
         * the latest ballot recorded there.
         *
         * @param request the refresh request
         * @param from    the endpoint the request originated from
         * @return {@code null} if {@code Paxos.isInRangeAndShouldProcess} rejects the request; otherwise a response
         *         carrying the later ballot if one is found after {@code request.promised}, or carrying
         *         {@code null} if the promise is confirmed
         */
        public static Response execute(Request request, InetAddressAndPort from)
        {
            Agreed commit = request.missingCommit;

            if (!Paxos.isInRangeAndShouldProcess(from, commit.update.partitionKey(), commit.update.metadata(), false))
                return null;

            // the state is closed by try-with-resources once we have our answer
            try (PaxosState state = PaxosState.get(commit))
            {
                // apply the commit first, then inspect what the state has witnessed
                state.commit(commit);
                Ballot latest = state.current(request.promised).latestWitnessedOrLowBound();
                if (isAfter(latest, request.promised))
                {
                    Tracing.trace("Promise {} rescinded; latest is now {}", request.promised, latest);
                    return new Response(latest);
                }
                else
                {
                    Tracing.trace("Promise confirmed for ballot {}", request.promised);
                    return new Response(null);
                }
            }
        }
    }

    /**
     * Serializes a {@link Request} as the promised ballot followed by the missing commit.
     */
    public static class RequestSerializer implements IVersionedSerializer<Request>
    {
        @Override
        public void serialize(Request request, DataOutputPlus out, int version) throws IOException
        {
            request.promised.serialize(out);
            Committed.serializer.serialize(request.missingCommit, out, version);
        }

        @Override
        public Request deserialize(DataInputPlus in, int version) throws IOException
        {
            // read in the same order as written: ballot first, then commit
            Ballot promise = Ballot.deserialize(in);
            Committed missingCommit = Committed.serializer.deserialize(in, version);
            return new Request(promise, missingCommit);
        }

        @Override
        public long serializedSize(Request request, int version)
        {
            return Ballot.sizeInBytes()
                   + Committed.serializer.serializedSize(request.missingCommit, version);
        }
    }

    /**
     * Serializes a {@link Response} as a single nullable ballot.
     */
    public static class ResponseSerializer implements IVersionedSerializer<Response>
    {
        @Override
        public void serialize(Response response, DataOutputPlus out, int version) throws IOException
        {
            serializeNullable(Ballot.Serializer.instance, response.isSupersededBy, out, version);
        }

        @Override
        public Response deserialize(DataInputPlus in, int version) throws IOException
        {
            Ballot isSupersededBy = deserializeNullable(Ballot.Serializer.instance, in, version);
            return new Response(isSupersededBy);
        }

        @Override
        public long serializedSize(Response response, int version)
        {
            return serializedSizeNullable(Ballot.Serializer.instance, response.isSupersededBy, version);
        }
    }
}