blob: 6f7b93e930d5cb1fe0742ae96c0c8630e47210a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair.consistent;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.repair.KeyspaceRepairManager;
import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.repair.consistent.admin.PendingStats;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.StatusRequest;
import org.apache.cassandra.repair.messages.StatusResponse;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.net.Verb.FAILED_SESSION_MSG;
import static org.apache.cassandra.net.Verb.FINALIZE_PROMISE_MSG;
import static org.apache.cassandra.net.Verb.PREPARE_CONSISTENT_RSP;
import static org.apache.cassandra.net.Verb.STATUS_REQ;
import static org.apache.cassandra.net.Verb.STATUS_RSP;
import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
/**
* Manages all consistent repair sessions a node is participating in.
* <p>
* Since sessions need to be loaded, and since we need to handle cases where sessions might not exist, most of the logic
* around local sessions is implemented in this class, with the LocalSession class being treated more like a simple struct,
* in contrast with {@link CoordinatorSession}.
*/
public class LocalSessions
{
private static final Logger logger = LoggerFactory.getLogger(LocalSessions.class);
// Listeners registered through registerListener; setStateAndSave calls each of them after
// a session's state has been changed and saved
private static final Set<Listener> listeners = new CopyOnWriteArraySet<>();
/**
* Amount of time, in seconds, a session can go without any activity before we start checking the status
* of other participants to see if we've missed a message.
* <p>
* Read from the {@code cassandra.repair_status_check_timeout_seconds} system property, defaulting to one hour.
*/
static final int CHECK_STATUS_TIMEOUT = Integer.getInteger("cassandra.repair_status_check_timeout_seconds",
Ints.checkedCast(TimeUnit.HOURS.toSeconds(1)));
/**
* Amount of time, in seconds, a session can go without any activity before being automatically set to FAILED.
* <p>
* Read from the {@code cassandra.repair_fail_timeout_seconds} system property, defaulting to one day.
*/
static final int AUTO_FAIL_TIMEOUT = Integer.getInteger("cassandra.repair_fail_timeout_seconds",
Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)));
/**
* Amount of time, in seconds, a completed session is kept around after completion before being deleted. This gives
* compaction plenty of time to move sstables from successful sessions into the repaired bucket.
* <p>
* Read from the {@code cassandra.repair_delete_timeout_seconds} system property, defaulting to one day.
*/
static final int AUTO_DELETE_TIMEOUT = Integer.getInteger("cassandra.repair_delete_timeout_seconds",
Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)));
/**
* How often, in seconds, LocalSessions.cleanup is run.
* <p>
* Read from the {@code cassandra.repair_cleanup_interval_seconds} system property, defaulting to ten minutes.
*/
public static final int CLEANUP_INTERVAL = Integer.getInteger("cassandra.repair_cleanup_interval_seconds",
Ints.checkedCast(TimeUnit.MINUTES.toSeconds(10)));
/**
 * Converts a set of raw table UUIDs into an immutable set of {@link TableId}s.
 */
private static Set<TableId> uuidToTableId(Set<UUID> src)
{
    ImmutableSet.Builder<TableId> ids = ImmutableSet.builder();
    for (UUID uuid : src)
        ids.add(TableId.fromUUID(uuid));
    return ids.build();
}
/**
 * Converts a set of {@link TableId}s into an immutable set of their raw UUIDs.
 */
private static Set<UUID> tableIdToUuid(Set<TableId> src)
{
    ImmutableSet.Builder<UUID> uuids = ImmutableSet.builder();
    for (TableId tid : src)
        uuids.add(tid.asUUID());
    return uuids.build();
}
// Keyspace and table that session state is written to and loaded from
private final String keyspace = SchemaConstants.SYSTEM_KEYSPACE_NAME;
private final String table = SystemKeyspace.REPAIRS;
// Written while holding the instance lock (start/stop) but read without it by isStarted(),
// so it must be volatile for those unsynchronized reads to observe the latest value
private volatile boolean started = false;
// Immutable maps that are replaced wholesale on modification and read without locking
private volatile ImmutableMap<UUID, LocalSession> sessions = ImmutableMap.of();
private volatile ImmutableMap<TableId, RepairedState> repairedStates = ImmutableMap.of();
/**
 * @return the number of sessions currently held in the in-memory session map
 */
@VisibleForTesting
int getNumSessions()
{
    final ImmutableMap<UUID, LocalSession> current = sessions;
    return current.size();
}
/**
 * @return this node's broadcast address and port, as reported by {@link FBUtilities}
 */
@VisibleForTesting
protected InetAddressAndPort getBroadcastAddressAndPort()
{
    final InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
    return self;
}
/**
 * @param address the endpoint to check
 * @return whether the failure detector currently considers {@code address} alive
 */
@VisibleForTesting
protected boolean isAlive(InetAddressAndPort address)
{
    final boolean alive = FailureDetector.instance.isAlive(address);
    return alive;
}
/**
 * @return whether {@link StorageService} reports itself as initialized
 */
@VisibleForTesting
protected boolean isNodeInitialized()
{
    final boolean initialized = StorageService.instance.isInitialized();
    return initialized;
}
/**
 * Describes the known sessions as string maps.
 *
 * @param all    when false, completed sessions are left out
 * @param ranges when non-empty, only sessions intersecting these ranges are included
 * @return a new list with one map (built by {@code LocalSessionInfo.sessionToMap}) per matching session
 */
