| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.server.storage; |
| |
| import org.apache.ratis.io.CorruptedFileException; |
| import org.apache.ratis.io.MD5Hash; |
| import org.apache.ratis.proto.RaftProtos.FileChunkProto; |
| import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.server.util.ServerStringUtils; |
| import org.apache.ratis.statemachine.SnapshotInfo; |
| import org.apache.ratis.statemachine.StateMachine; |
| import org.apache.ratis.statemachine.StateMachineStorage; |
| import org.apache.ratis.util.FileUtils; |
| import org.apache.ratis.util.JavaUtils; |
| import org.apache.ratis.util.MD5FileUtil; |
| import org.apache.ratis.util.MemoizedSupplier; |
| import org.apache.ratis.util.Preconditions; |
| import org.apache.ratis.util.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.Path; |
| import java.nio.file.StandardOpenOption; |
| import java.security.MessageDigest; |
| import java.util.Optional; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| |
| /** |
| * Manage snapshots of a raft peer. |
| * TODO: snapshot should be treated as compaction log thus can be merged into |
| * RaftLog. In this way we can have a unified getLastTermIndex interface. |
| */ |
| public class SnapshotManager { |
| private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); |
| |
| private static final String CORRUPT = ".corrupt"; |
| private static final String TMP = ".tmp"; |
| |
| private final RaftPeerId selfId; |
| |
| private final Supplier<File> snapshotDir; |
| private final Supplier<File> snapshotTmpDir; |
| private final Function<FileChunkProto, String> getRelativePath; |
| private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester); |
| |
| SnapshotManager(RaftPeerId selfId, Supplier<RaftStorageDirectory> dir, StateMachineStorage smStorage) { |
| this.selfId = selfId; |
| this.snapshotDir = MemoizedSupplier.valueOf( |
| () -> Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(() -> dir.get().getStateMachineDir())); |
| this.snapshotTmpDir = MemoizedSupplier.valueOf( |
| () -> Optional.ofNullable(smStorage.getTmpDir()).orElseGet(() -> dir.get().getTmpDir())); |
| |
| final Supplier<Path> smDir = MemoizedSupplier.valueOf(() -> dir.get().getStateMachineDir().toPath()); |
| this.getRelativePath = c -> smDir.get().relativize( |
| new File(dir.get().getRoot(), c.getFilename()).toPath()).toString(); |
| } |
| |
| @SuppressWarnings({"squid:S2095"}) // Suppress closeable warning |
| private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOException { |
| final FileChannel out; |
| final boolean exists = tmpSnapshotFile.exists(); |
| if (chunk.getOffset() == 0) { |
| // if offset is 0, delete any existing temp snapshot file if it has the same last index. |
| if (exists) { |
| FileUtils.deleteFully(tmpSnapshotFile); |
| } |
| // create the temp snapshot file and put padding inside |
| out = FileUtils.newFileChannel(tmpSnapshotFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE); |
| digester.get().reset(); |
| } else { |
| if (!exists) { |
| throw new FileNotFoundException("Chunk offset is non-zero but file is not found: " + tmpSnapshotFile |
| + ", chunk=" + chunk); |
| } |
| out = FileUtils.newFileChannel(tmpSnapshotFile, StandardOpenOption.WRITE) |
| .position(chunk.getOffset()); |
| } |
| return out; |
| } |
| |
| public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { |
| final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); |
| final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); |
| |
| // create a unique temporary directory |
| final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId()); |
| FileUtils.createDirectories(tmpDir); |
| tmpDir.deleteOnExit(); |
| |
| LOG.info("Installing snapshot:{}, to tmp dir:{}", |
| ServerStringUtils.toInstallSnapshotRequestString(request), tmpDir); |
| |
| // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order, |
| // and are not lost when whole request cycle is done. Check requestId and requestIndex here |
| |
| for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) { |
| SnapshotInfo pi = stateMachine.getLatestSnapshot(); |
| if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) { |
| throw new IOException("There exists snapshot file " |
| + pi.getFiles() + " in " + selfId |
| + " with endIndex >= lastIncludedIndex " + lastIncludedIndex); |
| } |
| |
| final File tmpSnapshotFile = new File(tmpDir, getRelativePath.apply(chunk)); |
| FileUtils.createDirectoriesDeleteExistingNonDirectory(tmpSnapshotFile.getParentFile()); |
| |
| try (FileChannel out = open(chunk, tmpSnapshotFile)) { |
| final ByteBuffer data = chunk.getData().asReadOnlyByteBuffer(); |
| digester.get().update(data.duplicate()); |
| |
| int written = 0; |
| for(; data.remaining() > 0; ) { |
| written += out.write(data); |
| } |
| Preconditions.assertSame(chunk.getData().size(), written, "written"); |
| } |
| |
| // rename the temp snapshot file if this is the last chunk. also verify |
| // the md5 digest and create the md5 meta-file. |
| if (chunk.getDone()) { |
| final MD5Hash expectedDigest = |
| new MD5Hash(chunk.getFileDigest().toByteArray()); |
| // calculate the checksum of the snapshot file and compare it with the |
| // file digest in the request |
| final MD5Hash digest = new MD5Hash(digester.get().digest()); |
| if (!digest.equals(expectedDigest)) { |
| LOG.warn("The snapshot md5 digest {} does not match expected {}", |
| digest, expectedDigest); |
| // rename the temp snapshot file to .corrupt |
| String renameMessage; |
| try { |
| final File corruptedFile = FileUtils.move(tmpSnapshotFile, CORRUPT + StringUtils.currentDateTime()); |
| renameMessage = "Renamed temporary snapshot file " + tmpSnapshotFile + " to " + corruptedFile; |
| } catch (IOException e) { |
| renameMessage = "Tried but failed to rename temporary snapshot file " + tmpSnapshotFile |
| + " to a " + CORRUPT + " file"; |
| LOG.warn(renameMessage, e); |
| renameMessage += ": " + e; |
| } |
| throw new CorruptedFileException(tmpSnapshotFile, |
| "MD5 mismatch for snapshot-" + lastIncludedIndex + " installation. " + renameMessage); |
| } else { |
| MD5FileUtil.saveMD5File(tmpSnapshotFile, digest); |
| } |
| } |
| } |
| |
| if (snapshotChunkRequest.getDone()) { |
| rename(tmpDir, snapshotDir.get()); |
| } |
| } |
| |
| private static void rename(File tmpDir, File stateMachineDir) throws IOException { |
| LOG.info("Installed snapshot, renaming temporary dir {} to {}", tmpDir, stateMachineDir); |
| |
| // rename stateMachineDir to tmp, if it exists. |
| final File existingDir; |
| if (stateMachineDir.exists()) { |
| File moved = null; |
| try { |
| moved = FileUtils.move(stateMachineDir, TMP + StringUtils.currentDateTime()); |
| } catch(IOException e) { |
| LOG.warn("Failed to rename state machine directory " + stateMachineDir.getAbsolutePath() |
| + " to a " + TMP + " directory. Try deleting it directly.", e); |
| FileUtils.deleteFully(stateMachineDir); |
| } |
| existingDir = moved; |
| } else { |
| existingDir = null; |
| } |
| |
| // rename tmpDir to stateMachineDir |
| try { |
| FileUtils.move(tmpDir, stateMachineDir); |
| } catch (IOException e) { |
| throw new IOException("Failed to rename temporary director " + tmpDir.getAbsolutePath() |
| + " to " + stateMachineDir.getAbsolutePath(), e); |
| } |
| |
| // delete existing dir |
| if (existingDir != null) { |
| try { |
| FileUtils.deleteFully(existingDir); |
| } catch (IOException e) { |
| LOG.warn("Failed to delete existing directory " + existingDir.getAbsolutePath(), e); |
| } |
| } |
| } |
| } |