| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.spark.bulkwriter; |
| |
| import java.math.BigInteger; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.BiFunction; |
| import java.util.function.Consumer; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Range; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import o.a.c.sidecar.client.shaded.common.data.CreateSliceRequestPayload; |
| import org.apache.cassandra.sidecar.client.SidecarInstance; |
| import org.apache.cassandra.spark.bulkwriter.blobupload.BlobDataTransferApi; |
| import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult; |
| import org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice; |
| import org.apache.cassandra.spark.bulkwriter.util.ThreadUtil; |
| import org.apache.cassandra.spark.data.ReplicationFactor; |
| import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportExtension; |
| |
| import static org.apache.cassandra.clients.Sidecar.toSidecarInstance; |
| import static org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice.ConsistencyLevelCheckResult.NOT_SATISFIED; |
| import static org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice.ConsistencyLevelCheckResult.SATISFIED; |
| |
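| /** |
| * Coordinates the completion of slice imports on the Cassandra side. It issues a CreateRestoreJobSlice |
| * request per slice and replica, waits until the configured consistency level is satisfied for every |
| * slice, and then waits an estimated grace period so that as many remaining imports as possible can |
| * finish before the job concludes, minimizing Cassandra-side streaming afterwards. |
| * <p> |
| * A minimal usage sketch, assuming the context objects are available from the surrounding job setup: |
| * <pre>{@code |
| * ImportCompletionCoordinator.of(startTimeNanos, writerContext, dataTransferApi, |
| *                                writeValidator, blobStreamResults, extension, onCancelJob) |
| *                            .waitForCompletion(); |
| * }</pre> |
| */ |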
| public final class ImportCompletionCoordinator |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ImportCompletionCoordinator.class); |
| |
| private final long startTimeNanos; |
| private final BlobDataTransferApi dataTransferApi; |
| private final BulkWriteValidator writeValidator; |
| private final List<BlobStreamResult> blobStreamResultList; |
| private final JobInfo job; |
| private final ScheduledExecutorService scheduler; |
| private final CassandraTopologyMonitor cassandraTopologyMonitor; |
| private final ReplicationFactor replicationFactor; |
| private final StorageTransportExtension extension; |
| private final CompletableFuture<Void> firstFailure = new CompletableFuture<>(); |
| private final CompletableFuture<Void> terminal = new CompletableFuture<>(); |
| // accessed concurrently by the driver thread and the future completion callbacks; hence a concurrent map |
| private final Map<CompletableFuture<Void>, RequestAndInstance> importFutures = new ConcurrentHashMap<>(); |
| private final AtomicBoolean terminalScheduled = new AtomicBoolean(false); |
| private final AtomicInteger completedSlices = new AtomicInteger(0); |
| |
| private long waitStartNanos; |
| private long minSliceSize = Long.MAX_VALUE; |
| private long maxSliceSize = Long.MIN_VALUE; |
| private int totalSlices; |
| private AtomicInteger satisfiedSlices; |
| |
| private ImportCompletionCoordinator(long startTimeNanos, |
| BulkWriterContext writerContext, |
| BlobDataTransferApi dataTransferApi, |
| BulkWriteValidator writeValidator, |
| List<BlobStreamResult> blobStreamResultList, |
| StorageTransportExtension extension, |
| Consumer<CancelJobEvent> onCancelJob) |
| { |
| this(startTimeNanos, writerContext, dataTransferApi, writeValidator, blobStreamResultList, extension, onCancelJob, CassandraTopologyMonitor::new); |
| } |
| |
| @VisibleForTesting |
| ImportCompletionCoordinator(long startTimeNanos, |
| BulkWriterContext writerContext, |
| BlobDataTransferApi dataTransferApi, |
| BulkWriteValidator writeValidator, |
| List<BlobStreamResult> blobStreamResultList, |
| StorageTransportExtension extension, |
| Consumer<CancelJobEvent> onCancelJob, |
| BiFunction<ClusterInfo, Consumer<CancelJobEvent>, CassandraTopologyMonitor> monitorCreator) |
| { |
| this.startTimeNanos = startTimeNanos; |
| this.job = writerContext.job(); |
| this.dataTransferApi = dataTransferApi; |
| this.writeValidator = writeValidator; |
| this.blobStreamResultList = blobStreamResultList; |
| this.extension = extension; |
| ThreadFactory tf = ThreadUtil.threadFactory("Import completion timeout"); |
| this.scheduler = Executors.newSingleThreadScheduledExecutor(tf); |
| Consumer<CancelJobEvent> wrapped = cancelJobEvent -> { |
| // try to complete firstFailure exceptionally, in order to exit the coordinator ASAP |
| firstFailure.completeExceptionally(new RuntimeException(cancelJobEvent.reason, cancelJobEvent.exception)); |
| onCancelJob.accept(cancelJobEvent); |
| }; |
| this.cassandraTopologyMonitor = monitorCreator.apply(writerContext.cluster(), wrapped); |
| this.replicationFactor = cassandraTopologyMonitor.initialTopology().replicationFactor(); |
| } |
| |
| public static ImportCompletionCoordinator of(long startTimeNanos, |
| BulkWriterContext writerContext, |
| BlobDataTransferApi dataTransferApi, |
| BulkWriteValidator writeValidator, |
| List<BlobStreamResult> resultsAsBlobStreamResults, |
| StorageTransportExtension extension, |
| Consumer<CancelJobEvent> onCancelJob) |
| { |
| return new ImportCompletionCoordinator(startTimeNanos, |
| writerContext, dataTransferApi, |
| writeValidator, resultsAsBlobStreamResults, |
| extension, onCancelJob); |
| } |
| |
| /** |
| * Block until the imports complete by invoking the CreateRestoreJobSlice call on the server. |
| * The method passes when the successful imports satisfy the configured consistency level; |
| * otherwise, the method fails. |
| * The wait is indefinite until one of the following conditions is met: |
| * 1) _all_ slices have been checked, |
| * 2) the Spark job reaches its completion timeout, or |
| * 3) at least one slice fails the consistency level validation; the job is bound to fail in this case, |
| * which means that some slices may never be processed by this loop. |
| * <p> |
| * When an import fails on an instance but the consistency level can still be satisfied, |
| * and there are remaining slices to check, the wait continues. |
| */ |
| public void waitForCompletion() |
| { |
| writeValidator.setPhase("WaitForCommitCompletion"); |
| |
| try |
| { |
| waitForCompletionInternal(); |
| } |
| finally |
| { |
| if (terminal.isDone()) |
| { |
| LOGGER.info("Concluded the wait safely: the specified consistency level is satisfied " + |
| "and enough time has been allowed for the remaining slice imports."); |
| } |
| cassandraTopologyMonitor.shutdownNow(); |
| importFutures.keySet().forEach(f -> f.cancel(true)); |
| terminal.complete(null); |
| scheduler.shutdownNow(); // shutdown and do not wait for the termination; the job is completing |
| } |
| } |
| |
| private void waitForCompletionInternal() |
| { |
| prepareToPoll(); |
| |
| startPolling(); |
| |
| await(); |
| } |
| |
| private void prepareToPoll() |
| { |
| totalSlices = blobStreamResultList.stream().mapToInt(res -> res.createdRestoreSlices.size()).sum(); |
| blobStreamResultList |
| .stream() |
| .flatMap(res -> res.createdRestoreSlices |
| .stream() |
| .map(CreatedRestoreSlice::sliceRequestPayload)) |
| .mapToLong(slice -> { |
| // an individual task should never return a slice with a 0-size bundle |
| long size = slice.compressedSizeOrZero(); |
| if (size == 0) |
| { |
| throw new IllegalStateException("Found invalid slice with 0 compressed size. " + |
| "slice: " + slice); |
| } |
| return size; |
| }) |
| .forEach(size -> { |
| minSliceSize = Math.min(minSliceSize, size); |
| maxSliceSize = Math.max(maxSliceSize, size); |
| }); |
| satisfiedSlices = new AtomicInteger(0); |
| waitStartNanos = System.nanoTime(); |
| } |
| |
| private void startPolling() |
| { |
| for (BlobStreamResult blobStreamResult : blobStreamResultList) |
| { |
| for (CreatedRestoreSlice createdRestoreSlice : blobStreamResult.createdRestoreSlices) |
| { |
| for (RingInstance instance : blobStreamResult.passed) |
| { |
| createSliceInstanceFuture(createdRestoreSlice, instance); |
| } |
| } |
| } |
| } |
| |
| private void addCompletionMonitor(CompletableFuture<?> future) |
| { |
| // The whenComplete callback is still invoked when the future is cancelled. |
| // In that case, the throwable is a CancellationException |
| future.whenComplete((v, t) -> { |
| LOGGER.info("Completed slice requests {}/{}", completedSlices.incrementAndGet(), importFutures.keySet().size()); |
| |
| if (t instanceof CancellationException) |
| { |
| RequestAndInstance rai = importFutures.get(future); |
| LOGGER.info("Cancelled import. instance={} slice={}", rai.nodeFqdn, rai.requestPayload); |
| return; |
| } |
| |
| // only enter the block once |
| if (satisfiedSlices.get() == totalSlices |
| && terminalScheduled.compareAndSet(false, true)) |
| { |
| long timeToAllSatisfiedNanos = System.nanoTime() - waitStartNanos; |
| long timeout = estimateTimeout(timeToAllSatisfiedNanos); |
| LOGGER.info("The specified consistency level of the job has been satisfied. " + |
| "Continuing to wait for the slices to complete in order to minimize " + |
| "Cassandra-side streaming. The estimated additional wait time is {} seconds.", |
| TimeUnit.NANOSECONDS.toSeconds(timeout)); |
| // schedule to complete the terminal |
| scheduler.schedule(() -> terminal.complete(null), |
| timeout, TimeUnit.NANOSECONDS); |
| } |
| }); |
| } |
| |
| private void await() |
| { |
| // The wait either fails early once the firstFailure future completes exceptionally, |
| // ends when the timeout fires (after the CL is satisfied), or ends when all import futures complete |
| CompletableFuture.anyOf(firstFailure, terminal, |
| CompletableFuture.allOf(importFutures.keySet().toArray(new CompletableFuture[0]))) |
| .join(); |
| // Double-check that all slices are satisfied. |
| // At this point, all ranges have either been satisfied or the job has already failed, |
| // so this is really just a sanity check against things like lost futures or future-introduced bugs |
| validateAllRangesAreSatisfied(); |
| } |
| |
| // calculate the timeout based on 1) the time taken for all slices to be satisfied, and 2) the estimated import rate |
| private long estimateTimeout(long timeToAllSatisfiedNanos) |
| { |
| long timeout = timeToAllSatisfiedNanos; |
| // use the minSliceSize to get the slowest import rate. R = minSliceSize / T |
| // use the maxSliceSize to get the highest amount of time needed for import. D = maxSliceSize / R |
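| // Worked example with illustrative numbers (not from the source): if minSliceSize = 100 MiB, |
| // maxSliceSize = 400 MiB and T = 10 minutes, then R = 10 MiB/minute and D = 40 minutes; with an |
| // importCoordinatorTimeoutMultiplier of 2, the scheduled additional wait is max(40, 10) * 2 = 80 minutes |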
| // Please do not combine the two statements below, for readability purposes |
| double estimatedRateFloor = ((double) minSliceSize) / timeToAllSatisfiedNanos; |
| double timeEstimateBasedOnRate = ((double) maxSliceSize) / estimatedRateFloor; |
| timeout = Math.max((long) timeEstimateBasedOnRate, timeout); |
| timeout = job.importCoordinatorTimeoutMultiplier() * timeout; |
| if (TimeUnit.NANOSECONDS.toMinutes(timeout) > 60) |
| { |
| LOGGER.warn("The estimated additional timeout is more than 1 hour. timeout={} seconds", |
| TimeUnit.NANOSECONDS.toSeconds(timeout)); |
| } |
| return timeout; |
| } |
| |
| private void createSliceInstanceFuture(CreatedRestoreSlice createdRestoreSlice, |
| RingInstance instance) |
| { |
| if (firstFailure.isCompletedExceptionally()) |
| { |
| LOGGER.warn("The job has failed already. Skip sending import request. instance={} slice={}", |
| instance.nodeName(), createdRestoreSlice.sliceRequestPayload()); |
| return; |
| } |
| SidecarInstance sidecarInstance = toSidecarInstance(instance, job.effectiveSidecarPort()); |
| CreateSliceRequestPayload createSliceRequestPayload = createdRestoreSlice.sliceRequestPayload(); |
| CompletableFuture<Void> fut = dataTransferApi.createRestoreSliceFromDriver(sidecarInstance, |
| createSliceRequestPayload); |
| fut = fut.handleAsync((ignored, throwable) -> { |
| if (throwable == null) |
| { |
| handleSuccessfulSliceInstance(createdRestoreSlice, instance, createSliceRequestPayload); |
| } |
| else |
| { |
| // use the handle API to swallow the throwable on purpose; the failure is propagated to `firstFailure` when the CL can no longer be satisfied |
| handleFailedSliceInstance(instance, createSliceRequestPayload, throwable); |
| } |
| return null; |
| }); |
| // Use the fut variable (instead of the new future object returned by whenComplete) as the map key on purpose, |
| // so that the whenComplete callback can receive the CancellationException when fut is cancelled. |
| // Register the future before attaching the completion monitor, so the callback can always resolve its entry. |
| importFutures.put(fut, new RequestAndInstance(createSliceRequestPayload, instance.nodeName())); |
| addCompletionMonitor(fut); |
| } |
| |
| private void handleFailedSliceInstance(RingInstance instance, |
| CreateSliceRequestPayload createSliceRequestPayload, |
| Throwable throwable) |
| { |
| LOGGER.warn("Import failed. instance={} slice={}", instance.nodeName(), createSliceRequestPayload, throwable); |
| |
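| // Cassandra token ranges are open-closed, i.e. (startToken, endToken], hence Range.openClosed |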
| Range<BigInteger> range = Range.openClosed(createSliceRequestPayload.startToken(), |
| createSliceRequestPayload.endToken()); |
| writeValidator.updateFailureHandler(range, instance, "Failed to import slice. " + throwable.getMessage()); |
| // it either passes or throws if the consistency level cannot be satisfied |
| try |
| { |
| writeValidator.validateClOrFail(cassandraTopologyMonitor.initialTopology()); |
| } |
| catch (RuntimeException rte) |
| { |
| // record the first failure and cancel queued futures. |
| firstFailure.completeExceptionally(rte); |
| } |
| } |
| |
| private void handleSuccessfulSliceInstance(CreatedRestoreSlice createdRestoreSlice, |
| RingInstance instance, |
| CreateSliceRequestPayload createSliceRequestPayload) |
| { |
| LOGGER.info("Import succeeded. instance={} slice={}", instance.nodeName(), createSliceRequestPayload); |
| createdRestoreSlice.addSucceededInstance(instance); |
| if (SATISFIED == |
| createdRestoreSlice.checkForConsistencyLevel(job.getConsistencyLevel(), |
| replicationFactor, |
| job.getLocalDC())) |
| { |
| satisfiedSlices.incrementAndGet(); |
| try |
| { |
| extension.onObjectApplied(createSliceRequestPayload.bucket(), |
| createSliceRequestPayload.key(), |
| createSliceRequestPayload.compressedSizeOrZero(), |
| System.nanoTime() - startTimeNanos); |
| } |
| catch (Throwable t) |
| { |
| // log a warning message and carry on |
| LOGGER.warn("StorageTransportExtension failed to process the ObjectApplied notification", t); |
| } |
| } |
| } |
| |
| /** |
| * Validate that all ranges have collected enough write acknowledgements to satisfy the consistency level. |
| * It throws when there is any range without enough write acknowledgements |
| */ |
| private void validateAllRangesAreSatisfied() |
| { |
| List<CreatedRestoreSlice> unsatisfiedSlices = new ArrayList<>(); |
| for (BlobStreamResult blobStreamResult : blobStreamResultList) |
| { |
| for (CreatedRestoreSlice createdRestoreSlice : blobStreamResult.createdRestoreSlices) |
| { |
| if (NOT_SATISFIED == createdRestoreSlice.checkForConsistencyLevel(job.getConsistencyLevel(), |
| replicationFactor, |
| job.getLocalDC())) |
| { |
| unsatisfiedSlices.add(createdRestoreSlice); |
| } |
| } |
| } |
| if (unsatisfiedSlices.isEmpty()) |
| { |
| LOGGER.info("All token ranges have satisfied the consistency level. consistencyLevel={} phase={}", |
| job.getConsistencyLevel(), writeValidator.getPhase()); |
| } |
| else |
| { |
| String message = String.format("Some of the token ranges cannot satisfy the consistency level. " + |
| "job=%s phase=%s consistencyLevel=%s ranges=%s", |
| job.getRestoreJobId(), writeValidator.getPhase(), job.getConsistencyLevel(), unsatisfiedSlices); |
| LOGGER.error(message); |
| throw new RuntimeException(message); |
| } |
| } |
| |
| @VisibleForTesting |
| Map<CompletableFuture<Void>, RequestAndInstance> importFutures() |
| { |
| return importFutures; |
| } |
| |
| @VisibleForTesting |
| CompletableFuture<Void> firstFailure() |
| { |
| return firstFailure; |
| } |
| |
| // simple data class grouping the request and the node FQDN |
| static class RequestAndInstance |
| { |
| final String nodeFqdn; |
| final CreateSliceRequestPayload requestPayload; |
| |
| RequestAndInstance(CreateSliceRequestPayload requestPayload, String nodeFqdn) |
| { |
| this.nodeFqdn = nodeFqdn; |
| this.requestPayload = requestPayload; |
| } |
| } |
| } |