public List<Map<String, String>> sessionInfo(boolean all, Set<Range<Token>> ranges)
{
    final boolean restrictToRanges = !ranges.isEmpty();
    List<Map<String, String>> infos = new ArrayList<>();
    for (LocalSession candidate : sessions.values())
    {
        if (!all && candidate.isCompleted())
            continue;
        if (restrictToRanges && !candidate.intersects(ranges))
            continue;
        infos.add(LocalSessionInfo.sessionToMap(candidate));
    }
    return infos;
}
/**
 * Returns the {@link RepairedState} tracked for the given table, creating and registering
 * an empty one first if none exists yet.
 */
private RepairedState getRepairedState(TableId tid)
{
    RepairedState state = repairedStates.get(tid);
    if (state == null)
    {
        synchronized (this)
        {
            // look again while holding the lock, another thread may have added it in the meantime
            state = repairedStates.get(tid);
            if (state == null)
            {
                state = new RepairedState();
                ImmutableMap.Builder<TableId, RepairedState> replacement = ImmutableMap.builder();
                replacement.putAll(repairedStates);
                replacement.put(tid, state);
                repairedStates = replacement.build();
            }
        }
    }
    return Verify.verifyNotNull(state);
}
/**
 * Records the session's ranges and repairedAt time in the repaired state of each of its
 * tables, provided {@link #shouldStoreSession} accepts the session.
 */
private void maybeUpdateRepairedState(LocalSession session)
{
    if (shouldStoreSession(session))
    {
        for (TableId tid : session.tableIds)
            getRepairedState(tid).add(session.ranges, session.repairedAt);
    }
}
/**
 * @return true if the session is FINALIZED and its repairedAt is not the unrepaired marker
 */
private boolean shouldStoreSession(LocalSession session)
{
    // a finalized session whose repairedAt is still the unrepaired value was a forced repair,
    // which must not contribute to the repaired state
    return session.getState() == FINALIZED
           && session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
}
/**
 * Determine if all ranges and tables covered by this session
 * have since been re-repaired by a more recent session.
 *
 * @return false as soon as one table has no repaired state, or has a minimum repairedAt over the
 *         session's ranges that is not strictly greater than the session's own; true otherwise
 */
private boolean isSuperseded(LocalSession session)
{
    for (TableId tid : session.tableIds)
    {
        RepairedState state = repairedStates.get(tid);
        boolean newerRepairExists = state != null
                                    && state.minRepairedAt(session.ranges) > session.repairedAt;
        if (!newerRepairExists)
            return false;
    }
    return true;
}
/**
 * @return the repaired stats of the given table over {@code ranges}, or
 *         {@code RepairedState.Stats.EMPTY} when no repaired state is tracked for the table
 */
public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
{
    RepairedState tracked = repairedStates.get(tid);
    return tracked == null ? RepairedState.Stats.EMPTY : tracked.getRepairedStats(ranges);
}
/**
 * Aggregates the table's per-session pending repair stats into pending, finalized and failed
 * buckets, considering only sessions that intersect at least one of the given ranges.
 *
 * @param tid    id of the table to report on; must resolve to a ColumnFamilyStore
 * @param ranges ranges a session has to intersect to be counted
 * @throws IllegalArgumentException if no ColumnFamilyStore exists for {@code tid}
 */
public PendingStats getPendingStats(TableId tid, Collection<Range<Token>> ranges)
{
    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
    Preconditions.checkArgument(cfs != null);

    PendingStat.Builder pendingBuilder = new PendingStat.Builder();
    PendingStat.Builder finalizedBuilder = new PendingStat.Builder();
    PendingStat.Builder failedBuilder = new PendingStat.Builder();

    for (Map.Entry<UUID, PendingStat> perSession : cfs.getPendingRepairStats().entrySet())
    {
        UUID id = perSession.getKey();
        PendingStat stat = perSession.getValue();
        // each stat is expected to describe exactly the session it is keyed by
        Verify.verify(id.equals(Iterables.getOnlyElement(stat.sessions)));

        LocalSession session = sessions.get(id);
        Verify.verifyNotNull(session);

        boolean relevant = Iterables.any(ranges, r -> r.intersects(session.ranges));
        if (relevant)
        {
            switch (session.getState())
            {
                case FINALIZED:
                    finalizedBuilder.addStat(stat);
                    break;
                case FAILED:
                    failedBuilder.addStat(stat);
                    break;
                default:
                    pendingBuilder.addStat(stat);
            }
        }
    }

    return new PendingStats(cfs.keyspace.getName(),
                            cfs.name,
                            pendingBuilder.build(),
                            finalizedBuilder.build(),
                            failedBuilder.build());
}
/**
 * Releases the repair data of completed sessions that cover the given table and intersect
 * the given ranges.
 *
 * @param tid    id of the table to clean up; must resolve to a ColumnFamilyStore
 * @param ranges only sessions intersecting these ranges are considered
 * @param force  passed through to {@code ColumnFamilyStore.releaseRepairData}
 * @return the summary returned by {@code ColumnFamilyStore.releaseRepairData}
 * @throws IllegalArgumentException if no ColumnFamilyStore exists for {@code tid}
 */
public CleanupSummary cleanup(TableId tid, Collection<Range<Token>> ranges, boolean force)
{
    Iterable<LocalSession> candidates = Iterables.filter(sessions.values(),
                                                         ls -> ls.isCompleted()
                                                               && ls.tableIds.contains(tid)
                                                               && Range.intersects(ls.ranges, ranges));
    ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
    // validate the lookup the same way getPendingStats does, instead of failing with a bare NPE below
    Preconditions.checkArgument(cfs != null, "No table found for id %s", tid);
    Set<UUID> sessionIds = Sets.newHashSet(Iterables.transform(candidates, s -> s.sessionID));
    return cfs.releaseRepairData(sessionIds, force);
}
/**
 * Hook for operators to cancel sessions. Cancelling from a non-coordinator is an error, unless
 * force is set to true. Messages are sent out to other participants, but we don't wait for a response.
 *
 * @param sessionID id of the session to cancel
 * @param force     allow cancelling from a node that is not the session's coordinator
 * @throws IllegalArgumentException if the session does not exist, or if this node is not the
 *                                  coordinator and {@code force} is false
 */
