/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .Result.NO_SUCH_ALGORITHM;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .Result.UNSUPPORTED_REQUEST;
/**
 * This class performs chunk-related operations.
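 *
 * A minimal usage sketch (illustrative only; the container, block, chunk
 * metadata and dispatcher context are assumed to be supplied by the
 * surrounding datanode code):
 * <pre>{@code
 * ChunkManager chunkManager = new ChunkManagerImpl(true); // sync writes
 * chunkManager.writeChunk(container, blockID, info, data, dispatcherContext);
 * ByteBuffer chunk =
 *     chunkManager.readChunk(container, blockID, info, dispatcherContext);
 * }</pre>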
*/
public class ChunkManagerImpl implements ChunkManager {
static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
private final boolean doSyncWrite;
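  /**
   * @param sync if true, each chunk write is synced to disk before the call
   *             returns (the flag is passed through to ChunkUtils.writeData)
   */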
public ChunkManagerImpl(boolean sync) {
doSyncWrite = sync;
}
/**
 * Writes a given chunk.
 *
 * @param container - Container for the chunk
 * @param blockID - ID of the block
 * @param info - ChunkInfo
 * @param data - data of the chunk
 * @param dispatcherContext - dispatcher context info
 * @throws StorageContainerException if the chunk cannot be written
*/
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ByteBuffer data, DispatcherContext dispatcherContext)
throws StorageContainerException {
Preconditions.checkNotNull(dispatcherContext);
DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage();
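    // A write arrives in one of three stages: WRITE_DATA persists the bytes
    // to a temporary chunk file, COMMIT_DATA renames that temporary file to
    // the actual chunk file once the transaction commits, and COMBINED
    // performs both steps at once by writing directly to the chunk file.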
try {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
HddsVolume volume = containerData.getVolume();
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
boolean isOverwrite = ChunkUtils.validateChunkForOverwrite(
chunkFile, info);
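      // isOverwrite is true when a chunk file for this chunk already exists
      // on disk, i.e. this write would overwrite previously written data.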
File tmpChunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
if (LOG.isDebugEnabled()) {
LOG.debug(
"writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file:{}",
info.getChunkName(), stage, chunkFile, tmpChunkFile);
}
switch (stage) {
case WRITE_DATA:
if (isOverwrite) {
        // If the actual chunk file already exists while the temp chunk
        // file is being written, the same ozone client request has
        // generated two raft log entries. This can happen either because
        // the retry cache expired in Ratis, or because of a log index
        // mismatch or corruption in Ratis. Two approaches could resolve
        // this:
        // 1. Read the complete data in the actual chunk file and verify
        //    its integrity against the data to be written, or
        // 2. Delete the chunk file and write the chunk again.
        // For now, let's rewrite the chunk file.
        // TODO: once checksum support for write chunks gets plugged in,
        // verify the checksum of the existing chunk file against the data
        // to be written here; if they match, we can safely return without
        // rewriting.
        LOG.warn("ChunkFile already exists: {}. Deleting it.", chunkFile);
        FileUtil.fullyDelete(chunkFile);
}
if (tmpChunkFile.exists()) {
        // If the tmp chunk file already exists, it means a raft log entry
        // was appended but later truncated in Ratis, leaving this file
        // behind as garbage.
        // TODO: once checksum support for data chunks gets plugged in,
        // compare the checksums instead of rewriting the chunk here.
        LOG.warn("tmpChunkFile already exists: {}. Overwriting it.",
            tmpChunkFile);
}
      // First, write the data to the temporary chunk file.
ChunkUtils
.writeData(tmpChunkFile, info, data, volumeIOStats, doSyncWrite);
      // No need to update container stats here; the data is not yet
      // committed.
break;
case COMMIT_DATA:
      // Committing the data means moving the chunk data from the temporary
      // chunk file to the actual chunk file.
if (isOverwrite) {
        // If the actual chunk file already exists, the write chunk
        // transaction in the containerStateMachine is being reapplied.
        // This can happen when a node restarts.
        // TODO: verify the checksums of the existing chunkFile and the
        // chunkInfo to be committed here
        LOG.warn("ChunkFile already exists: {}", chunkFile);
return;
}
      // To commit a chunk, just rename the tmp chunk file, which has the
      // term and log index of the current transaction appended to its name.
commitChunk(tmpChunkFile, chunkFile);
// Increment container stats here, as we commit the data.
updateContainerWriteStats(container, info, isOverwrite);
break;
case COMBINED:
// directly write to the chunk file
ChunkUtils.writeData(chunkFile, info, data, volumeIOStats, doSyncWrite);
updateContainerWriteStats(container, info, isOverwrite);
break;
default:
      throw new IOException("Cannot identify write operation.");
}
} catch (StorageContainerException ex) {
throw ex;
} catch (NoSuchAlgorithmException ex) {
LOG.error("write data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ", ex,
NO_SUCH_ALGORITHM);
} catch (ExecutionException | IOException ex) {
LOG.error("write data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ", ex,
CONTAINER_INTERNAL_ERROR);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("write data failed. error: {}", e);
throw new StorageContainerException("Internal error: ", e,
CONTAINER_INTERNAL_ERROR);
}
}
protected void updateContainerWriteStats(Container container, ChunkInfo info,
boolean isOverwrite) {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
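    // An overwrite replaces data that is already accounted for, so only a
    // fresh write adds to the container's bytesUsed.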
if (!isOverwrite) {
containerData.incrBytesUsed(info.getLen());
}
containerData.incrWriteCount();
containerData.incrWriteBytes(info.getLen());
}
/**
 * Reads the data defined by a chunk.
 *
 * @param container - Container for the chunk
 * @param blockID - ID of the block.
 * @param info - ChunkInfo.
 * @param dispatcherContext dispatcher context info.
 * @return a ByteBuffer with the chunk data
 * @throws StorageContainerException if the chunk cannot be read
* TODO: Right now we do not support partial reads and writes of chunks.
* TODO: Explore if we need to do that for ozone.
*/
public ByteBuffer readChunk(Container container, BlockID blockID,
ChunkInfo info, DispatcherContext dispatcherContext)
throws StorageContainerException {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
ByteBuffer data;
HddsVolume volume = containerData.getVolume();
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
    // Check which layout version the container uses and read the chunk
    // file in that format.
    // In version 1, we verify the checksum if it is available and return
    // the data of the chunk file.
if (containerData.getLayOutVersion() == ChunkLayOutVersion
.getLatestVersion().getVersion()) {
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
      // In case the chunk file does not exist but the tmp chunk file
      // exists, read from the tmp chunk file if readFromTmpFile is set to
      // true.
if (!chunkFile.exists() && dispatcherContext != null
&& dispatcherContext.isReadFromTmpFile()) {
chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
}
data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
containerData.incrReadCount();
long length = chunkFile.length();
containerData.incrReadBytes(length);
return data;
}
return null;
}
/**
* Deletes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block
* @param info - Chunk Info
 * @throws StorageContainerException if the chunk cannot be deleted
*/
public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
throws StorageContainerException {
Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
    // Check which layout version the container uses and perform the
    // delete chunk operation accordingly.
    // In version 1, we have only the chunk file.
if (containerData.getLayOutVersion() == ChunkLayOutVersion
.getLatestVersion().getVersion()) {
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
// if the chunk file does not exist, it might have already been deleted.
// The call might be because of reapply of transactions on datanode
// restart.
if (!chunkFile.exists()) {
LOG.warn("Chunk file doe not exist. chunk info :" + info.toString());
return;
}
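      // A chunk can only be deleted if it spans the entire file; a chunk
      // that shares a file with other data cannot be removed individually.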
if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
FileUtil.fullyDelete(chunkFile);
containerData.decrBytesUsed(chunkFile.length());
} else {
LOG.error("Not Supported Operation. Trying to delete a " +
"chunk that is in shared file. chunk info : " + info.toString());
throw new StorageContainerException("Not Supported Operation. " +
"Trying to delete a chunk that is in shared file. chunk info : "
+ info.toString(), UNSUPPORTED_REQUEST);
}
}
}
/**
* Shutdown the chunkManager.
*
* In the chunkManager we haven't acquired any resources, so nothing to do
* here.
*/
public void shutdown() {
//TODO: need to revisit this during integration of container IO.
}
/**
* Returns the temporary chunkFile path.
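 *
 * The temporary name is the chunk file name with the Ratis term and log
 * index of the current transaction appended, joined by the delimiter and
 * temporary-chunk prefix defined in OzoneConsts.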
 * @param chunkFile the actual chunk file
 * @param dispatcherContext dispatcher context info
 * @return temporary chunkFile path
*/
private File getTmpChunkFile(File chunkFile,
DispatcherContext dispatcherContext) {
return new File(chunkFile.getParent(),
chunkFile.getName() +
OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX +
OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
dispatcherContext.getTerm() +
OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
dispatcherContext.getLogIndex());
}
/**
 * Commits the chunk by renaming the temporary chunk file to the chunk file.
 * @param tmpChunkFile the temporary chunk file to rename
 * @param chunkFile the final chunk file
 * @throws IOException if the move fails
*/
private void commitChunk(File tmpChunkFile, File chunkFile) throws
IOException {
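    // A rename within the same directory is typically atomic on local
    // filesystems; REPLACE_EXISTING covers a leftover chunk file from an
    // earlier attempt at the same transaction.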
Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
}
}