/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.*;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
/**
* The hint schema looks like this:
*
* CREATE TABLE hints (
* target_id uuid,
* hint_id timeuuid,
* message_version int,
* mutation blob,
* PRIMARY KEY (target_id, hint_id, message_version)
* ) WITH COMPACT STORAGE;
*
* Thus, for each node in the cluster we treat its uuid as the partition key; each hint is a logical row
* (physical composite column) containing the mutation to replay and associated metadata.
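 *
 * For example, a single hinted mutation for a target host is stored as one cell in
 * the partition keyed by that host's UUID: the cell name is the composite
 * (hint_id, message_version) and the cell value is the serialized mutation
 * (see hintFor below).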
*
 * When FailureDetector signals that a node that was down is back up, we page through
 * the hinted mutations and send them over one at a time, rate-limited by
 * hinted_handoff_throttle_in_kb (scaled down by the number of nodes in the cluster).
*
 * scheduleHintDelivery is also exposed to JMX so delivery can be run manually if the
 * FailureDetector ever misses its cue somehow.
*/
public class HintedHandOffManager implements HintedHandOffManagerMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=HintedHandoffManager";
public static final HintedHandOffManager instance = new HintedHandOffManager();
private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
private static final int PAGE_SIZE = 128;
private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
public final HintedHandoffMetrics metrics = new HintedHandoffMetrics();
private volatile boolean hintedHandOffPaused = false;
static final int maxHintTTL = Integer.parseInt(System.getProperty("cassandra.maxHintTTL", String.valueOf(Integer.MAX_VALUE)));
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<>();
// To keep metrics consistent with earlier versions, where periodic tasks were run on a shared executor,
// we run them on this executor and so keep counts separate from those for hint delivery tasks. See CASSANDRA-9129
private final DebuggableScheduledThreadPoolExecutor executor =
new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("HintedHandoffManager", Thread.MIN_PRIORITY));
// Non-scheduled executor to run the actual hint delivery tasks.
// Per CASSANDRA-9129, this is where the values displayed in nodetool tpstats
// and via the HintedHandoff mbean are obtained.
private final ThreadPoolExecutor hintDeliveryExecutor =
new JMXEnabledThreadPoolExecutor(
DatabaseDescriptor.getMaxHintsThread(),
Integer.MAX_VALUE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
"internal");
private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
/**
     * Returns a mutation representing a Hint for <code>mutation</code>, to be replayed
     * to the node identified by <code>target</code> as soon as it becomes available again.
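     *
     * A minimal sketch of expected usage (hypothetical caller; in practice the
     * coordinator write path, e.g. StorageProxy, applies the returned mutation
     * locally to the system hints table):
     *
     * <pre>
     * int ttl = HintedHandOffManager.calculateHintTTL(mutation);
     * if (ttl > 0)
     *     HintedHandOffManager.instance.hintFor(mutation, System.currentTimeMillis(), ttl,
     *                                           Pair.create(endpoint, hostId)).apply();
     * </pre>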
*/
public Mutation hintFor(Mutation mutation, long now, int ttl, Pair<InetAddress, UUID> target)
{
assert ttl > 0;
InetAddress endpoint = target.left;
UUID targetId = target.right;
metrics.incrCreatedHints(endpoint);
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
CellName name = SystemKeyspace.Hints.comparator.makeCellName(hintId, MessagingService.current_version);
ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS));
cf.addColumn(name, value, now, ttl);
return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(targetId), cf);
}
    /*
     * Determine the TTL for the hint Mutation.
     * This is set to the smallest gcGraceSeconds of any of the column families
     * in the mutation; that ensures deletes aren't "undone" by delivery of an old hint.
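     *
     * For example, a mutation spanning one table with gc_grace_seconds = 864000 and
     * another with gc_grace_seconds = 3600 gets a hint TTL of 3600, so the hint
     * expires before either table's tombstones could become purgeable.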
*/
public static int calculateHintTTL(Mutation mutation)
{
int ttl = maxHintTTL;
for (ColumnFamily cf : mutation.getColumnFamilies())
ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
return ttl;
}
public void start()
{
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
logger.trace("Created HHOM instance, registered MBean.");
Runnable runnable = new Runnable()
{
public void run()
{
scheduleAllDeliveries();
metrics.log();
}
};
executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
{
Mutation mutation = new Mutation(SystemKeyspace.NAME, tokenBytes);
mutation.delete(SystemKeyspace.HINTS, columnName, timestamp);
mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
public void deleteHintsForEndpoint(final String ipOrHostname)
{
try
{
InetAddress endpoint = InetAddress.getByName(ipOrHostname);
deleteHintsForEndpoint(endpoint);
}
catch (UnknownHostException e)
{
            logger.warn("Unable to find {}: not a hostname or IP address of a node", ipOrHostname);
throw new RuntimeException(e);
}
}
public void deleteHintsForEndpoint(final InetAddress endpoint)
{
if (!StorageService.instance.getTokenMetadata().isMember(endpoint))
return;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
if (hostId == null)
return;
ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
final Mutation mutation = new Mutation(SystemKeyspace.NAME, hostIdBytes);
mutation.delete(SystemKeyspace.HINTS, System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
Runnable runnable = new Runnable()
{
public void run()
{
try
{
logger.info("Deleting any stored hints for {}", endpoint);
mutation.apply();
hintStore.forceBlockingFlush();
compact();
}
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
logger.warn("Could not delete hints for {}: {}", endpoint, e);
}
}
};
executor.submit(runnable);
}
public void truncateAllHints() throws ExecutionException, InterruptedException
{
Runnable runnable = new Runnable()
{
public void run()
{
try
{
logger.info("Truncating all stored hints.");
Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS).truncateBlocking();
}
catch (Exception e)
{
logger.warn("Could not truncate all hints.", e);
}
}
};
executor.submit(runnable).get();
}
@VisibleForTesting
protected synchronized void compact()
{
ArrayList<Descriptor> descriptors = new ArrayList<>();
for (SSTable sstable : hintStore.getTracker().getUncompacting())
descriptors.add(sstable.descriptor);
if (descriptors.isEmpty())
return;
try
{
CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000)).get();
}
catch (InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
}
private static boolean pagingFinished(ColumnFamily hintColumnFamily, Composite startColumn)
{
// done if no hints found or the start column (same as last column processed in previous iteration) is the only one
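        // e.g., if the previous page ended at column C, the next slice starts at C (inclusive),
        // so a page whose only column is C means there are no new hints left to deliver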
return hintColumnFamily == null
|| (!startColumn.isEmpty() && hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn((CellName)startColumn) != null);
}
private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
{
Gossiper gossiper = Gossiper.instance;
int waited = 0;
// first, wait for schema to be gossiped.
while (gossiper.getEndpointStateForEndpoint(endpoint) != null && gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null)
{
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
waited += 1000;
if (waited > 2 * StorageService.RING_DELAY)
                throw new TimeoutException("Didn't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
}
if (gossiper.getEndpointStateForEndpoint(endpoint) == null)
throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
waited = 0;
// then wait for the correct schema version.
// usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system keyspace.
// here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
// causes the two to diverge (see CASSANDRA-2946)
while (gossiper.getEndpointStateForEndpoint(endpoint) != null && !gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
waited += 1000;
if (waited > 2 * StorageService.RING_DELAY)
throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
}
if (gossiper.getEndpointStateForEndpoint(endpoint) == null)
throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
logger.trace("schema for {} matches local schema", endpoint);
return waited;
}
private void deliverHintsToEndpoint(InetAddress endpoint)
{
if (hintStore.isEmpty())
return; // nothing to do, don't confuse users by logging a no-op handoff
// check if hints delivery has been paused
if (hintedHandOffPaused)
{
logger.trace("Hints delivery process is paused, aborting");
return;
}
logger.trace("Checking remote({}) schema before delivering hints", endpoint);
try
{
waitForSchemaAgreement(endpoint);
}
catch (TimeoutException e)
{
return;
}
if (!FailureDetector.instance.isAlive(endpoint))
{
logger.trace("Endpoint {} died before hint delivery, aborting", endpoint);
return;
}
doDeliverHintsToEndpoint(endpoint);
}
/*
     * 1. Get the hints partition key for the endpoint we need to hand off to
* 2. For each column, deserialize the mutation and send it to the endpoint
* 3. Delete the column if the write was successful
* 4. Force a flush
*/
private void doDeliverHintsToEndpoint(InetAddress endpoint)
{
        // find the hints for the node using its host ID
UUID hostId = Gossiper.instance.getHostId(endpoint);
logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
final AtomicInteger rowsReplayed = new AtomicInteger(0);
Composite startColumn = Composites.EMPTY;
int pageSize = calculatePageSize();
logger.trace("Using pageSize of {}", pageSize);
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
// max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
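        // e.g., with hinted_handoff_throttle_in_kb = 1024 on a 5-node cluster, each
        // delivery is limited to 1024 / (5 - 1) = 256 KB/s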
int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB()
/ (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
delivery:
while (true)
{
long now = System.currentTimeMillis();
QueryFilter filter = QueryFilter.getSliceFilter(epkey,
SystemKeyspace.HINTS,
startColumn,
Composites.EMPTY,
false,
pageSize,
now);
ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
if (pagingFinished(hintsPage, startColumn))
{
logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
break;
}
// check if node is still alive and we should continue delivery process
if (!FailureDetector.instance.isAlive(endpoint))
{
logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
break;
}
List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
for (final Cell hint : hintsPage)
{
// check if hints delivery has been paused during the process
if (hintedHandOffPaused)
{
logger.trace("Hints delivery process is paused, aborting");
break delivery;
}
// Skip tombstones:
// if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
// in which the local deletion timestamp was generated on the last column in the old page, in which
// case the hint will have no columns (since it's deleted) but will still be included in the resultset
// since (even with gcgs=0) it's still a "relevant" tombstone.
if (!hint.isLive())
continue;
startColumn = hint.name();
int version = Int32Type.instance.compose(hint.name().get(1));
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
Mutation mutation;
try
{
mutation = Mutation.serializer.deserialize(in, version);
}
catch (UnknownColumnFamilyException e)
{
logger.trace("Skipping delivery of hint for deleted table", e);
deleteHint(hostIdBytes, hint.name(), hint.timestamp());
continue;
}
catch (IOException e)
{
throw new AssertionError(e);
}
for (UUID cfId : mutation.getColumnFamilyIds())
{
if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId))
{
logger.trace("Skipping delivery of hint for truncated table {}", cfId);
mutation = mutation.without(cfId);
}
}
if (mutation.isEmpty())
{
deleteHint(hostIdBytes, hint.name(), hint.timestamp());
continue;
}
MessageOut<Mutation> message = mutation.createMessage();
rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
Runnable callback = new Runnable()
{
public void run()
{
rowsReplayed.incrementAndGet();
deleteHint(hostIdBytes, hint.name(), hint.timestamp());
}
};
WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback);
MessagingService.instance().sendRR(message, endpoint, responseHandler, false);
responseHandlers.add(responseHandler);
}
for (WriteResponseHandler<Mutation> handler : responseHandlers)
{
try
{
handler.get();
}
catch (WriteTimeoutException|WriteFailureException e)
{
                    logger.info("Failed replaying hints to {}; aborting ({} delivered), error: {}",
                                endpoint, rowsReplayed, e.getMessage());
break delivery;
}
}
}
// Flush all the tombstones to disk
hintStore.forceBlockingFlush();
}
    // read fewer columns (mutations) per page if they are very large
private int calculatePageSize()
{
int meanColumnCount = hintStore.getMeanColumns();
if (meanColumnCount <= 0)
return PAGE_SIZE;
int averageColumnSize = (int) (hintStore.metric.meanRowSize.getValue() / meanColumnCount);
if (averageColumnSize <= 0)
return PAGE_SIZE;
        // page size of 1 does not allow actual paging because of the >= behavior on startColumn
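        // e.g., hints averaging 512KB each give min(PAGE_SIZE, 4MB / 512KB) = 8 columns per page,
        // capping each page at roughly 4MB of hint data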
return Math.max(2, Math.min(PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize));
}
/**
* Attempt delivery to any node for which we have hints. Necessary since we can generate hints even for
* nodes which are never officially down/failed.
*/
private void scheduleAllDeliveries()
{
logger.trace("Started scheduleAllDeliveries");
// Force a major compaction to get rid of the tombstones and expired hints. Do it once, before we schedule any
// individual replay, to avoid N - 1 redundant individual compactions (when N is the number of nodes with hints
// to deliver to).
compact();
IPartitioner p = StorageService.getPartitioner();
RowPosition minPos = p.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<>(minPos, minPos);
IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of());
List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
for (Row row : rows)
{
UUID hostId = UUIDGen.getUUID(row.key.getKey());
InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
            // the host ID may have since been removed from the ring (in which case we have just read back a tombstone)
if (target != null)
scheduleHintDelivery(target, false);
}
logger.trace("Finished scheduleAllDeliveries");
}
    /*
     * Schedule hint delivery to a particular endpoint. When we learn that an
     * endpoint is back up, we deliver its stored hints via this event-driven
     * mechanism.
     */
public void scheduleHintDelivery(final InetAddress to, final boolean precompact)
{
// We should not deliver hints to the same host in 2 different threads
if (!queuedDeliveries.add(to))
return;
logger.trace("Scheduling delivery of Hints to {}", to);
hintDeliveryExecutor.execute(new Runnable()
{
public void run()
{
try
{
// If it's an individual node hint replay (triggered by Gossip or via JMX), and not the global scheduled replay
// (every 10 minutes), force a major compaction to get rid of the tombstones and expired hints.
if (precompact)
compact();
deliverHintsToEndpoint(to);
}
finally
{
queuedDeliveries.remove(to);
}
}
});
}
public void scheduleHintDelivery(String to) throws UnknownHostException
{
scheduleHintDelivery(InetAddress.getByName(to), true);
}
public void pauseHintsDelivery(boolean b)
{
hintedHandOffPaused = b;
}
public List<String> listEndpointsPendingHints()
{
Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory();
// Extract the keys as strings to be reported.
LinkedList<String> result = new LinkedList<>();
for (Row row : getHintsSlice(1))
{
if (row.cf != null) //ignore removed rows
result.addFirst(tokenFactory.toString(row.key.getToken()));
}
return result;
}
private List<Row> getHintsSlice(int columnCount)
{
        // Fetch only the first columnCount columns of each row...
SliceQueryFilter predicate = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY,
false,
columnCount);
        // ...over the full ring (a wrapping range from the minimum token to itself)
IPartitioner partitioner = StorageService.getPartitioner();
RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<>(minPos, minPos);
try
{
RangeSliceCommand cmd = new RangeSliceCommand(SystemKeyspace.NAME,
SystemKeyspace.HINTS,
System.currentTimeMillis(),
predicate,
range,
null,
LARGE_NUMBER);
return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
}
catch (Exception e)
{
            logger.info("HintsCF getHintsSlice failed.");
throw new RuntimeException(e);
}
}
@VisibleForTesting
public void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
{
shutdown(executor, hintDeliveryExecutor);
awaitTermination(timeout, units, executor, hintDeliveryExecutor);
}
}