public void cancelSession(UUID sessionID, boolean force)
{
    logger.info("Cancelling local repair session {}", sessionID);
    LocalSession session = getSession(sessionID);
    // Guava's Preconditions only substitutes %s placeholders; "{}" would be printed literally
    Preconditions.checkArgument(session != null, "Session %s does not exist", sessionID);
    Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddressAndPort()),
                                "Cancel session %s from it's coordinator (%s) or use --force",
                                sessionID, session.coordinator);
    setStateAndSave(session, FAILED);
    Message<FailSession> message = Message.out(FAILED_SESSION_MSG, new FailSession(sessionID));
    for (InetAddressAndPort participant : session.participants)
    {
        // no need to message ourselves, the local state was already updated above
        if (!participant.equals(getBroadcastAddressAndPort()))
            sendMessage(participant, message);
    }
}
/**
 * Loads sessions out of the repairs table and sets state to started.
 * <p>
 * Rows that fail to load with an IllegalArgumentException or NullPointerException are treated as
 * malformed and deleted from the table. Once loading has finished, every loaded session that
 * {@link #failOngoingRepairs} does not skip is failed.
 *
 * @throws IllegalArgumentException if called more than once, or if sessions were added beforehand
 */
public synchronized void start()
{
    Preconditions.checkArgument(!started, "LocalSessions.start can only be called once");
    Preconditions.checkArgument(sessions.isEmpty(), "No sessions should be added before start");
    UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(String.format("SELECT * FROM %s.%s", keyspace, table), 1000);
    Map<UUID, LocalSession> loadedSessions = new HashMap<>();
    Map<TableId, List<RepairedState.Level>> initialLevels = new HashMap<>();
    for (UntypedResultSet.Row row : rows)
    {
        try
        {
            LocalSession session = load(row);
            loadedSessions.put(session.sessionID, session);
            if (shouldStoreSession(session))
            {
                for (TableId tid : session.tableIds)
                    initialLevels.computeIfAbsent(tid, (t) -> new ArrayList<>())
                                 .add(new RepairedState.Level(session.ranges, session.repairedAt));
            }
        }
        catch (IllegalArgumentException | NullPointerException e)
        {
            // include the exception so the reason the row was considered malformed is not lost
            logger.warn("Unable to load malformed repair session {}, removing", row.has("parent_id") ? row.getUUID("parent_id") : null, e);
            if (row.has("parent_id"))
                deleteRow(row.getUUID("parent_id"));
        }
    }
    // seed the per-table repaired state from the sessions that were just loaded
    for (Map.Entry<TableId, List<RepairedState.Level>> entry : initialLevels.entrySet())
        getRepairedState(entry.getKey()).addAll(entry.getValue());
    sessions = ImmutableMap.copyOf(loadedSessions);
    failOngoingRepairs();
    started = true;
}
/**
 * Marks this instance as no longer started and fails the ongoing repairs.
 * Does nothing if it was not started.
 */
public synchronized void stop()
{
    if (started)
    {
        started = false;
        failOngoingRepairs();
    }
}
/**
 * Fails every known session whose state is not FAILED, FINALIZED or FINALIZE_PROMISED,
 * sending a failure message for each one that is failed.
 */
private void failOngoingRepairs()
{
    for (LocalSession ongoing : sessions.values())
    {
        synchronized (ongoing)
        {
            switch (ongoing.getState())
            {
                case FAILED:
                case FINALIZED:
                case FINALIZE_PROMISED:
                    // left untouched
                    break;
                default:
                    logger.info("Found repair session {} with state = {} - failing the repair", ongoing.sessionID, ongoing.getState());
                    failSession(ongoing, true);
            }
        }
    }
}
/**
 * @return the current value of the started flag, which {@link #start()} sets and {@link #stop()} clears
 */
public boolean isStarted()
{
    return started;
}
/**
 * @param now current time in seconds
 * @return true if the session is not completed and its last update is more than
 *         {@link #CHECK_STATUS_TIMEOUT} seconds before {@code now}
 */
private static boolean shouldCheckStatus(LocalSession session, int now)
{
    if (session.isCompleted())
        return false;
    return now > session.getLastUpdate() + CHECK_STATUS_TIMEOUT;
}
/**
 * @param now current time in seconds
 * @return true if the session is not completed and its last update is more than
 *         {@link #AUTO_FAIL_TIMEOUT} seconds before {@code now}
 */
private static boolean shouldFail(LocalSession session, int now)
{
    if (session.isCompleted())
        return false;
    return now > session.getLastUpdate() + AUTO_FAIL_TIMEOUT;
}
/**
 * @param now current time in seconds
 * @return true if the session is completed and its last update is more than
 *         {@link #AUTO_DELETE_TIMEOUT} seconds before {@code now}
 */
private static boolean shouldDelete(LocalSession session, int now)
{
    if (!session.isCompleted())
        return false;
    return now > session.getLastUpdate() + AUTO_DELETE_TIMEOUT;
}
/**
 * Auto fails and auto deletes timed out and old sessions.
 * Compaction will clean up the sstables still owned by a deleted session.
 * <p>
 * Sessions that are neither failed nor eligible for deletion, but have been idle long enough,
 * get a status request sent to their other participants. Nothing is done while the node is
 * not initialized.
 */
