blob: 86ecfb76f0e753b6b95665f6f8b8263ecde68221 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair.consistent;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.messages.StatusRequest;
import org.apache.cassandra.repair.messages.StatusResponse;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.tools.nodetool.RepairAdmin;
import org.apache.cassandra.utils.TimeUUID;
/**
* Base class for consistent Local and Coordinator sessions
*
* <p/>
* There are 4 stages to a consistent incremental repair.
*
* <h1>Repair prepare</h1>
* First, the normal {@link ActiveRepairService#prepareForRepair(TimeUUID, InetAddressAndPort, Set, RepairOption, boolean, List)} stuff
* happens, which sends out {@link PrepareMessage} and creates a {@link ActiveRepairService.ParentRepairSession}
* on the coordinator and each of the neighbors.
*
* <h1>Consistent prepare</h1>
* The consistent prepare step promotes the parent repair session to a consistent session, and isolates the sstables
* being repaired from other sstables. First, the coordinator sends a {@link PrepareConsistentRequest} message to each repair
* participant (including itself). When received, the node creates a {@link LocalSession} instance, sets its state to
* {@code PREPARING}, persists it, and begins preparing the tables for incremental repair, which segregates the data
* being repaired from the rest of the table data. When the preparation completes, the session state is set to
* {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the coordinator indicating success or failure.
* If the pending anti-compaction fails, the local session state is set to {@code FAILED}.
* <p/>
* (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, PrepareConsistentRequest)})
* <p/>
* Once the coordinator receives positive {@code PrepareConsistentResponse} messages from all the participants, the
* coordinator begins the normal repair process.
* <p/>
* (see {@link CoordinatorSession#handlePrepareResponse(InetAddressAndPort, boolean)})
*
* <h1>Repair</h1>
* The coordinator runs the normal data repair process against the sstables segregated in the previous step. When a
* node receives a {@link ValidationRequest}, it sets its local session state to {@code REPAIRING}.
* <p/>
*
* If all of the RepairSessions complete successfully, the coordinator begins the {@code Finalization} process. Otherwise,
* it begins the {@code Failure} process.
*
* <h1>Finalization</h1>
* The finalization step finishes the session and promotes the sstables to repaired. The coordinator begins by sending
* {@link FinalizePropose} messages to each of the participants. Each participant will set its state to {@code FINALIZE_PROMISED}
* and respond with a {@link FinalizePromise} message. Once the coordinator has received promise messages from all participants,
* it will send a {@link FinalizeCommit} message to all of them, ending the coordinator session. When a node receives the
* {@code FinalizeCommit} message, it will set its session's state to {@code FINALIZED}, completing the {@code LocalSession}.
* <p/>
*
* For the sake of simplicity, finalization does not immediately mark pending repair sstables repaired because of potential
* conflicts with in progress compactions. The sstables will be marked repaired as part of the normal compaction process.
* <p/>
*
* On the coordinator side, see {@link CoordinatorSession#finalizePropose()}, {@link CoordinatorSession#handleFinalizePromise(InetAddressAndPort, boolean)},
* and {@link CoordinatorSession#finalizeCommit()}
* <p/>
*
* On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddressAndPort, FinalizePropose)}
* and {@link LocalSessions#handleFinalizeCommitMessage(InetAddressAndPort, FinalizeCommit)}
*
* <h1>Failure</h1>
* If there are any failures or problems during the process above, the session will be failed. When a session is failed,
* the coordinator will send {@link FailSession} messages to each of the participants. In some cases (basically those not
* including Validation and Sync) errors are reported back to the coordinator by the local session, at which point, it
* will send {@code FailSession} messages out.
* <p/>
* Just as with finalization, sstables aren't immediately moved back to unrepaired, but will be demoted as part of the
* normal compaction process.
*
* <p/>
* See {@link LocalSessions#failSession(UUID, boolean)} and {@link CoordinatorSession#fail()}
*
* <h1>Failure Recovery and Session Cleanup</h1>
* There are a few scenarios where sessions can get stuck. If a node fails mid session, or it misses a {@code FailSession}
* or {@code FinalizeCommit} message, it will never finish. To address this, there is a cleanup task that runs every
* 10 minutes that attempts to complete idle sessions.
*
* <p/>
* If a session is not completed (not {@code FINALIZED} or {@code FAILED}) and there's been no activity on the session for
* over an hour, the cleanup task will attempt to finish the session by learning the session state of the other participants.
* To do this, it sends a {@link StatusRequest} message to the other session participants. The participants respond with a
* {@link StatusResponse} message, notifying the sender of their state. If the sender receives a {@code FAILED} response
* from any of the participants, it fails the session locally. If it receives a {@code FINALIZED} response from any of the
* participants, it will set its state to {@code FINALIZED} as well. Since the coordinator won't finalize sessions until
* it's received {@code FinalizePromise} messages from <i>all</i> participants, this is safe.
*
*
* <p/>
* If a session is not completed, and hasn't had any activity for over a day, the session is auto-failed.
*
* <p/>
* Once a session has been completed for over 2 days, it's deleted.
*
* <p/>
* Operators can also manually fail sessions with {@code nodetool repair_admin --cancel}
*
* <p/>
* See {@link LocalSessions#cleanup()} and {@link RepairAdmin}
*
*/
public abstract class ConsistentSession
{
/**
 * The possible states of a {@code ConsistentSession}. The typical progression is {@link State#PREPARING}, {@link State#PREPARED},
 * {@link State#REPAIRING}, {@link State#FINALIZE_PROMISED}, and {@link State#FINALIZED}. With the exception of {@code FINALIZED},
 * any state can transition to {@link State#FAILED}.
 */
public enum State
{
    PREPARING(0),
    PREPARED(1),
    REPAIRING(2),
    FINALIZE_PROMISED(3),
    FINALIZED(4),
    FAILED(5);

