/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hints;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Iterables.size;
/**
* A singleton-ish wrapper over various hints components:
* - a catalog of all hints stores
* - a single-threaded write executor
* - a multi-threaded dispatch executor
* - the buffer pool for writing hints into
*
* The front-end for everything hints related.
*/
public final class HintsService implements HintsServiceMBean
{
private static final Logger logger = LoggerFactory.getLogger(HintsService.class);
public static HintsService instance = new HintsService();
private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
private static final int MIN_BUFFER_SIZE = 32 << 20;
static final ImmutableMap<String, Object> EMPTY_PARAMS = ImmutableMap.of();
private final HintsCatalog catalog;
private final HintsWriteExecutor writeExecutor;
private final HintsBufferPool bufferPool;
private final HintsDispatchExecutor dispatchExecutor;
private final AtomicBoolean isDispatchPaused;
private volatile boolean isShutDown = false;
private final ScheduledFuture triggerFlushingFuture;
private volatile ScheduledFuture triggerDispatchFuture;
public final HintedHandoffMetrics metrics;
private HintsService()
{
this(FailureDetector.instance);
}
@VisibleForTesting
HintsService(IFailureDetector failureDetector)
{
File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
catalog = HintsCatalog.load(hintsDirectory, createDescriptorParams());
writeExecutor = new HintsWriteExecutor(catalog);
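// size each buffer to hold at least two of the largest allowed mutations, with a 32 MiB floor (MIN_BUFFER_SIZE)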
int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer);
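// dispatch starts paused; it is enabled later via startDispatch(), once the node is ready to deliver hints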
isDispatchPaused = new AtomicBoolean(true);
dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused, failureDetector::isAlive);
// periodically empty the current content of the buffers
int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS();
triggerFlushingFuture = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> writeExecutor.flushBufferPool(bufferPool),
flushPeriod,
flushPeriod,
TimeUnit.MILLISECONDS);
metrics = new HintedHandoffMetrics();
}
private static ImmutableMap<String, Object> createDescriptorParams()
{
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
ParameterizedClass compressionConfig = DatabaseDescriptor.getHintsCompression();
if (compressionConfig != null)
{
ImmutableMap.Builder<String, Object> compressorParams = ImmutableMap.builder();
compressorParams.put(ParameterizedClass.CLASS_NAME, compressionConfig.class_name);
if (compressionConfig.parameters != null)
{
compressorParams.put(ParameterizedClass.PARAMETERS, compressionConfig.parameters);
}
builder.put(HintsDescriptor.COMPRESSION, compressorParams.build());
}
return builder.build();
}
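/*
 * For illustration only (a hedged sketch, not taken verbatim from the docs): with hints compression configured
 * in cassandra.yaml along the lines of
 *
 *   hints_compression:
 *       - class_name: LZ4Compressor
 *
 * the map built above would look roughly like
 *
 *   { "compression" : { "class_name" : "LZ4Compressor" } }
 *
 * assuming HintsDescriptor.COMPRESSION maps to "compression" and ParameterizedClass.CLASS_NAME to "class_name".
 */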
public void registerMBean()
{
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
/**
* Write a hint for an iterable of nodes.
*
* @param hostIds host ids of the hint's target nodes
* @param hint the hint to store
*/
public void write(Iterable<UUID> hostIds, Hint hint)
{
if (isShutDown)
throw new IllegalStateException("HintsService is shut down and can't accept new hints");
// we have to make sure that the HintsStore instances get properly initialized - otherwise dispatch will not trigger
catalog.maybeLoadStores(hostIds);
bufferPool.write(hostIds, hint);
StorageMetrics.totalHints.inc(size(hostIds));
}
/**
* Write a hint for a single node.
*
* @param hostId host id of the hint's target node
* @param hint the hint to store
*/
public void write(UUID hostId, Hint hint)
{
write(Collections.singleton(hostId), hint);
}
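/*
 * A minimal usage sketch (illustrative only): a write-path caller holding a mutation for an unresponsive
 * replica might hand it off like this, assuming Hint.create(mutation, creationTimeMillis) and a known
 * host id for the target endpoint:
 *
 *   UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
 *   HintsService.instance.write(hostId, Hint.create(mutation, System.currentTimeMillis()));
 */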
/**
* Write a hint for all replicas. Used to re-dispatch hints whose destination is either missing or no longer correct.
*/
void writeForAllReplicas(Hint hint)
{
String keyspaceName = hint.mutation.getKeyspaceName();
Token token = hint.mutation.key().getToken();
Iterable<UUID> hostIds =
transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint),
StorageService.instance::getHostIdForEndpoint);
write(hostIds, hint);
}
/**
* Flush the buffer pool for the selected target nodes, then fsync their writers.
*
* @param hostIds host ids of the nodes to flush and fsync hints for
*/
public void flushAndFsyncBlockingly(Iterable<UUID> hostIds)
{
Iterable<HintsStore> stores = transform(hostIds, catalog::get);
writeExecutor.flushBufferPool(bufferPool, stores);
writeExecutor.fsyncWritersBlockingly(stores);
}
public synchronized void startDispatch()
{
if (isShutDown)
throw new IllegalStateException("HintsService is shut down and cannot be restarted");
isDispatchPaused.set(false);
HintsDispatchTrigger trigger = new HintsDispatchTrigger(catalog, writeExecutor, dispatchExecutor, isDispatchPaused);
// triggering hint dispatch is now very cheap, so we can do it more often - every 10 seconds vs. every 10 minutes,
// previously; this reduces mean time to delivery, and positively affects batchlog delivery latencies, too
triggerDispatchFuture = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(trigger, 10, 10, TimeUnit.SECONDS);
}
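// note: startDispatch() is expected to be invoked once during node startup (e.g. by StorageService); after that
// the trigger above re-checks the catalog every 10 seconds and schedules dispatch for stores with pending hints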
public void pauseDispatch()
{
logger.info("Paused hints dispatch");
isDispatchPaused.set(true);
}
public void resumeDispatch()
{
logger.info("Resumed hints dispatch");
isDispatchPaused.set(false);
}
/**
* Gracefully and blockingly shut down the service.
*
* Aborts any dispatch sessions that are currently in progress (which is okay, since dispatch is idempotent),
* and makes sure the buffers are flushed, hint files written and fsynced.
*/
public synchronized void shutdownBlocking() throws ExecutionException, InterruptedException
{
if (isShutDown)
throw new IllegalStateException("HintsService has already been shut down");
isShutDown = true;
if (triggerDispatchFuture != null)
triggerDispatchFuture.cancel(false);
pauseDispatch();
triggerFlushingFuture.cancel(false);
writeExecutor.flushBufferPool(bufferPool).get();
writeExecutor.closeAllWriters().get();
dispatchExecutor.shutdownBlocking();
writeExecutor.shutdownBlocking();
bufferPool.close();
}
/**
* Deletes all hints for all destinations. Doesn't make snapshots - should be used with care.
*/
public void deleteAllHints()
{
catalog.deleteAllHints();
}
/**
* Deletes all hints for the provided destination. Doesn't make snapshots - should be used with care.
*
* @param address inet address of the target node - encoded as a string for easier JMX consumption
*/
public void deleteAllHintsForEndpoint(String address)
{
InetAddress target;
try
{
target = InetAddress.getByName(address);
}
catch (UnknownHostException e)
{
throw new IllegalArgumentException(e);
}
deleteAllHintsForEndpoint(target);
}
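/*
 * Hedged illustration of driving this operation over JMX from a standalone client (exception handling omitted;
 * the connector URL is an assumption - 7199 is merely the conventional Cassandra JMX port):
 *
 *   JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
 *   try (JMXConnector connector = JMXConnectorFactory.connect(url))
 *   {
 *       connector.getMBeanServerConnection()
 *                .invoke(new ObjectName("org.apache.cassandra.hints:type=HintsService"),
 *                        "deleteAllHintsForEndpoint",
 *                        new Object[]{ "10.0.0.1" },
 *                        new String[]{ "java.lang.String" });
 *   }
 */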
/**
* Deletes all hints for the provided destination. Doesn't make snapshots - should be used with care.
*
* @param target inet address of the target node
*/
public void deleteAllHintsForEndpoint(InetAddress target)
{
UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
if (hostId == null)
throw new IllegalArgumentException("Can't delete hints for unknown address " + target);
catalog.deleteAllHints(hostId);
}
/**
* Cleans up hints-related state after a node with id = hostId left.
*
* The dispatcher cannot stop itself (isHostAlive() cannot start returning false for the leaving host, because this
* method runs on the gossip thread and therefore blocks gossip), so we can't simply wait for the in-progress
* dispatch session to complete.
*
* We should also flush the buffer if there are any hints for the node there, and close the writer (if any),
* so that we don't leave any hint files lying around.
*
* Once that is done, we can simply delete all hint files and remove the host id from the catalog.
*
* The worst that can happen if we don't get everything right is a hints file (or two) remaining undeleted.
*
* @param hostId id of the node being excised
*/
public void excise(UUID hostId)
{
HintsStore store = catalog.getNullable(hostId);
if (store == null)
return;
// flush the buffer and then close the writer for the excised host id, to make sure that no new files will appear
// for this host id after we are done
Future flushFuture = writeExecutor.flushBufferPool(bufferPool, Collections.singleton(store));
Future closeFuture = writeExecutor.closeWriter(store);
try
{
flushFuture.get();
closeFuture.get();
}
catch (InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
// interrupt the current dispatch session (if any), so that the currently dispatched file gets removed
dispatchExecutor.interruptDispatch(store.hostId);
// delete all the hints files and remove the HintsStore instance from the map in the catalog
catalog.exciseStore(hostId);
}
/**
* Transfer all local hints to the hostId supplied by hostIdSupplier
*
* Flushes the buffer to make sure all hints are on disk and closes the hint writers
* so we don't leave any hint files around.
*
* After that, we serially dispatch all the hints in the HintsCatalog.
*
* If we fail delivering all hints, we will ask the hostIdSupplier for a new target host
* and retry delivering any remaining hints there, once, with a delay of 10 seconds before retrying.
*
* @param hostIdSupplier supplier of stream target host ids; generally the closest node according to the DynamicSnitch
* @return a future that, once done, has either streamed all hints to remote nodes or failed with a proper
* log message
*/
public Future transferHints(Supplier<UUID> hostIdSupplier)
{
Future flushFuture = writeExecutor.flushBufferPool(bufferPool);
Future closeFuture = writeExecutor.closeAllWriters();
try
{
flushFuture.get();
closeFuture.get();
}
catch (InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
// unpause dispatch, or else transfer() will return immediately
resumeDispatch();
// wait for the current dispatch session to end
catalog.stores().forEach(dispatchExecutor::completeDispatchBlockingly);
return dispatchExecutor.transfer(catalog, hostIdSupplier);
}
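/*
 * Hedged illustration: a decommissioning node might stream its hints to the "best" live peer, letting the
 * supplier pick a new target if delivery to the first one fails. pickClosestLiveEndpoint() is a hypothetical
 * helper here (exception handling omitted):
 *
 *   Supplier<UUID> targetSupplier = () -> StorageService.instance.getHostIdForEndpoint(pickClosestLiveEndpoint());
 *   HintsService.instance.transferHints(targetSupplier).get();
 */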
HintsCatalog getCatalog()
{
return catalog;
}
/**
* Returns true if the service has been shut down.
*/
public boolean isShutDown()
{
return isShutDown;
}
}