public void cleanup()
{
    logger.trace("Running LocalSessions.cleanup");
    if (!isNodeInitialized())
    {
        logger.trace("node not initialized, aborting local session cleanup");
        return;
    }

    // work on a copy of the sessions known at this point
    for (LocalSession session : new HashSet<>(sessions.values()))
    {
        synchronized (session)
        {
            final int now = FBUtilities.nowInSeconds();

            if (shouldFail(session, now))
            {
                logger.warn("Auto failing timed out repair session {}", session);
                failSession(session.sessionID, false);
                continue;
            }

            if (!shouldDelete(session, now))
            {
                if (shouldCheckStatus(session, now))
                    sendStatusRequest(session);
                continue;
            }

            // from here on the session is old enough to be deleted
            if (session.getState() == FINALIZED && !isSuperseded(session))
            {
                // if we delete a non-superseded session, some ranges will be mis-reported as
                // not having been repaired in repair_admin after a restart
                logger.info("Skipping delete of FINALIZED LocalSession {} because it has " +
                            "not been superseded by a more recent session", session.sessionID);
            }
            else if (sessionHasData(session))
            {
                logger.warn("Skipping delete of LocalSession {} because it still contains sstables", session.sessionID);
            }
            else
            {
                logger.info("Auto deleting repair session {}", session);
                deleteSession(session.sessionID);
            }
        }
    }
}
/**
 * Serializes a range as its left token followed by its right token.
 *
 * @throws RuntimeException wrapping any IOException raised while serializing
 */
private static ByteBuffer serializeRange(Range<Token> range)
{
    int leftSize = (int) Token.serializer.serializedSize(range.left, 0);
    int rightSize = (int) Token.serializer.serializedSize(range.right, 0);
    try (DataOutputBuffer out = new DataOutputBuffer(leftSize + rightSize))
    {
        Token.serializer.serialize(range.left, out, 0);
        Token.serializer.serialize(range.right, out, 0);
        return out.buffer();
    }
    catch (IOException ioe)
    {
        throw new RuntimeException(ioe);
    }
}
/**
 * Serializes every range in the set with {@link #serializeRange}.
 */
private static Set<ByteBuffer> serializeRanges(Set<Range<Token>> ranges)
{
    Set<ByteBuffer> serialized = new HashSet<>(ranges.size());
    for (Range<Token> range : ranges)
        serialized.add(serializeRange(range));
    return serialized;
}
/**
 * Reads a range written by {@link #serializeRange}: a left token followed by a right token,
 * both deserialized with the partitioner from {@link DatabaseDescriptor}.
 *
 * @throws RuntimeException wrapping any IOException raised while deserializing
 */
private static Range<Token> deserializeRange(ByteBuffer bb)
{
    try (DataInputBuffer input = new DataInputBuffer(bb, false))
    {
        IPartitioner p = DatabaseDescriptor.getPartitioner();
        Token lower = Token.serializer.deserialize(input, p, 0);
        Token upper = Token.serializer.deserialize(input, p, 0);
        return new Range<>(lower, upper);
    }
    catch (IOException ioe)
    {
        throw new RuntimeException(ioe);
    }
}
/**
 * Deserializes every buffer in the set with {@link #deserializeRange}.
 */
private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers)
{
    Set<Range<Token>> deserialized = new HashSet<>(buffers.size());
    for (ByteBuffer buffer : buffers)
        deserialized.add(deserializeRange(buffer));
    return deserialized;
}
/**
 * Save session state to table, then update the in-memory repaired state if the session
 * qualifies (see {@link #maybeUpdateRepairedState}).
 */
@VisibleForTesting
void save(LocalSession session)
{
    String insert = String.format("INSERT INTO %s.%s " +
                                  "(parent_id, " +
                                  "started_at, " +
                                  "last_update, " +
                                  "repaired_at, " +
                                  "state, " +
                                  "coordinator, " +
                                  "coordinator_port, " +
                                  "participants, " +
                                  "participants_wp," +
                                  "ranges, " +
                                  "cfids) " +
                                  "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                  keyspace, table);

    // startedAt and lastUpdate are converted from epoch seconds, repairedAt from epoch millis
    Date startedAt = Date.from(Instant.ofEpochSecond(session.startedAt));
    Date lastUpdate = Date.from(Instant.ofEpochSecond(session.getLastUpdate()));
    Date repairedAt = Date.from(Instant.ofEpochMilli(session.repairedAt));

    QueryProcessor.executeInternal(insert,
                                   session.sessionID,
                                   startedAt,
                                   lastUpdate,
                                   repairedAt,
                                   session.getState().ordinal(),
                                   session.coordinator.address,
                                   session.coordinator.port,
                                   session.participants.stream().map(p -> p.address).collect(Collectors.toSet()),
                                   session.participants.stream().map(p -> p.getHostAddressAndPort()).collect(Collectors.toSet()),
                                   serializeRanges(session.ranges),
                                   tableIdToUuid(session.tableIds));

    maybeUpdateRepairedState(session);
}
/**
 * Converts a date to whole seconds since the epoch.
 * The narrowing to int goes through {@code Ints.checkedCast}.
 */
private static int dateToSeconds(Date d)
{
    long millis = d.getTime();
    long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);
    return Ints.checkedCast(seconds);
}
/**
 * Builds a {@link LocalSession} from a row of the repairs table.
 *
 * @throws RuntimeException wrapping an UnknownHostException if a participant cannot be resolved
 */