    /**
     * Legal successor states keyed by current state. Built once, after the constants above exist,
     * and wrapped so that neither the table nor its sets can be modified afterwards.
     */
    private static final Map<State, Set<State>> transitions = buildTransitions();

    /**
     * @param expectedOrdinal the position this constant is expected to occupy; checked with an
     *                        {@code assert} so that reordering the constants is caught when
     *                        assertions are enabled ({@link #valueOf(int)} resolves states by ordinal)
     */
    State(int expectedOrdinal)
    {
        assert ordinal() == expectedOrdinal;
    }

    /**
     * Creates the transition table without resorting to double-brace initialization, which would
     * define an anonymous {@code EnumMap} subclass just to populate it.
     */
    private static Map<State, Set<State>> buildTransitions()
    {
        Map<State, Set<State>> table = new EnumMap<>(State.class);
        table.put(PREPARING, Collections.unmodifiableSet(EnumSet.of(PREPARED, FAILED)));
        table.put(PREPARED, Collections.unmodifiableSet(EnumSet.of(REPAIRING, FAILED)));
        table.put(REPAIRING, Collections.unmodifiableSet(EnumSet.of(FINALIZE_PROMISED, FAILED)));
        table.put(FINALIZE_PROMISED, Collections.unmodifiableSet(EnumSet.of(FINALIZED, FAILED)));
        // terminal states have no successors
        table.put(FINALIZED, Collections.<State>emptySet());
        table.put(FAILED, Collections.<State>emptySet());
        return Collections.unmodifiableMap(table);
    }

    /**
     * Tells whether a session in this state may move to {@code state}.
     *
     * @param state the candidate next state
     * @return {@code true} if {@code state} is this state or one of its legal successors
     */
    public boolean canTransitionTo(State state)
    {
        // redundant transitions are allowed because the failure recovery mechanism can
        // send redundant status changes out, and they shouldn't throw exceptions
        return state == this || transitions.get(this).contains(state);
    }

