/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.bulkwriter;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import o.a.c.sidecar.client.shaded.common.data.CreateSliceRequestPayload;
import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.spark.bulkwriter.blobupload.BlobDataTransferApi;
import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult;
import org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice;
import org.apache.cassandra.spark.bulkwriter.util.ThreadUtil;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportExtension;
import static org.apache.cassandra.clients.Sidecar.toSidecarInstance;
import static org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice.ConsistencyLevelCheckResult.NOT_SATISFIED;
import static org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice.ConsistencyLevelCheckResult.SATISFIED;
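/**
 * Coordinates the completion of the slice imports for a bulk write job. It sends the
 * CreateRestoreSlice requests to the replicas, tracks which slices have satisfied the configured
 * consistency level, and decides when the job can terminate safely.
 *
 * <p>A minimal usage sketch, assuming the caller already holds the writer context, transfer API,
 * validator, stream results and storage transport extension (the variable names below are
 * illustrative only):
 * <pre>{@code
 * ImportCompletionCoordinator.of(startTimeNanos,
 *                                writerContext,
 *                                dataTransferApi,
 *                                writeValidator,
 *                                blobStreamResults,
 *                                storageTransportExtension,
 *                                cancelEvent -> LOGGER.error("Job cancelled: " + cancelEvent.reason,
 *                                                            cancelEvent.exception))
 *                            .waitForCompletion();
 * }</pre>
 */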
public final class ImportCompletionCoordinator
{
private static final Logger LOGGER = LoggerFactory.getLogger(ImportCompletionCoordinator.class);
private final long startTimeNanos;
private final BlobDataTransferApi dataTransferApi;
private final BulkWriteValidator writeValidator;
private final List<BlobStreamResult> blobStreamResultList;
private final JobInfo job;
private final ScheduledExecutorService scheduler;
private final CassandraTopologyMonitor cassandraTopologyMonitor;
private final ReplicationFactor replicationFactor;
private final StorageTransportExtension extension;
private final CompletableFuture<Void> firstFailure = new CompletableFuture<>();
private final CompletableFuture<Void> terminal = new CompletableFuture<>();
private final Map<CompletableFuture<Void>, RequestAndInstance> importFutures = new HashMap<>();
private final AtomicBoolean terminalScheduled = new AtomicBoolean(false);
private final AtomicInteger completedSlices = new AtomicInteger(0);
private long waitStartNanos;
private long minSliceSize = Long.MAX_VALUE;
private long maxSliceSize = Long.MIN_VALUE;
private int totalSlices;
private AtomicInteger satisfiedSlices;
private ImportCompletionCoordinator(long startTimeNanos,
BulkWriterContext writerContext,
BlobDataTransferApi dataTransferApi,
BulkWriteValidator writeValidator,
List<BlobStreamResult> blobStreamResultList,
StorageTransportExtension extension,
Consumer<CancelJobEvent> onCancelJob)
{
this(startTimeNanos, writerContext, dataTransferApi, writeValidator, blobStreamResultList, extension, onCancelJob, CassandraTopologyMonitor::new);
}
@VisibleForTesting
ImportCompletionCoordinator(long startTimeNanos,
BulkWriterContext writerContext,
BlobDataTransferApi dataTransferApi,
BulkWriteValidator writeValidator,
List<BlobStreamResult> blobStreamResultList,
StorageTransportExtension extension,
Consumer<CancelJobEvent> onCancelJob,
BiFunction<ClusterInfo, Consumer<CancelJobEvent>, CassandraTopologyMonitor> monitorCreator)
{
this.startTimeNanos = startTimeNanos;
this.job = writerContext.job();
this.dataTransferApi = dataTransferApi;
this.writeValidator = writeValidator;
this.blobStreamResultList = blobStreamResultList;
this.extension = extension;
ThreadFactory tf = ThreadUtil.threadFactory("Import completion timeout");
this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
Consumer<CancelJobEvent> wrapped = cancelJobEvent -> {
            // Try to complete the firstFailure, in order to exit the coordinator ASAP
firstFailure.completeExceptionally(new RuntimeException(cancelJobEvent.reason, cancelJobEvent.exception));
onCancelJob.accept(cancelJobEvent);
};
this.cassandraTopologyMonitor = monitorCreator.apply(writerContext.cluster(), wrapped);
this.replicationFactor = cassandraTopologyMonitor.initialTopology().replicationFactor();
}
public static ImportCompletionCoordinator of(long startTimeNanos,
BulkWriterContext writerContext,
BlobDataTransferApi dataTransferApi,
BulkWriteValidator writeValidator,
List<BlobStreamResult> resultsAsBlobStreamResults,
StorageTransportExtension extension,
Consumer<CancelJobEvent> onCancelJob)
{
return new ImportCompletionCoordinator(startTimeNanos,
writerContext, dataTransferApi,
writeValidator, resultsAsBlobStreamResults,
extension, onCancelJob);
}
    /**
     * Block until the imports complete by invoking the CreateRestoreJobSlice call on the server.
     * The method passes when the successful imports can satisfy the configured consistency level;
     * otherwise, the method fails.
     * The wait is indefinite until one of the following conditions is met:
     * 1) _all_ slices have been checked, or
     * 2) the Spark job reaches its completion timeout, or
     * 3) at least one slice fails the CL validation, as the job will eventually fail in this case;
     *    this means that some slices may never be processed by this loop.
     * <p>
     * When a slice has failed the CL validation and there are remaining slices to check, the wait continues.
     */
public void waitForCompletion()
{
writeValidator.setPhase("WaitForCommitCompletion");
try
{
waitForCompletionInternal();
}
finally
{
if (terminal.isDone())
{
LOGGER.info("Concluded the safe termination, given the specified consistency level is satisfied " +
"and enough time has been blocked for importing slices.");
}
cassandraTopologyMonitor.shutdownNow();
importFutures.keySet().forEach(f -> f.cancel(true));
terminal.complete(null);
scheduler.shutdownNow(); // shutdown and do not wait for the termination; the job is completing
}
}
private void waitForCompletionInternal()
{
prepareToPoll();
startPolling();
await();
}
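    /**
     * Computes the total number of slices and the minimum and maximum compressed slice sizes, which later
     * feed the additional-timeout estimate, and records the time at which the wait started.
     */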
private void prepareToPoll()
{
totalSlices = blobStreamResultList.stream().mapToInt(res -> res.createdRestoreSlices.size()).sum();
blobStreamResultList
.stream()
.flatMap(res -> res.createdRestoreSlices
.stream()
.map(CreatedRestoreSlice::sliceRequestPayload))
.mapToLong(slice -> {
            // An individual task should never return a slice with a 0-size bundle
long size = slice.compressedSizeOrZero();
if (size == 0)
{
throw new IllegalStateException("Found invalid slice with 0 compressed size. " +
"slice: " + slice);
}
return size;
})
.forEach(size -> {
minSliceSize = Math.min(minSliceSize, size);
maxSliceSize = Math.max(maxSliceSize, size);
});
satisfiedSlices = new AtomicInteger(0);
waitStartNanos = System.nanoTime();
}
private void startPolling()
{
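        // Fan out the imports: one CreateRestoreSlice request per (slice, passed replica) pair;
        // each request is registered as a future in importFutures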
for (BlobStreamResult blobStreamResult : blobStreamResultList)
{
for (CreatedRestoreSlice createdRestoreSlice : blobStreamResult.createdRestoreSlices)
{
for (RingInstance instance : blobStreamResult.passed)
{
createSliceInstanceFuture(createdRestoreSlice, instance);
}
}
}
}
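    /**
     * Attaches a completion callback to an import future. Besides logging the progress, once every slice
     * has satisfied the consistency level, it schedules the terminal future to complete after an estimated
     * additional timeout, bounding how much longer the coordinator waits for the remaining imports.
     */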
private void addCompletionMonitor(CompletableFuture<?> future)
{
        // The whenComplete callback is still invoked when the future is cancelled.
        // In that case, expect a CancellationException
future.whenComplete((v, t) -> {
LOGGER.info("Completed slice requests {}/{}", completedSlices.incrementAndGet(), importFutures.keySet().size());
if (t instanceof CancellationException)
{
RequestAndInstance rai = importFutures.get(future);
LOGGER.info("Cancelled import. instance={} slice={}", rai.nodeFqdn, rai.requestPayload);
return;
}
// only enter the block once
if (satisfiedSlices.get() == totalSlices
&& terminalScheduled.compareAndSet(false, true))
{
long timeToAllSatisfiedNanos = System.nanoTime() - waitStartNanos;
long timeout = estimateTimeout(timeToAllSatisfiedNanos);
LOGGER.info("The specified consistency level of the job has been satisfied. " +
"Continuing to waiting on slices completion in order to prevent Cassandra side " +
"streaming as much as possible. The estimated additional wait time is {} seconds.",
TimeUnit.NANOSECONDS.toSeconds(timeout));
// schedule to complete the terminal
scheduler.schedule(() -> terminal.complete(null),
timeout, TimeUnit.NANOSECONDS);
}
});
}
private void await()
{
        // The wait either fails early once the firstFailure future completes exceptionally,
        // reaches the timeout (while CL is satisfied), or ends when all import futures complete
CompletableFuture.anyOf(firstFailure, terminal,
CompletableFuture.allOf(importFutures.keySet().toArray(new CompletableFuture[0])))
.join();
        // Double-check that all slices are satisfied.
        // Because at this point all ranges have either been satisfied or the job has already failed,
        // this is really just a sanity check against things like lost futures or bugs introduced in the future
validateAllRangesAreSatisfied();
}
    // Calculate the timeout based on 1) the time taken for all slices to be satisfied, and 2) the estimated import rate
private long estimateTimeout(long timeToAllSatisfiedNanos)
{
long timeout = timeToAllSatisfiedNanos;
        // Use the minSliceSize to get the slowest import rate: R = minSliceSize / T
        // Use the maxSliceSize to get the longest time needed for an import: D = maxSliceSize / R
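        // A worked example with illustrative numbers: if it took 60 seconds for all slices to be satisfied,
        // minSliceSize is 100 MiB and maxSliceSize is 500 MiB, then the estimated rate floor is 100 MiB / 60s,
        // the time estimate based on that rate is 500 MiB / (100 MiB / 60s) = 300s, and the timeout before
        // applying the multiplier is max(300s, 60s) = 300s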
        // Please do not combine the two statements below, for readability purposes
double estimatedRateFloor = ((double) minSliceSize) / timeToAllSatisfiedNanos;
double timeEstimateBasedOnRate = ((double) maxSliceSize) / estimatedRateFloor;
timeout = Math.max((long) timeEstimateBasedOnRate, timeout);
timeout = job.importCoordinatorTimeoutMultiplier() * timeout;
if (TimeUnit.NANOSECONDS.toHours(timeout) > 1)
{
LOGGER.warn("The estimated additional timeout is more than 1 hour. timeout={} seconds",
TimeUnit.NANOSECONDS.toSeconds(timeout));
}
return timeout;
}
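    /**
     * Sends the CreateRestoreSlice request for the given slice to the given instance, unless the job has
     * already failed, and registers the resulting future in importFutures for completion tracking.
     */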
private void createSliceInstanceFuture(CreatedRestoreSlice createdRestoreSlice,
RingInstance instance)
{
if (firstFailure.isCompletedExceptionally())
{
LOGGER.warn("The job has failed already. Skip sending import request. instance={} slice={}",
instance.nodeName(), createdRestoreSlice.sliceRequestPayload());
return;
}
SidecarInstance sidecarInstance = toSidecarInstance(instance, job.effectiveSidecarPort());
CreateSliceRequestPayload createSliceRequestPayload = createdRestoreSlice.sliceRequestPayload();
CompletableFuture<Void> fut = dataTransferApi.createRestoreSliceFromDriver(sidecarInstance,
createSliceRequestPayload);
fut = fut.handleAsync((ignored, throwable) -> {
if (throwable == null)
{
handleSuccessfulSliceInstance(createdRestoreSlice, instance, createSliceRequestPayload);
}
else
{
// use handle API to swallow the throwable on purpose; the throwable is set to `firstFailure`
handleFailedSliceInstance(instance, createSliceRequestPayload, throwable);
}
return null;
});
addCompletionMonitor(fut);
        // Use the fut variable (instead of the new future object returned by whenComplete) as the key on purpose,
        // so that the whenComplete callback can receive the CancellationException
importFutures.put(fut, new RequestAndInstance(createSliceRequestPayload, instance.nodeName()));
}
private void handleFailedSliceInstance(RingInstance instance,
CreateSliceRequestPayload createSliceRequestPayload,
Throwable throwable)
{
LOGGER.warn("Import failed. instance={} slice={}", instance.nodeName(), createSliceRequestPayload, throwable);
Range<BigInteger> range = Range.openClosed(createSliceRequestPayload.startToken(),
createSliceRequestPayload.endToken());
writeValidator.updateFailureHandler(range, instance, "Failed to import slice. " + throwable.getMessage());
        // It either passes or throws if the consistency level cannot be satisfied
try
{
writeValidator.validateClOrFail(cassandraTopologyMonitor.initialTopology());
}
catch (RuntimeException rte)
{
            // Record the first failure, so the coordinator can exit and cancel the queued futures
firstFailure.completeExceptionally(rte);
}
}
private void handleSuccessfulSliceInstance(CreatedRestoreSlice createdRestoreSlice,
RingInstance instance,
CreateSliceRequestPayload createSliceRequestPayload)
{
LOGGER.info("Import succeeded. instance={} slice={}", instance.nodeName(), createSliceRequestPayload);
createdRestoreSlice.addSucceededInstance(instance);
if (SATISFIED ==
createdRestoreSlice.checkForConsistencyLevel(job.getConsistencyLevel(),
replicationFactor,
job.getLocalDC()))
{
satisfiedSlices.incrementAndGet();
try
{
extension.onObjectApplied(createSliceRequestPayload.bucket(),
createSliceRequestPayload.key(),
createSliceRequestPayload.compressedSizeOrZero(),
System.nanoTime() - startTimeNanos);
}
catch (Throwable t)
{
// log a warning message and carry on
LOGGER.warn("StorageTransportExtension fails to process ObjectApplied notification", t);
}
}
}
    /**
     * Validate that all ranges have collected enough write acknowledgements to satisfy the consistency level.
     * It throws when any range lacks enough write acknowledgements.
     */
private void validateAllRangesAreSatisfied()
{
List<CreatedRestoreSlice> unsatisfiedSlices = new ArrayList<>();
for (BlobStreamResult blobStreamResult : blobStreamResultList)
{
for (CreatedRestoreSlice createdRestoreSlice : blobStreamResult.createdRestoreSlices)
{
if (NOT_SATISFIED == createdRestoreSlice.checkForConsistencyLevel(job.getConsistencyLevel(),
replicationFactor,
job.getLocalDC()))
{
unsatisfiedSlices.add(createdRestoreSlice);
}
}
}
if (unsatisfiedSlices.isEmpty())
{
LOGGER.info("All token ranges have satisfied with consistency level. consistencyLevel={} phase={}",
job.getConsistencyLevel(), writeValidator.getPhase());
}
else
{
String message = String.format("Some of the token ranges cannot satisfy with consistency level. " +
"job=%s phase=%s consistencyLevel=%s ranges=%s",
job.getRestoreJobId(), writeValidator.getPhase(), job.getConsistencyLevel(), unsatisfiedSlices);
LOGGER.error(message);
throw new RuntimeException(message);
}
}
@VisibleForTesting
Map<CompletableFuture<Void>, RequestAndInstance> importFutures()
{
return importFutures;
}
@VisibleForTesting
CompletableFuture<Void> firstFailure()
{
return firstFailure;
}
// simple data class to group the request and the node fqdn
static class RequestAndInstance
{
final String nodeFqdn;
final CreateSliceRequestPayload requestPayload;
RequestAndInstance(CreateSliceRequestPayload requestPayload, String nodeFqdn)
{
this.nodeFqdn = nodeFqdn;
this.requestPayload = requestPayload;
}
}
}