private LocalSession load(UntypedResultSet.Row row)
{
    LocalSession.Builder sessionBuilder = LocalSession.builder();
    sessionBuilder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
    sessionBuilder.withSessionID(row.getUUID("parent_id"));
    sessionBuilder.withCoordinator(InetAddressAndPort.getByAddressOverrideDefaults(row.getInetAddress("coordinator"),
                                                                                   row.getInt("coordinator_port")));
    sessionBuilder.withTableIds(uuidToTableId(row.getSet("cfids", UUIDType.instance)));
    sessionBuilder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
    sessionBuilder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance)));

    //There is no cross version streaming and thus no cross version repair so assume that
    //any valid repair sessions has the participants_wp column and any that doesn't is malformed
    Set<InetAddressAndPort> resolved = new HashSet<>();
    for (String name : row.getSet("participants_wp", UTF8Type.instance))
    {
        try
        {
            resolved.add(InetAddressAndPort.getByName(name));
        }
        catch (UnknownHostException unknown)
        {
            throw new RuntimeException(unknown);
        }
    }
    sessionBuilder.withParticipants(resolved);

    sessionBuilder.withStartedAt(dateToSeconds(row.getTimestamp("started_at")));
    sessionBuilder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update")));
    return buildSession(sessionBuilder);
}
/**
 * Deletes the row of the given session from the repairs table.
 */
private void deleteRow(UUID sessionID)
{
    QueryProcessor.executeInternal(String.format("DELETE FROM %s.%s WHERE parent_id=?", keyspace, table),
                                   sessionID);
}
/**
 * Forces a blocking flush of the repairs table.
 */
private void syncTable()
{
    TableId repairsTableId = Schema.instance.getTableMetadata(keyspace, table).id;
    Schema.instance.getColumnFamilyStoreInstance(repairsTableId).forceBlockingFlush();
}
/**
 * Loads a session directly from the table. Should be used for testing only.
 *
 * @return the loaded session, or null if the table has no row for {@code sessionId}
 */
@VisibleForTesting
LocalSession loadUnsafe(UUID sessionId)
{
    UntypedResultSet found = QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s WHERE parent_id=?", keyspace, table),
                                                            sessionId);
    return found.isEmpty() ? null : load(found.one());
}
/**
 * Creates the {@link LocalSession} described by the builder.
 */
@VisibleForTesting
protected LocalSession buildSession(LocalSession.Builder builder)
{
    final LocalSession built = new LocalSession(builder);
    return built;
}
/**
 * @return the in-memory session with the given id, or null if there is none
 */
public LocalSession getSession(UUID sessionID)
{
    final ImmutableMap<UUID, LocalSession> current = sessions;
    return current.get(sessionID);
}
/**
 * Registers the session in memory and then persists it to the repairs table.
 */
@VisibleForTesting
synchronized void putSessionUnsafe(LocalSession session)
{
    // register in memory first; putSession rejects duplicates and use before start
    putSession(session);
    save(session);
}
/**
 * Adds the session to the in-memory session map by replacing the map with an extended copy.
 *
 * @throws IllegalArgumentException if a session with the same id already exists, or if
 *                                  LocalSessions has not been started
 */
private synchronized void putSession(LocalSession session)
{
    Preconditions.checkArgument(!sessions.containsKey(session.sessionID),
                                "LocalSession %s already exists", session.sessionID);
    Preconditions.checkArgument(started, "sessions cannot be added before LocalSessions is started");

    ImmutableMap.Builder<UUID, LocalSession> extended = ImmutableMap.builder();
    extended.putAll(sessions);
    extended.put(session.sessionID, session);
    sessions = extended.build();
}
/**
 * Removes the session from the in-memory session map by replacing the map with a reduced copy.
 *
 * @throws IllegalArgumentException if {@code sessionID} is null
 */
private synchronized void removeSession(UUID sessionID)
{
    Preconditions.checkArgument(sessionID != null);
    Map<UUID, LocalSession> remaining = new HashMap<>(sessions);
    remaining.remove(sessionID);
    sessions = ImmutableMap.copyOf(remaining);
}
/**
 * Builds a new session in the PREPARING state from a parent repair session. The session is
 * neither registered nor persisted here.
 *
 * @param sessionId id for the new session
 * @param prs       parent session supplying coordinator, table ids, repairedAt and ranges
 * @param peers     participants of the session
 */
@VisibleForTesting
LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddressAndPort> peers)
{
    // started-at and last-update are both initialised to the same instant
    final int createdAt = FBUtilities.nowInSeconds();

    LocalSession.Builder sessionBuilder = LocalSession.builder();
    sessionBuilder.withState(ConsistentSession.State.PREPARING);
    sessionBuilder.withSessionID(sessionId);
    sessionBuilder.withCoordinator(prs.coordinator);
    sessionBuilder.withTableIds(prs.getTableIds());
    sessionBuilder.withRepairedAt(prs.repairedAt);
    sessionBuilder.withRanges(prs.getRanges());
    sessionBuilder.withParticipants(peers);
    sessionBuilder.withStartedAt(createdAt);
    sessionBuilder.withLastUpdate(createdAt);
    return buildSession(sessionBuilder);
}
/**
 * Looks up the parent repair session with the given id in {@link ActiveRepairService}.
 */
protected ActiveRepairService.ParentRepairSession getParentRepairSession(UUID sessionID)
{
    final ActiveRepairService.ParentRepairSession parent = ActiveRepairService.instance.getParentRepairSession(sessionID);
    return parent;
}
/**
 * Sends the message to the destination through {@link MessagingService}.
 */
protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message)
{
    logger.trace("sending {} to {}", message.payload, destination);
    final MessagingService messaging = MessagingService.instance();
    messaging.send(message, destination);
}
/**
* Moves the session to the given state, refreshes its last update time and saves it, all while
* holding the session's monitor. If this change is what completes the session,
* {@link #sessionCompleted} is called; registered listeners are notified after every change.
*
* @param session the session to update
* @param state the state to move the session to
* @throws IllegalArgumentException if the session's current state cannot transition to {@code state}
*/
@VisibleForTesting
void setStateAndSave(LocalSession session, ConsistentSession.State state)
{
synchronized (session)
{
Preconditions.checkArgument(session.getState().canTransitionTo(state),
"Invalid state transition %s -> %s",
session.getState(), state);
logger.trace("Changing LocalSession state from {} -> {} for {}", session.getState(), state, session.sessionID);
// remember whether the session was already completed, so completion is only reported once
boolean wasCompleted = session.isCompleted();
session.setState(state);
session.setLastUpdate();
save(session);
if (session.isCompleted() && !wasCompleted)
{
sessionCompleted(session);
}
// listeners are called while the session's monitor is still held
for (Listener listener : listeners)
listener.onIRStateChange(session);
}
}
/**
 * Fails the session with the given id, sending a failure message to its coordinator.
 */
public void failSession(UUID sessionID)
{
    final boolean notifyCoordinator = true;
    failSession(sessionID, notifyCoordinator);
}
/**
 * Looks up the session with the given id and fails it; unknown ids are ignored by the overload
 * this delegates to.
 *
 * @param sendMessage whether a failure message should be sent to the session's coordinator
 */
public void failSession(UUID sessionID, boolean sendMessage)
{
    LocalSession target = getSession(sessionID);
    failSession(target, sendMessage);
}
/**
 * Sets the session to FAILED unless it already is, and optionally notifies its coordinator.
 * A null session is ignored.
 *
 * @param session     the session to fail, may be null
 * @param sendMessage whether a failure message should be sent to the session's coordinator
 */
public void failSession(LocalSession session, boolean sendMessage)
{
    if (session == null)
        return;

    synchronized (session)
    {
        if (session.getState() != FAILED)
        {
            logger.info("Failing local repair session {}", session.sessionID);
            setStateAndSave(session, FAILED);
        }
    }

    // the message is sent outside of the session's monitor
    if (sendMessage)
    {
        Message<FailSession> failure = Message.out(FAILED_SESSION_MSG, new FailSession(session.sessionID));
        sendMessage(session.coordinator, failure);
    }
}
/**
 * Deletes a completed session from both the repairs table and the in-memory session map.
 *
 * @param sessionID id of the session to delete
 * @throws IllegalArgumentException if the session does not exist or is not completed
 */
public synchronized void deleteSession(UUID sessionID)
{
    logger.info("Deleting local repair session {}", sessionID);
    LocalSession session = getSession(sessionID);
    // fail with a descriptive error rather than a bare NPE when the session is unknown
    Preconditions.checkArgument(session != null, "Session %s does not exist", sessionID);
    Preconditions.checkArgument(session.isCompleted(), "Cannot delete incomplete sessions");
    deleteRow(sessionID);
    removeSession(sessionID);
}
/**
 * Starts the incremental repair preparation for the session by delegating to
 * {@code KeyspaceRepairManager.prepareIncrementalRepair}.
 *
 * @return the future returned by the repair manager
 */
@VisibleForTesting
ListenableFuture prepareSession(KeyspaceRepairManager repairManager,
                                UUID sessionID,
                                Collection<ColumnFamilyStore> tables,
                                RangesAtEndpoint tokenRanges,
                                ExecutorService executor,
                                BooleanSupplier isCancelled)
{
    final ListenableFuture preparation = repairManager.prepareIncrementalRepair(sessionID, tables, tokenRanges, executor, isCancelled);
    return preparation;
}
/**
 * Maps the requested ranges onto this node's local replicas for the keyspace. For each requested
 * range and local replica: the replica itself is added when its range equals the requested one,
 * otherwise a sub-range decoration of it is added when the replica contains the requested range.
 */
RangesAtEndpoint filterLocalRanges(String keyspace, Set<Range<Token>> ranges)
{
    RangesAtEndpoint local = StorageService.instance.getLocalReplicas(keyspace);
    RangesAtEndpoint.Builder filtered = RangesAtEndpoint.builder(local.endpoint());
    for (Range<Token> requested : ranges)
    {
        for (Replica candidate : local)
        {
            boolean exactMatch = candidate.range().equals(requested);
            if (exactMatch)
                filtered.add(candidate);
            else if (candidate.contains(requested))
                filtered.add(candidate.decorateSubrange(requested));
        }
    }
    return filtered.build();
}
/**
 * The PrepareConsistentRequest promotes the parent repair session to a consistent incremental
 * session, and isolates the data to be repaired from the rest of the table's data.
 *
 * No response is sent to the repair coordinator until the data preparation / isolation has completed
 * successfully. If the data preparation fails, a failure message is sent to the coordinator,
 * cancelling the session.
 *
 * @param from    the endpoint the request was received from
 * @param request the prepare request, carrying the parent session id, coordinator and participants
 */