    /**
     * Looks a state up by its ordinal.
     *
     * @param ordinal the ordinal of the wanted constant
     * @return the constant declared at that position
     * @throws ArrayIndexOutOfBoundsException if {@code ordinal} is not a valid position
     */
    public static State valueOf(int ordinal)
    {
        return values()[ordinal];
    }
}
// Current lifecycle state. The only non-final field: it is volatile and is replaced through setState().
private volatile State state;
// Identifier of this session, copied from the builder at construction.
public final TimeUUID sessionID;
// Address of the coordinating node; the builder requires it to be one of the participants.
public final InetAddressAndPort coordinator;
// Immutable copy of the builder's table ids; the builder requires at least one.
public final ImmutableSet<TableId> tableIds;
// The builder accepts either a positive value or ActiveRepairService.UNREPAIRED_SSTABLE here.
// NOTE(review): presumably the repaired-at timestamp applied to sstables - confirm against callers.
public final long repairedAt;
// Immutable copy of the token ranges covered by the session; the builder requires at least one.
public final ImmutableSet<Range<Token>> ranges;
// Immutable copy of the participating endpoints; the builder requires at least one.
public final ImmutableSet<InetAddressAndPort> participants;
/**
 * Creates a session from the values collected by {@code builder}.
 *
 * @param builder source of every field value; it is validated before anything is read from it
 * @throws IllegalArgumentException if {@link AbstractBuilder#validate()} rejects the builder
 */
ConsistentSession(AbstractBuilder builder)
{
    // refuse incomplete or inconsistent builders before reading any of their fields
    builder.validate();

    // scalar values are taken over as they are
    this.sessionID = builder.sessionID;
    this.coordinator = builder.coordinator;
    this.repairedAt = builder.repairedAt;
    this.state = builder.state;

    // collections are snapshotted so later changes to the builder cannot leak into the session
    this.tableIds = ImmutableSet.copyOf(builder.ids);
    this.ranges = ImmutableSet.copyOf(builder.ranges);
    this.participants = ImmutableSet.copyOf(builder.participants);
}
/**
 * @return the current value of the volatile {@code state} field
 */
public State getState()
{
return state;
}
/**
 * Replaces the session state with {@code state}.
 * <p>
 * This method only assigns the field: it does not consult {@link State#canTransitionTo(State)},
 * so any transition check has to be done by the caller.
 *
 * @param state the new state
 */
public void setState(State state)
{
this.state = state;
}
/**
 * Checks whether this session overlaps any of the supplied ranges.
 *
 * @param otherRanges the ranges to compare against
 * @return {@code true} as soon as one of this session's ranges reports an intersection with
 *         {@code otherRanges}; {@code false} if none does
 */
public boolean intersects(Iterable<Range<Token>> otherRanges)
{
    for (Range<Token> range : ranges)
    {
        if (range.intersects(otherRanges))
            return true;
    }
    return false;
}
/**
 * Two sessions are equal when they are of the exact same class and agree on every field,
 * including the mutable {@code state}.
 *
 * @param o the object to compare with
 * @return {@code true} if {@code o} is an equal session
 */
@Override
public boolean equals(Object o)
{
    if (o == this)
        return true;
    if (o == null || o.getClass() != getClass())
        return false;

    ConsistentSession other = (ConsistentSession) o;
    // cheap primitive and identity comparisons first, then the object-valued fields
    return repairedAt == other.repairedAt
           && state == other.state
           && sessionID.equals(other.sessionID)
           && coordinator.equals(other.coordinator)
           && tableIds.equals(other.tableIds)
           && ranges.equals(other.ranges)
           && participants.equals(other.participants);
}
/**
 * Combines every field with the usual multiply-by-31 scheme. The mutable {@code state} is part
 * of the hash, so the value changes whenever the state does.
 *
 * @return the hash code of this session
 */
@Override
public int hashCode()
{
    int hash = state.hashCode();
    hash = 31 * hash + sessionID.hashCode();
    hash = 31 * hash + coordinator.hashCode();
    hash = 31 * hash + tableIds.hashCode();
    // Long.hashCode folds the two 32-bit halves together with xor
    hash = 31 * hash + Long.hashCode(repairedAt);
    hash = 31 * hash + ranges.hashCode();
    hash = 31 * hash + participants.hashCode();
    return hash;
}
/**
 * @return a single-line description listing every field of the session
 */
@Override
public String toString()
{
    StringBuilder text = new StringBuilder("ConsistentSession{");
    text.append("state=").append(state);
    text.append(", sessionID=").append(sessionID);
    text.append(", coordinator=").append(coordinator);
    text.append(", tableIds=").append(tableIds);
    text.append(", repairedAt=").append(repairedAt);
    text.append(", ranges=").append(ranges);
    text.append(", participants=").append(participants);
    text.append('}');
    return text.toString();
}
/**
 * Mutable collector for the values a {@link ConsistentSession} is constructed from.
 * <p>
 * The {@code with*} methods store their argument and return nothing, so they cannot be chained.
 * The session constructor calls {@link #validate()} before reading any field.
 */
abstract static class AbstractBuilder
{
    private State state;
    private TimeUUID sessionID;
    private InetAddressAndPort coordinator;
    private Set<TableId> ids;
    private long repairedAt;
    private Collection<Range<Token>> ranges;
    private Set<InetAddressAndPort> participants;

