blob: 7c8d6045834cbdfe4317976a4eb75d49096f70d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.raft;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Checksum;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
import com.google.protobuf.ByteString;
public class StoreSnapshotFile {
private static final Logger LOG = Log.logger(StoreSnapshotFile.class);
public static final String SNAPSHOT_DIR = "snapshot";
private static final String TAR = ".zip";
private final RaftBackendStore[] stores;
private final Map<String, String> dataDisks;
private final AtomicBoolean compressing;
public StoreSnapshotFile(RaftBackendStore[] stores) {
this.stores = stores;
this.dataDisks = new HashMap<>();
for (RaftBackendStore raftStore : stores) {
// Call RocksDBStore method reportDiskMapping()
this.dataDisks.putAll(Whitebox.invoke(raftStore, "store",
"reportDiskMapping"));
}
this.compressing = new AtomicBoolean(false);
/*
* Like that:
* general=/parent_path/rocksdb-data
* g/VERTEX=/parent_path/rocksdb-vertex
*/
LOG.debug("The store data disks mapping {}", this.dataDisks);
}
public void save(SnapshotWriter writer, Closure done,
ExecutorService executor) {
try {
// Write snapshot to real directory
Map<String, String> snapshotDirMaps = this.doSnapshotSave();
executor.execute(() -> {
if (!this.compressing.compareAndSet(false, true)) {
LOG.info("Last compress task doesn't finish, skipped it");
done.run(new Status(RaftError.EBUSY,
"Last compress task doesn't finish, skipped it"));
return;
}
try {
this.compressSnapshotDir(writer, snapshotDirMaps);
this.deleteSnapshotDirs(snapshotDirMaps.keySet());
done.run(Status.OK());
} catch (Throwable e) {
LOG.error("Failed to compress snapshot", e);
done.run(new Status(RaftError.EIO,
"Failed to compress snapshot, " +
"error is %s", e.getMessage()));
} finally {
this.compressing.compareAndSet(true, false);
}
});
} catch (Throwable e) {
LOG.error("Failed to save snapshot", e);
done.run(new Status(RaftError.EIO,
"Failed to save snapshot, error is %s", e.getMessage()));
}
}
public boolean load(SnapshotReader reader) {
Set<String> snapshotDirTars = reader.listFiles();
LOG.info("The snapshot tar files to be loaded are {}", snapshotDirTars);
Set<String> snapshotDirs = new HashSet<>();
if (!this.compressing.compareAndSet(false, true)) {
LOG.info("Last decompress task doesn't finish, skipped it");
return false;
}
try {
for (String snapshotDirTar : snapshotDirTars) {
String snapshotDir = this.decompressSnapshot(reader, snapshotDirTar);
snapshotDirs.add(snapshotDir);
}
} catch (Throwable e) {
LOG.error("Failed to decompress snapshot tar", e);
return false;
} finally {
this.compressing.compareAndSet(true, false);
}
try {
this.doSnapshotLoad();
this.deleteSnapshotDirs(snapshotDirs);
} catch (Throwable e) {
LOG.error("Failed to load snapshot", e);
return false;
}
return true;
}
private Map<String, String> doSnapshotSave() {
Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
for (RaftBackendStore store : this.stores) {
snapshotDirMaps.putAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
}
LOG.info("Saved all snapshots: {}", snapshotDirMaps);
return snapshotDirMaps;
}
private void doSnapshotLoad() {
for (RaftBackendStore store : this.stores) {
store.originStore().resumeSnapshot(SNAPSHOT_DIR, false);
}
}
private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snapshotDirMaps) {
String writerPath = writer.getPath();
for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
String snapshotDir = entry.getKey();
String diskTableKey = entry.getValue();
String snapshotDirTar = Paths.get(snapshotDir).getFileName().toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar).toString();
Checksum checksum = new CRC64();
try {
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
long end = System.currentTimeMillis();
LOG.info("Compressed dir '{}' to '{}', took {} seconds",
snapshotDir, outputFile, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException("Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
}
LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
/*
* snapshot_rocksdb-data.tar -> general
* snapshot_rocksdb-vertex.tar -> g/VERTEX
*/
metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
throw new RaftException("Failed to add snapshot file: '%s'", snapshotDirTar);
}
}
}
private String decompressSnapshot(SnapshotReader reader,
String snapshotDirTar) throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
throw new IOException("Can't find snapshot archive file, path=" + snapshotDirTar);
}
String diskTableKey = meta.getUserMeta().toStringUtf8();
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString();
String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
LOG.info("Delete stale snapshot dir {}", snapshotDir);
Checksum checksum = new CRC64();
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar).toString();
try {
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressStrategyManager.getDefault().decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to decompress snapshot, zip=%s", e, archiveFile);
}
if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
E.checkArgument(expected.equals(actual),
"Snapshot checksum error: '%s' != '%s'", actual, expected);
}
return snapshotDir;
}
private void deleteSnapshotDirs(Set<String> snapshotDirs) {
for (String snapshotDir : snapshotDirs) {
FileUtils.deleteQuietly(new File(snapshotDir));
}
}
}