public void handlePrepareMessage(InetAddressAndPort from, PrepareConsistentRequest request)
{
    logger.trace("received {} from {}", request, from);
    UUID sessionID = request.parentSession;
    InetAddressAndPort coordinator = request.coordinator;
    Set<InetAddressAndPort> peers = request.participants;
    ActiveRepairService.ParentRepairSession parentSession;
    try
    {
        parentSession = getParentRepairSession(sessionID);
    }
    catch (Throwable e)
    {
        // pass the throwable as the last argument so the cause of the failure ends up in the log
        logger.error("Error retrieving ParentRepairSession for session {}, responding with failure", sessionID, e);
        sendMessage(coordinator, Message.out(PREPARE_CONSISTENT_RSP, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
        return;
    }
    LocalSession session = createSessionUnsafe(sessionID, parentSession, peers);
    putSessionUnsafe(session);
    logger.info("Beginning local incremental repair session {}", session);
    ExecutorService executor = Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size());
    KeyspaceRepairManager repairManager = parentSession.getKeyspace().getRepairManager();
    RangesAtEndpoint tokenRanges = filterLocalRanges(parentSession.getKeyspace().getName(), parentSession.getRanges());
    // the preparation is told it has been cancelled once the session leaves the PREPARING state
    ListenableFuture repairPreparation = prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(),
                                                        tokenRanges, executor, () -> session.getState() != PREPARING);
    Futures.addCallback(repairPreparation, new FutureCallback<Object>()
    {
        public void onSuccess(@Nullable Object result)
        {
            try
            {
                logger.info("Prepare phase for incremental repair session {} completed", sessionID);
                if (!prepareSessionExceptFailed(session))
                    logger.info("Session {} failed before anticompaction completed", sessionID);
                // report success only if the session has not been failed in the meantime
                Message<PrepareConsistentResponse> message =
                    Message.out(PREPARE_CONSISTENT_RSP,
                                new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), session.getState() != FAILED));
                sendMessage(coordinator, message);
            }
            finally
            {
                executor.shutdown();
            }
        }
        public void onFailure(Throwable t)
        {
            try
            {
                logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
                sendMessage(coordinator,
                            Message.out(PREPARE_CONSISTENT_RSP,
                                        new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
                // the coordinator was just told about the failure, no extra failure message needed
                failSession(sessionID, false);
            }
            finally
            {
                executor.shutdown();
            }
        }
    }, MoreExecutors.directExecutor());
}
/**
 * Sets the session to PREPARED unless it has already FAILED. Both the check and the update
 * happen while holding the session's monitor, so the state cannot change in between.
 *
 * @param session The local session to be set to prepared.
 * @return true if the session is prepared, false if not, i.e. session failed
 */
private boolean prepareSessionExceptFailed(LocalSession session)
{
    synchronized (session)
    {
        boolean notFailed = session.getState() != FAILED;
        if (notFailed)
            setStateAndSave(session, PREPARED);
        return notFailed;
    }
}
/**
 * Moves the session with the given id to REPAIRING, unless it is unknown or already REPAIRING.
 */
public void maybeSetRepairing(UUID sessionID)
{
    LocalSession session = getSession(sessionID);
    if (session == null || session.getState() == REPAIRING)
        return;

    logger.info("Setting local incremental repair session {} to REPAIRING", session);
    setStateAndSave(session, REPAIRING);
}
/**
 * Handles a finalize proposal: moves the session to FINALIZE_PROMISED, flushes the repairs table
 * and answers the sender with a promise. An unknown session is answered with a failure message;
 * an IllegalArgumentException raised while handling the proposal fails the session instead.
 */
public void handleFinalizeProposeMessage(InetAddressAndPort from, FinalizePropose propose)
{
    logger.trace("received {} from {}", propose, from);
    final UUID id = propose.sessionID;
    final LocalSession proposed = getSession(id);
    if (proposed == null)
    {
        logger.info("Received FinalizePropose message for unknown repair session {}, responding with failure", id);
        sendMessage(from, Message.out(FAILED_SESSION_MSG, new FailSession(id)));
        return;
    }

    try
    {
        setStateAndSave(proposed, FINALIZE_PROMISED);

        /*
         Flushing the repairs table here, *before* responding to the coordinator prevents a scenario where we respond
         with a promise to the coordinator, but there is a failure before the commit log mutation with the
         FINALIZE_PROMISED status is synced to disk. This could cause the state for this session to revert to an
         earlier status on startup, which would prevent the failure recovery mechanism from ever being able to promote
         this session to FINALIZED, likely creating inconsistencies in the repaired data sets across nodes.
         */
        syncTable();

        FinalizePromise promise = new FinalizePromise(id, getBroadcastAddressAndPort(), true);
        sendMessage(from, Message.out(FINALIZE_PROMISE_MSG, promise));
        logger.info("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", id);
    }
    catch (IllegalArgumentException invalid)
    {
        logger.error("Error handling FinalizePropose message for {}", proposed, invalid);
        failSession(id);
    }
}
/**
 * Tells the repair manager of each of the session's tables that the session has completed.
 * Tables for which no ColumnFamilyStore is found are skipped.
 */
@VisibleForTesting
protected void sessionCompleted(LocalSession session)
{
    for (TableId tid : session.tableIds)
    {
        ColumnFamilyStore store = Schema.instance.getColumnFamilyStoreInstance(tid);
        if (store == null)
            continue;
        store.getRepairManager().incrementalSessionCompleted(session.sessionID);
    }
}
/**
 * Finalizes the repair session, completing it as successful.
 *
 * This only changes the state of the session, it doesn't promote the siloed sstables to repaired. That will happen
 * as part of the compaction process, and avoids having to worry about in progress compactions interfering with the
 * promotion.
 *
 * A commit for a session that is not known locally is logged and ignored.
 */