    /** @param state the initial state of the session */
    void withState(State state)
    {
        this.state = state;
    }

    /** @param sessionID the identifier of the session */
    void withSessionID(TimeUUID sessionID)
    {
        this.sessionID = sessionID;
    }

    /** @param coordinator the coordinating endpoint; must also appear in the participants */
    void withCoordinator(InetAddressAndPort coordinator)
    {
        this.coordinator = coordinator;
    }

    /**
     * Sets the table ids from raw UUIDs.
     *
     * @param ids the UUIDs, each converted with {@link TableId#fromUUID} and collected into an immutable set
     */
    void withUUIDTableIds(Iterable<UUID> ids)
    {
        this.ids = ImmutableSet.copyOf(Iterables.transform(ids, TableId::fromUUID));
    }

    /** @param ids the table ids; the reference is stored as given, not copied */
    void withTableIds(Set<TableId> ids)
    {
        this.ids = ids;
    }

    /** @param repairedAt a positive value or {@link ActiveRepairService#UNREPAIRED_SSTABLE} */
    void withRepairedAt(long repairedAt)
    {
        this.repairedAt = repairedAt;
    }

    /** @param ranges the token ranges of the session; the reference is stored as given, not copied */
    void withRanges(Collection<Range<Token>> ranges)
    {
        this.ranges = ranges;
    }

    /** @param peers the participating endpoints; the reference is stored as given, not copied */
    void withParticipants(Set<InetAddressAndPort> peers)
    {
        this.participants = peers;
    }

    /**
     * Verifies that every value needed to build a session is present and consistent.
     * Each failed check names the offending field so the cause is visible in the exception.
     *
     * @throws IllegalArgumentException if a field is missing, a collection is empty,
     *                                  {@code repairedAt} is out of range, or the coordinator
     *                                  is not one of the participants
     */
    void validate()
    {
        Preconditions.checkArgument(state != null, "state must not be null");
        Preconditions.checkArgument(sessionID != null, "sessionID must not be null");
        Preconditions.checkArgument(coordinator != null, "coordinator must not be null");
        Preconditions.checkArgument(ids != null, "table ids must not be null");
        Preconditions.checkArgument(!ids.isEmpty(), "table ids must not be empty");
        Preconditions.checkArgument(repairedAt > 0
                                    || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
                                    "repairedAt must be positive or ActiveRepairService.UNREPAIRED_SSTABLE, got %s",
                                    repairedAt);
        Preconditions.checkArgument(ranges != null, "ranges must not be null");
        Preconditions.checkArgument(!ranges.isEmpty(), "ranges must not be empty");
        Preconditions.checkArgument(participants != null, "participants must not be null");
        Preconditions.checkArgument(!participants.isEmpty(), "participants must not be empty");
        Preconditions.checkArgument(participants.contains(coordinator),
                                    "participants %s must include the coordinator %s",
                                    participants, coordinator);
    }
}
}