blob: 6ccf2000a5520f8162d7df944306f39457013325 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectory;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
/** */
public class SnapshotResponseRemoteFutureTask extends AbstractSnapshotFutureTask<Void> {
/** Snapshot directory path. */
private final String snpPath;
/**
* @param cctx Shared context.
* @param srcNodeId Node id which cause snapshot task creation.
* @param snpName Unique identifier of snapshot process.
* @param snpPath Snapshot directory path.
* @param tmpWorkDir Working directory for intermediate snapshot results.
* @param ioFactory Factory to working with snapshot files.
* @param snpSndr Factory which produces snapshot receiver instance.
*/
public SnapshotResponseRemoteFutureTask(
GridCacheSharedContext<?, ?> cctx,
UUID srcNodeId,
String snpName,
String snpPath,
File tmpWorkDir,
FileIOFactory ioFactory,
SnapshotSender snpSndr,
Map<Integer, Set<Integer>> parts
) {
super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
this.snpPath = snpPath;
}
/** {@inheritDoc} */
@Override public boolean start() {
if (F.isEmpty(parts))
return false;
try {
List<GroupPartitionId> handled = new ArrayList<>();
for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
ofNullable(e.getValue()).orElse(Collections.emptySet())
.forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
}
snpSndr.init(handled.size());
File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, snpPath);
List<CompletableFuture<Void>> futs = new ArrayList<>();
List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
for (SnapshotMetadata meta : metas) {
Map<Integer, Set<Integer>> parts0 = meta.partitions();
if (F.isEmpty(parts0))
continue;
handled.removeIf(gp -> {
if (ofNullable(parts0.get(gp.getGroupId()))
.orElse(Collections.emptySet())
.contains(gp.getPartitionId())
) {
futs.add(CompletableFuture.runAsync(() -> {
if (err.get() != null)
return;
File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
gp.getGroupId());
if (cacheDir == null) {
throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
", pair=" + gp + ']');
}
File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
if (!snpPart.exists()) {
throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
", pair=" + gp + ']');
}
snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
}, snpSndr.executor())
.whenComplete((r, t) -> err.compareAndSet(null, t)));
return true;
}
return false;
});
}
if (!handled.isEmpty()) {
err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node [snpName=" + snpName +
", missed=" + handled + ']'));
}
int size = futs.size();
CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
.whenComplete((r, t) -> {
Throwable th = ofNullable(err.get()).orElse(t);
if (th == null && log.isInfoEnabled()) {
log.info("Snapshot partitions have been sent to the remote node [snpName=" + snpName +
", rmtNodeId=" + srcNodeId + ']');
}
close(th);
});
return true;
}
catch (Throwable t) {
if (err.compareAndSet(null, t))
close(t);
return false;
}
}
/** {@inheritDoc} */
@Override public void acceptException(Throwable th) {
if (err.compareAndSet(null, th))
close(th);
}
/**
* @param th Additional close exception if occurred.
*/
private void close(@Nullable Throwable th) {
if (th == null) {
snpSndr.close(null);
onDone((Void)null);
}
else {
snpSndr.close(th);
onDone(th);
}
}
}