public void handleFinalizeCommitMessage(InetAddressAndPort from, FinalizeCommit commit)
{
    logger.trace("received {} from {}", commit, from);
    final UUID id = commit.sessionID;
    final LocalSession committed = getSession(id);
    if (committed != null)
    {
        setStateAndSave(committed, FINALIZED);
        logger.info("Finalized local repair session {}", id);
    }
    else
    {
        logger.warn("Ignoring FinalizeCommit message for unknown repair session {}", id);
    }
}
/**
 * Fails the session named in the message without sending a failure message back.
 */
public void handleFailSessionMessage(InetAddressAndPort from, FailSession msg)
{
    logger.trace("received {} from {}", msg, from);
    final UUID failedId = msg.sessionID;
    failSession(failedId, false);
}
/**
 * Sends a status request for the session to each of its participants, other than this node,
 * that {@link #isAlive} reports as alive.
 */
public void sendStatusRequest(LocalSession session)
{
    logger.info("Attempting to learn the outcome of unfinished local incremental repair session {}", session.sessionID);
    Message<StatusRequest> statusRequest = Message.out(STATUS_REQ, new StatusRequest(session.sessionID));
    for (InetAddressAndPort peer : session.participants)
    {
        if (getBroadcastAddressAndPort().equals(peer) || !isAlive(peer))
            continue;
        sendMessage(peer, statusRequest);
    }
}
/**
 * Answers a status request with the local state of the requested session. A session that is
 * not known locally is reported as FAILED.
 *
 * @param from    the endpoint that asked, and that the response is sent to
 * @param request the status request carrying the session id
 */
public void handleStatusRequest(InetAddressAndPort from, StatusRequest request)
{
    logger.trace("received {} from {}", request, from);
    UUID sessionID = request.sessionID;
    LocalSession session = getSession(sessionID);
    if (session == null)
    {
        logger.warn("Received status request message for unknown session {}", sessionID);
        sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, FAILED)));
    }
    else
    {
        sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, session.getState())));
        // what is being answered here is a status *request*; the log message previously said "response"
        logger.info("Responding to status request message for incremental repair session {} with local state {}", sessionID, session.getState());
    }
}
/**
 * Applies a participant's answer to a status request. The local session is only updated when
 * the reported state is FINALIZED or FAILED; responses for unknown sessions are ignored.
 */
public void handleStatusResponse(InetAddressAndPort from, StatusResponse response)
{
    logger.trace("received {} from {}", response, from);
    final UUID id = response.sessionID;
    final LocalSession local = getSession(id);
    if (local == null)
    {
        logger.warn("Received StatusResponse message for unknown repair session {}", id);
        return;
    }

    // only change local state if response state is FINALIZED or FAILED, since those are
    // the only statuses that would indicate we've missed a message completing the session
    boolean actionable = response.state == FINALIZED || response.state == FAILED;
    if (!actionable)
    {
        logger.info("Received StatusResponse for repair session {} with state {}, which is not actionable. Doing nothing.", id, response.state);
        return;
    }

    setStateAndSave(local, response.state);
    logger.info("Unfinished local incremental repair session {} set to state {}", id, response.state);
}
/**
 * Determines if a local session exists, and if it's not finalized or failed.
 */
public boolean isSessionInProgress(UUID sessionID)
{
    LocalSession session = getSession(sessionID);
    if (session == null)
        return false;
    return !(session.getState() == FINALIZED || session.getState() == FAILED);
}
/**
 * Determines if a local session exists, and if it's in the finalized state.
 */
public boolean isSessionFinalized(UUID sessionID)
{
    LocalSession session = getSession(sessionID);
    if (session == null)
        return false;
    return session.getState() == FINALIZED;
}
/**
 * Determines if a local session exists.
 */
public boolean sessionExists(UUID sessionID)
{
    LocalSession known = getSession(sessionID);
    return known != null;
}
/**
 * @return true if, for at least one of the session's tables that still has a ColumnFamilyStore,
 *         the compaction strategy manager reports data for this pending repair
 */
@VisibleForTesting
protected boolean sessionHasData(LocalSession session)
{
    for (TableId tid : session.tableIds)
    {
        ColumnFamilyStore store = Schema.instance.getColumnFamilyStoreInstance(tid);
        if (store != null && store.getCompactionStrategyManager().hasDataForPendingRepair(session.sessionID))
            return true;
    }
    return false;
}
/**
 * Returns the repairedAt time for a session which is unknown, failed, or finalized.
 * Calling this for a session which is in progress throws an exception.
 *
 * @return {@code ActiveRepairService.UNREPAIRED_SSTABLE} for an unknown or failed session,
 *         the session's repairedAt for a finalized one
 * @throws IllegalStateException if the session is in any other state
 */
public long getFinalSessionRepairedAt(UUID sessionID)
{
    LocalSession session = getSession(sessionID);
    if (session == null || session.getState() == FAILED)
        return ActiveRepairService.UNREPAIRED_SSTABLE;

    if (session.getState() == FINALIZED)
        return session.repairedAt;

    throw new IllegalStateException("Cannot get final repaired at value for in progress session: " + session);
}
/**
 * Adds a listener to the set notified by {@code setStateAndSave} on session state changes.
 */
public static void registerListener(Listener listener)
{
    final Set<Listener> registered = listeners;
    registered.add(listener);
}
/**
 * Removes a listener previously added with {@link #registerListener}.
 */
public static void unregisterListener(Listener listener)
{
    final Set<Listener> registered = listeners;
    registered.remove(listener);
}
/**
 * Callback for changes of a local session's state.
 */
public interface Listener
{
    /**
     * Called with the affected session after its state has been changed and saved.
     */
    void onIRStateChange(LocalSession session);
}
}