blob: 3d765eaada5d5ac7ca9df3d536f19f8eb0c1433f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos.cleanup;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_REQ;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
public class PaxosCleanupSession extends AsyncFuture<Void> implements Runnable,
IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
RequestCallbackWithFailure<Void>
{
private static final Map<UUID, PaxosCleanupSession> sessions = new ConcurrentHashMap<>();
static final long TIMEOUT_NANOS;
static
{
long timeoutSeconds = Integer.getInteger("cassandra.paxos_cleanup_session_timeout_seconds", (int) TimeUnit.HOURS.toSeconds(2));
TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(timeoutSeconds);
}
private static class TimeoutTask implements Runnable
{
private final WeakReference<PaxosCleanupSession> ref;
TimeoutTask(PaxosCleanupSession session)
{
this.ref = new WeakReference<>(session);
}
@Override
public void run()
{
PaxosCleanupSession session = ref.get();
if (session == null || session.isDone())
return;
long remaining = session.lastMessageSentNanos + TIMEOUT_NANOS - nanoTime();
if (remaining > 0)
schedule(remaining);
else
session.fail(String.format("Paxos cleanup session %s timed out", session.session));
}
ScheduledFuture<?> schedule(long delayNanos)
{
return ScheduledExecutors.scheduledTasks.scheduleTimeoutWithDelay(this, delayNanos, TimeUnit.NANOSECONDS);
}
private static ScheduledFuture<?> schedule(PaxosCleanupSession session)
{
return new TimeoutTask(session).schedule(TIMEOUT_NANOS);
}
}
private final UUID session = UUID.randomUUID();
private final TableId tableId;
private final Collection<Range<Token>> ranges;
private final Queue<InetAddressAndPort> pendingCleanups = new ConcurrentLinkedQueue<>();
private InetAddressAndPort inProgress = null;
private volatile long lastMessageSentNanos = nanoTime();
private ScheduledFuture<?> timeout;
PaxosCleanupSession(Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges)
{
this.tableId = tableId;
this.ranges = ranges;
pendingCleanups.addAll(endpoints);
}
private static void setSession(PaxosCleanupSession session)
{
Preconditions.checkState(!sessions.containsKey(session.session));
sessions.put(session.session, session);
}
private static void removeSession(PaxosCleanupSession session)
{
Preconditions.checkState(sessions.containsKey(session.session));
sessions.remove(session.session);
}
@Override
public void run()
{
setSession(this);
startNextOrFinish();
if (!isDone())
timeout = TimeoutTask.schedule(this);
}
private void startCleanup(InetAddressAndPort endpoint)
{
lastMessageSentNanos = nanoTime();
PaxosCleanupRequest completer = new PaxosCleanupRequest(session, tableId, ranges);
Message<PaxosCleanupRequest> msg = Message.out(PAXOS2_CLEANUP_REQ, completer);
MessagingService.instance().sendWithCallback(msg, endpoint, this);
}
private synchronized void startNextOrFinish()
{
InetAddressAndPort endpoint = pendingCleanups.poll();
if (endpoint == null)
Preconditions.checkState(inProgress == null, "Unable to complete paxos cleanup session %s, still waiting on %s", session, inProgress);
else
Preconditions.checkState(inProgress == null, "Unable to start paxos cleanup on %s for %s, still waiting on response from %s", endpoint, session, inProgress);
inProgress = endpoint;
if (endpoint != null)
{
startCleanup(endpoint);
}
else
{
removeSession(this);
trySuccess(null);
if (timeout != null)
timeout.cancel(true);
}
}
private synchronized void fail(String message)
{
if (isDone())
return;
removeSession(this);
tryFailure(new PaxosCleanupException(message));
if (timeout != null)
timeout.cancel(true);
}
private synchronized void finish(InetAddressAndPort from, PaxosCleanupResponse finished)
{
Preconditions.checkArgument(from.equals(inProgress), "Received unexpected cleanup complete response from %s for session %s. Expected %s", from, session, inProgress);
inProgress = null;
if (finished.wasSuccessful)
{
startNextOrFinish();
}
else
{
fail(String.format("Paxos cleanup session %s failed on %s with message: %s", session, from, finished.message));
}
}
public static void finishSession(InetAddressAndPort from, PaxosCleanupResponse response)
{
PaxosCleanupSession session = sessions.get(response.session);
if (session != null)
session.finish(from, response);
}
private synchronized void maybeKillSession(InetAddressAndPort unavailable, String reason)
{
// don't fail if we've already completed the cleanup for the unavailable endpoint,
// if it's something that affects availability, the ongoing sessions will fail themselves
if (!pendingCleanups.contains(unavailable))
return;
fail(String.format("Paxos cleanup session %s failed after %s %s", session, unavailable, reason));
}
@Override
public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
{
}
@Override
public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
{
}
@Override
public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
{
}
@Override
public void onAlive(InetAddressAndPort endpoint, EndpointState state)
{
}
@Override
public void onDead(InetAddressAndPort endpoint, EndpointState state)
{
maybeKillSession(endpoint, "marked dead");
}
@Override
public void onRemove(InetAddressAndPort endpoint)
{
maybeKillSession(endpoint, "removed from ring");
}
@Override
public void onRestart(InetAddressAndPort endpoint, EndpointState state)
{
maybeKillSession(endpoint, "restarted");
}
@Override
public void convict(InetAddressAndPort ep, double phi)
{
maybeKillSession(ep, "convicted by failure detector");
}
@Override
public void onFailure(InetAddressAndPort from, RequestFailureReason reason)
{
fail(from.toString() + ' ' + reason + " for cleanup request for paxos cleanup session " + session);
}
@Override
public void onResponse(Message<Void> msg)
{
// noop, we're only interested in failures
}
}