/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.bulkwriter;

import java.math.BigInteger;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.spark.bulkwriter.util.ThreadUtil;
import org.jetbrains.annotations.Nullable;
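
/**
 * Coordinates the "commit" phase of a direct bulk write. After SSTables have been streamed to
 * their replica instances, this class groups the successful uploads by instance, commits them in
 * batches on a small per-instance thread pool, and exposes the aggregated {@link CommitResult}s
 * as a blocking future (it extends {@link AbstractFuture}).
 *
 * <p>A minimal usage sketch, assuming a {@code BulkWriterContext}, a
 * {@code TransportContext.DirectDataBulkWriterContext} and the {@code DirectStreamResult}s from
 * the upload phase are already in hand (variable names below are illustrative only):
 * <pre>{@code
 * try (CommitCoordinator coordinator = CommitCoordinator.commit(writerContext, transportContext, streamResults))
 * {
 *     List<CommitResult> results = coordinator.get(); // blocks until every commit call has completed
 * }
 * }</pre>
 */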
public final class CommitCoordinator extends AbstractFuture<List<CommitResult>> implements AutoCloseable
{
private static final Logger LOGGER = LoggerFactory.getLogger(CommitCoordinator.class);
private final HashMap<RingInstance, ListeningExecutorService> executors = new HashMap<>();
private final List<DirectStreamResult> successfulUploads;
private final DirectDataTransferApi directDataTransferApi;
private final ClusterInfo cluster;
private final JobInfo job;
private ListenableFuture<List<CommitResult>> allCommits;
private final String jobSuffix;
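
/**
 * Creates a coordinator for the given upload results and immediately kicks off the commit calls.
 * The returned coordinator is itself a future: callers can block on {@code get()} to obtain the
 * per-batch {@link CommitResult}s, and should {@code close()} it to release its executors.
 */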
public static CommitCoordinator commit(BulkWriterContext writerContext,
TransportContext.DirectDataBulkWriterContext transportContext,
DirectStreamResult... uploadResults)
{
CommitCoordinator coordinator = new CommitCoordinator(writerContext.cluster(),
writerContext.job(),
transportContext.dataTransferApi(),
uploadResults);
coordinator.commit();
return coordinator;
}
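
// Only stream results that passed on at least one instance are eligible for commit; the rest are filtered out here.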
private CommitCoordinator(ClusterInfo cluster, JobInfo job, DirectDataTransferApi dataTransferApi, DirectStreamResult[] uploadResults)
{
this.directDataTransferApi = dataTransferApi;
this.cluster = cluster;
this.job = job;
this.jobSuffix = "-" + job.getRestoreJobId();
successfulUploads = Arrays.stream(uploadResults)
.filter(result -> !result.passed.isEmpty())
.collect(Collectors.toList());
}

@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
close();
return allCommits == null || allCommits.cancel(mayInterruptIfRunning);
}
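
/**
 * Starts committing the successful uploads. If every upload already carries commit results, those
 * are returned as-is; otherwise the uploads are grouped by instance, committed asynchronously, and
 * the aggregate outcome (or the first unexpected exception) completes this future.
 */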
void commit()
{
// We may have already committed - we should never get here if we did, but if somehow we do we should
// simply return the commit results we already collected
if (!successfulUploads.isEmpty() && successfulUploads.stream()
.allMatch(result -> result.commitResults != null
&& !result.commitResults.isEmpty()))
{
List<CommitResult> collect = successfulUploads.stream()
.flatMap(streamResult -> streamResult.commitResults.stream())
.collect(Collectors.toList());
set(collect);
return;
}
// First, group commits by instance so we can multi-commit
Map<RingInstance, Map<String, Range<BigInteger>>> resultsByInstance = getResultsByInstance(successfulUploads);
List<ListenableFuture<CommitResult>> commitFutures = resultsByInstance.entrySet().stream()
.flatMap(entry -> commit(executors, entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
// Create an aggregate ListenableFuture around the list of futures containing the results of the commit calls.
// We'll fail fast if any of those fail (note that a failure here means an unexpected exception,
// not a failure response from CassandraManager).
// The callback on the aggregate listener sets the return value for this AbstractFuture
// so callers can make blocking calls to CommitCoordinator::get.
allCommits = Futures.allAsList(commitFutures);
Futures.addCallback(allCommits,
new FutureCallback<List<CommitResult>>()
{
public void onSuccess(@Nullable List<CommitResult> result)
{
set(result);
}
public void onFailure(Throwable throwable)
{
setException(throwable);
}
},
Runnable::run);
}
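
/**
 * Submits the commit calls for a single instance: the session UUIDs are split into batches of
 * {@code job.getCommitBatchSize()} and each batch is committed on that instance's dedicated
 * executor. Failures are recorded in the batch's {@link CommitResult} rather than thrown.
 */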
private Stream<ListenableFuture<CommitResult>> commit(Map<RingInstance, ListeningExecutorService> executors,
RingInstance instance,
Map<String, Range<BigInteger>> uploadRanges)
{
ListeningExecutorService executorService =
executors.computeIfAbsent(instance,
inst -> MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(job.getCommitThreadsPerInstance(),
ThreadUtil.threadFactory("commit-sstable-" + inst.nodeName()))));
List<String> allUuids = new ArrayList<>(uploadRanges.keySet());
LOGGER.info("Committing UUIDs={}, Ranges={}, instance={}", allUuids, uploadRanges.values(), instance.nodeName());
List<List<String>> batches = Lists.partition(allUuids, job.getCommitBatchSize());
return batches.stream().map(uuids -> {
String migrationId = UUID.randomUUID().toString();
return executorService.submit(() -> {
CommitResult commitResult = new CommitResult(migrationId, instance, uploadRanges);
try
{
DirectDataTransferApi.RemoteCommitResult result = directDataTransferApi.commitSSTables(instance, migrationId, uuids);
if (result.isSuccess)
{
LOGGER.info("[{}]: Commit succeeded on {} for {}", migrationId, instance.nodeName(), uploadRanges);
}
else
{
LOGGER.error("[{}]: Commit failed: uploadRanges: {}, failedUuids: {}, stdErr: {}",
migrationId,
uploadRanges.entrySet(),
result.failedUuids,
result.stdErr);
if (!result.failedUuids.isEmpty())
{
addFailures(result.failedUuids, uploadRanges, commitResult, result.stdErr);
}
else
{
addFailures(uploadRanges, commitResult, result.stdErr);
}
}
}
catch (Throwable throwable)
{
addFailures(uploadRanges, commitResult, throwable.toString());
// On error, refresh cluster information so we get the latest block list and status information to react accordingly
cluster.refreshClusterInfo();
}
return commitResult;
});
});
}
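
// Records a failed commit for each failed session UUID, mapped back to the token range it covered.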
private void addFailures(List<String> failedUuids,
Map<String, Range<BigInteger>> uploadRanges,
CommitResult commitResult,
String error)
{
failedUuids.forEach(uuid -> {
String shortUuid = uuid.replace(jobSuffix, "");
commitResult.addFailedCommit(shortUuid, uploadRanges.get(shortUuid), error != null ? error : "Unknown Commit Failure");
});
}
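
// Records a failed commit for every range in the batch when no per-UUID failure detail is available.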
private void addFailures(Map<String, Range<BigInteger>> failedRanges, CommitResult commitResult, String message)
{
failedRanges.forEach((key, value) -> commitResult.addFailedCommit(key, value, message));
LOGGER.debug("Added failures to commitResult by Range: {}", commitResult);
}
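
/**
 * Groups the successful uploads by replica instance, mapping each upload's session ID to the
 * token range it covered, so that all sessions destined for the same instance can be committed together.
 */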
private Map<RingInstance, Map<String, Range<BigInteger>>> getResultsByInstance(List<DirectStreamResult> successfulUploads)
{
return successfulUploads
.stream()
.flatMap(upload -> upload.passed
.stream()
.map(instance -> new AbstractMap.SimpleEntry<>(instance,
new AbstractMap.SimpleEntry<>(upload.sessionID, upload.tokenRange))))
.collect(Collectors.groupingBy(AbstractMap.SimpleEntry::getKey,
Collectors.toMap(instance -> instance.getValue().getKey(),
instance -> instance.getValue().getValue())));
}
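
// Shuts down the per-instance commit executors; any in-flight commit tasks are interrupted.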
@Override
public void close()
{
executors.values().forEach(ExecutorService::shutdownNow);
}
}