| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.container.keyvalue; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Function; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.StorageUnit; |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerDataProto.State; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerCommandRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerCommandResponseProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .ContainerType; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .GetSmallFileRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos |
| .PutSmallFileRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; |
| import org.apache.hadoop.hdds.scm.ByteStringConversion; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.container.common.helpers |
| .StorageContainerException; |
| import org.apache.hadoop.ozone.OzoneConfigKeys; |
| import org.apache.hadoop.ozone.container.common.helpers.BlockData; |
| import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; |
| import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; |
| import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; |
| import org.apache.hadoop.ozone.container.common.impl.ContainerSet; |
| import org.apache.hadoop.ozone.container.common.interfaces.Container; |
| import org.apache.hadoop.ozone.container.common.interfaces.Handler; |
| import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; |
| import org.apache.hadoop.ozone.container.common.statemachine.StateContext; |
| import org.apache.hadoop.ozone.container.common.transport.server.ratis |
| .DispatcherContext; |
| import org.apache.hadoop.ozone.container.common.transport.server.ratis |
| .DispatcherContext.WriteChunkStage; |
| import org.apache.hadoop.ozone.container.common.volume.HddsVolume; |
| import org.apache.hadoop.ozone.container.common.volume |
| .RoundRobinVolumeChoosingPolicy; |
| import org.apache.hadoop.ozone.container.common.volume.VolumeSet; |
| import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; |
| import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; |
| import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; |
| import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; |
| import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; |
| import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; |
| import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; |
| import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; |
| import org.apache.hadoop.util.AutoCloseableLock; |
| import org.apache.hadoop.util.ReflectionUtils; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; |
| import static org.apache.hadoop.hdds.HddsConfigKeys |
| .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. |
| Result.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Handler for KeyValue Container type. |
| */ |
| public class KeyValueHandler extends Handler { |
| |
  private static final Logger LOG = LoggerFactory.getLogger(
      KeyValueHandler.class);

  // Always ContainerType.KeyValueContainer for this handler.
  private final ContainerType containerType;
  // Handles block metadata operations (put/get/delete block).
  private final BlockManager blockManager;
  // Handles chunk data operations on disk (read/write/delete chunk).
  private final ChunkManager chunkManager;
  // Policy used to pick an HddsVolume when creating or importing a container.
  private final VolumeChoosingPolicy volumeChoosingPolicy;
  // Maximum container size in bytes, from OZONE_SCM_CONTAINER_SIZE.
  private final long maxContainerSize;
  // Converts chunk data buffers to ByteString for protobuf responses.
  private final Function<ByteBuffer, ByteString> byteBufferToByteString;

  // A lock that is held during container creation.
  private final AutoCloseableLock containerCreationLock;
  // Whether chunk writes are flushed to disk synchronously.
  private final boolean doSyncWrite;
| |
  /**
   * Constructs a KeyValueHandler wired to the given container set, volume
   * set and metrics, reading its settings from {@code config}.
   *
   * @param config  configuration supplying sync-write, volume-choosing-policy
   *                and max-container-size settings
   * @param context state machine context passed to the base Handler
   * @param contSet set of containers managed by this datanode
   * @param volSet  set of volumes available for container storage
   * @param metrics container metrics sink
   */
  public KeyValueHandler(Configuration config, StateContext context,
      ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
    super(config, context, contSet, volSet, metrics);
    containerType = ContainerType.KeyValueContainer;
    blockManager = new BlockManagerImpl(config);
    // Whether chunk writes should be fsync'ed before acknowledging.
    doSyncWrite =
        conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
            OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
    chunkManager = ChunkManagerFactory.getChunkManager(config, doSyncWrite);
    // The volume choosing policy is pluggable; defaults to round-robin.
    volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
        HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
        .class, VolumeChoosingPolicy.class), conf);
    maxContainerSize = (long)config.getStorageSize(
        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
    // this handler lock is used for synchronizing createContainer Requests,
    // so using a fair lock here.
    containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
    byteBufferToByteString =
        ByteStringConversion.createByteBufferConversion(conf);
  }
| |
| @VisibleForTesting |
| public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() { |
| return volumeChoosingPolicy; |
| } |
| |
  @Override
  public void stop() {
    // No-op: this handler owns no background threads or resources to stop.
  }
| |
| @Override |
| public ContainerCommandResponseProto handle( |
| ContainerCommandRequestProto request, Container container, |
| DispatcherContext dispatcherContext) { |
| |
| Type cmdType = request.getCmdType(); |
| KeyValueContainer kvContainer = (KeyValueContainer) container; |
| switch(cmdType) { |
| case CreateContainer: |
| return handleCreateContainer(request, kvContainer); |
| case ReadContainer: |
| return handleReadContainer(request, kvContainer); |
| case UpdateContainer: |
| return handleUpdateContainer(request, kvContainer); |
| case DeleteContainer: |
| return handleDeleteContainer(request, kvContainer); |
| case ListContainer: |
| return handleUnsupportedOp(request); |
| case CloseContainer: |
| return handleCloseContainer(request, kvContainer); |
| case PutBlock: |
| return handlePutBlock(request, kvContainer, dispatcherContext); |
| case GetBlock: |
| return handleGetBlock(request, kvContainer); |
| case DeleteBlock: |
| return handleDeleteBlock(request, kvContainer); |
| case ListBlock: |
| return handleUnsupportedOp(request); |
| case ReadChunk: |
| return handleReadChunk(request, kvContainer, dispatcherContext); |
| case DeleteChunk: |
| return handleDeleteChunk(request, kvContainer); |
| case WriteChunk: |
| return handleWriteChunk(request, kvContainer, dispatcherContext); |
| case ListChunk: |
| return handleUnsupportedOp(request); |
| case CompactChunk: |
| return handleUnsupportedOp(request); |
| case PutSmallFile: |
| return handlePutSmallFile(request, kvContainer, dispatcherContext); |
| case GetSmallFile: |
| return handleGetSmallFile(request, kvContainer); |
| case GetCommittedBlockLength: |
| return handleGetCommittedBlockLength(request, kvContainer); |
| default: |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| public ChunkManager getChunkManager() { |
| return this.chunkManager; |
| } |
| |
| @VisibleForTesting |
| public BlockManager getBlockManager() { |
| return this.blockManager; |
| } |
| |
| /** |
| * Handles Create Container Request. If successful, adds the container to |
| * ContainerSet and sends an ICR to the SCM. |
| */ |
| ContainerCommandResponseProto handleCreateContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| if (!request.hasCreateContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Create Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| // Create Container request should be passed a null container as the |
| // container would be created here. |
| Preconditions.checkArgument(kvContainer == null); |
| |
| long containerID = request.getContainerID(); |
| |
| KeyValueContainerData newContainerData = new KeyValueContainerData( |
| containerID, maxContainerSize, request.getPipelineID(), |
| getDatanodeDetails().getUuidString()); |
| // TODO: Add support to add metadataList to ContainerData. Add metadata |
| // to container during creation. |
| KeyValueContainer newContainer = new KeyValueContainer( |
| newContainerData, conf); |
| |
| boolean created = false; |
| try (AutoCloseableLock l = containerCreationLock.acquire()) { |
| if (containerSet.getContainer(containerID) == null) { |
| newContainer.create(volumeSet, volumeChoosingPolicy, scmID); |
| created = containerSet.addContainer(newContainer); |
| } else { |
| // The create container request for an already existing container can |
| // arrive in case the ContainerStateMachine reapplies the transaction |
| // on datanode restart. Just log a warning msg here. |
| LOG.debug("Container already exists." + |
| "container Id " + containerID); |
| } |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } |
| |
| if (created) { |
| try { |
| sendICR(newContainer); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } |
| } |
| return ContainerUtils.getSuccessResponse(request); |
| } |
| |
| public void populateContainerPathFields(KeyValueContainer container, |
| long maxSize) throws IOException { |
| volumeSet.readLock(); |
| try { |
| HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet |
| .getVolumesList(), maxSize); |
| String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); |
| container.populatePathFields(scmID, containerVolume, hddsVolumeDir); |
| } finally { |
| volumeSet.readUnlock(); |
| } |
| } |
| |
| /** |
| * Handles Read Container Request. Returns the ContainerData as response. |
| */ |
| ContainerCommandResponseProto handleReadContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| if (!request.hasReadContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Read Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| KeyValueContainerData containerData = kvContainer.getContainerData(); |
| return KeyValueContainerUtil.getReadContainerResponse( |
| request, containerData); |
| } |
| |
| |
| /** |
| * Handles Update Container Request. If successful, the container metadata |
| * is updated. |
| */ |
| ContainerCommandResponseProto handleUpdateContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasUpdateContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Update Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| boolean forceUpdate = request.getUpdateContainer().getForceUpdate(); |
| List<KeyValue> keyValueList = |
| request.getUpdateContainer().getMetadataList(); |
| Map<String, String> metadata = new HashMap<>(); |
| for (KeyValue keyValue : keyValueList) { |
| metadata.put(keyValue.getKey(), keyValue.getValue()); |
| } |
| |
| try { |
| if (!metadata.isEmpty()) { |
| kvContainer.update(metadata, forceUpdate); |
| } |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } |
| return ContainerUtils.getSuccessResponse(request); |
| } |
| |
| /** |
| * Handles Delete Container Request. |
| * Open containers cannot be deleted. |
| * Holds writeLock on ContainerSet till the container is removed from |
| * containerMap. On disk deletion of container files will happen |
| * asynchronously without the lock. |
| */ |
| ContainerCommandResponseProto handleDeleteContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasDeleteContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Delete container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| boolean forceDelete = request.getDeleteContainer().getForceDelete(); |
| try { |
| deleteInternal(kvContainer, forceDelete); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } |
| return ContainerUtils.getSuccessResponse(request); |
| } |
| |
| /** |
| * Handles Close Container Request. An open container is closed. |
| * Close Container call is idempotent. |
| */ |
| ContainerCommandResponseProto handleCloseContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasCloseContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Update Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| try { |
| markContainerForClose(kvContainer); |
| closeContainer(kvContainer); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Close Container failed", ex, |
| IO_EXCEPTION), request); |
| } |
| |
| return ContainerUtils.getSuccessResponse(request); |
| } |
| |
| /** |
| * Handle Put Block operation. Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handlePutBlock( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| long blockLength; |
| if (!request.hasPutBlock()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Put Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| BlockData blockData; |
| try { |
| checkContainerOpen(kvContainer); |
| |
| blockData = BlockData.getFromProtoBuf( |
| request.getPutBlock().getBlockData()); |
| Preconditions.checkNotNull(blockData); |
| long bcsId = |
| dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); |
| blockData.setBlockCommitSequenceId(bcsId); |
| long numBytes = blockData.getProtoBufMessage().toByteArray().length; |
| blockManager.putBlock(kvContainer, blockData); |
| metrics.incContainerBytesStats(Type.PutBlock, numBytes); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Put Key failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return BlockUtils.putBlockResponseSuccess(request, blockData); |
| } |
| |
| /** |
| * Handle Get Block operation. Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handleGetBlock( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasGetBlock()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Get Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| BlockData responseData; |
| try { |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getGetBlock().getBlockID()); |
| responseData = blockManager.getBlock(kvContainer, blockID); |
| long numBytes = responseData.getProtoBufMessage().toByteArray().length; |
| metrics.incContainerBytesStats(Type.GetBlock, numBytes); |
| |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Get Key failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return BlockUtils.getBlockDataResponse(request, responseData); |
| } |
| |
| /** |
| * Handles GetCommittedBlockLength operation. |
| * Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handleGetCommittedBlockLength( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| if (!request.hasGetCommittedBlockLength()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Get Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| long blockLength; |
| try { |
| BlockID blockID = BlockID |
| .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID()); |
| blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("GetCommittedBlockLength failed", ex, |
| IO_EXCEPTION), request); |
| } |
| |
| return BlockUtils.getBlockLengthResponse(request, blockLength); |
| } |
| |
| /** |
| * Handle Delete Block operation. Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handleDeleteBlock( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasDeleteBlock()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Delete Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getDeleteBlock().getBlockID()); |
| |
| blockManager.deleteBlock(kvContainer, blockID); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return BlockUtils.getBlockResponseSuccess(request); |
| } |
| |
| /** |
| * Handle Read Chunk operation. Calls ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleReadChunk( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| if (!request.hasReadChunk()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Read Chunk request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout if that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| ChunkInfo chunkInfo; |
| ByteBuffer data; |
| try { |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getReadChunk().getBlockID()); |
| chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk() |
| .getChunkData()); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| if (dispatcherContext == null) { |
| dispatcherContext = new DispatcherContext.Builder().build(); |
| } |
| |
| data = chunkManager |
| .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext); |
| metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen()); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| Preconditions.checkNotNull(data, "Chunk data is null"); |
| |
| ContainerProtos.ReadChunkResponseProto.Builder response = |
| ContainerProtos.ReadChunkResponseProto.newBuilder(); |
| response.setChunkData(chunkInfo.getProtoBufMessage()); |
| response.setData(byteBufferToByteString.apply(data)); |
| response.setBlockID(request.getReadChunk().getBlockID()); |
| |
| ContainerCommandResponseProto.Builder builder = |
| ContainerUtils.getSuccessResponseBuilder(request); |
| builder.setReadChunk(response); |
| return builder.build(); |
| } |
| |
| /** |
| * Throw an exception if the container is unhealthy. |
| * |
| * @throws StorageContainerException if the container is unhealthy. |
| * @param kvContainer |
| */ |
| @VisibleForTesting |
| void checkContainerIsHealthy(KeyValueContainer kvContainer) |
| throws StorageContainerException { |
| kvContainer.readLock(); |
| try { |
| if (kvContainer.getContainerData().getState() == State.UNHEALTHY) { |
| throw new StorageContainerException( |
| "The container replica is unhealthy.", |
| CONTAINER_UNHEALTHY); |
| } |
| } finally { |
| kvContainer.readUnlock(); |
| } |
| } |
| |
| /** |
| * Handle Delete Chunk operation. Calls ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleDeleteChunk( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasDeleteChunk()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Delete Chunk request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getDeleteChunk().getBlockID()); |
| ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk() |
| .getChunkData(); |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| chunkManager.deleteChunk(kvContainer, blockID, chunkInfo); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Delete Chunk failed", ex, |
| IO_EXCEPTION), request); |
| } |
| |
| return ChunkUtils.getChunkResponseSuccess(request); |
| } |
| |
| /** |
| * Handle Write Chunk operation. Calls ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleWriteChunk( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| if (!request.hasWriteChunk()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Write Chunk request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getWriteChunk().getBlockID()); |
| ContainerProtos.ChunkInfo chunkInfoProto = |
| request.getWriteChunk().getChunkData(); |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| ByteBuffer data = null; |
| if (dispatcherContext == null) { |
| dispatcherContext = new DispatcherContext.Builder().build(); |
| } |
| WriteChunkStage stage = dispatcherContext.getStage(); |
| if (stage == WriteChunkStage.WRITE_DATA || |
| stage == WriteChunkStage.COMBINED) { |
| data = request.getWriteChunk().getData().asReadOnlyByteBuffer(); |
| } |
| |
| chunkManager |
| .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); |
| |
| // We should increment stats after writeChunk |
| if (stage == WriteChunkStage.WRITE_DATA|| |
| stage == WriteChunkStage.COMBINED) { |
| metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() |
| .getChunkData().getLen()); |
| } |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return ChunkUtils.getChunkResponseSuccess(request); |
| } |
| |
| /** |
| * Handle Put Small File operation. Writes the chunk and associated key |
| * using a single RPC. Calls BlockManager and ChunkManager to process the |
| * request. |
| */ |
| ContainerCommandResponseProto handlePutSmallFile( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| if (!request.hasPutSmallFile()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Put Small File request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| PutSmallFileRequestProto putSmallFileReq = |
| request.getPutSmallFile(); |
| BlockData blockData; |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock() |
| .getBlockData().getBlockID()); |
| blockData = BlockData.getFromProtoBuf( |
| putSmallFileReq.getBlock().getBlockData()); |
| Preconditions.checkNotNull(blockData); |
| |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf( |
| putSmallFileReq.getChunkInfo()); |
| Preconditions.checkNotNull(chunkInfo); |
| ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer(); |
| if (dispatcherContext == null) { |
| dispatcherContext = new DispatcherContext.Builder().build(); |
| } |
| |
| // chunks will be committed as a part of handling putSmallFile |
| // here. There is no need to maintain this info in openContainerBlockMap. |
| chunkManager |
| .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); |
| |
| List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); |
| chunks.add(chunkInfo.getProtoBufMessage()); |
| blockData.setChunks(chunks); |
| blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex()); |
| |
| blockManager.putBlock(kvContainer, blockData); |
| metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity()); |
| |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Read Chunk failed", ex, |
| PUT_SMALL_FILE_ERROR), request); |
| } |
| |
| return SmallFileUtils.getPutFileResponseSuccess(request, blockData); |
| } |
| |
| /** |
| * Handle Get Small File operation. Gets a data stream using a key. This |
| * helps in reducing the RPC overhead for small files. Calls BlockManager and |
| * ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleGetSmallFile( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasGetSmallFile()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Get Small File request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return ContainerUtils.malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile(); |
| |
| try { |
| BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock() |
| .getBlockID()); |
| BlockData responseData = blockManager.getBlock(kvContainer, blockID); |
| |
| ContainerProtos.ChunkInfo chunkInfo = null; |
| ByteString dataBuf = ByteString.EMPTY; |
| DispatcherContext dispatcherContext = |
| new DispatcherContext.Builder().build(); |
| for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) { |
| // if the block is committed, all chunks must have been committed. |
| // Tmp chunk files won't exist here. |
| ByteBuffer data = chunkManager.readChunk(kvContainer, blockID, |
| ChunkInfo.getFromProtoBuf(chunk), dispatcherContext); |
| ByteString current = byteBufferToByteString.apply(data); |
| dataBuf = dataBuf.concat(current); |
| chunkInfo = chunk; |
| } |
| metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size()); |
| return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf |
| .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo)); |
| } catch (StorageContainerException e) { |
| return ContainerUtils.logAndReturnError(LOG, e, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Write Chunk failed", ex, |
| GET_SMALL_FILE_ERROR), request); |
| } |
| } |
| |
  /**
   * Handle unsupported operation.
   *
   * Returns an "unsupported request" error response for command types this
   * handler does not implement (the List/Compact operations in handle()).
   */
  ContainerCommandResponseProto handleUnsupportedOp(
      ContainerCommandRequestProto request) {
    // TODO : remove all unsupported operations or handle them.
    return ContainerUtils.unsupportedRequest(request);
  }
| |
| /** |
| * Check if container is open. Throw exception otherwise. |
| * @param kvContainer |
| * @throws StorageContainerException |
| */ |
| private void checkContainerOpen(KeyValueContainer kvContainer) |
| throws StorageContainerException { |
| |
| final State containerState = kvContainer.getContainerState(); |
| |
| /* |
| * In a closing state, follower will receive transactions from leader. |
| * Once the leader is put to closing state, it will reject further requests |
| * from clients. Only the transactions which happened before the container |
| * in the leader goes to closing state, will arrive here even the container |
| * might already be in closing state here. |
| */ |
| if (containerState == State.OPEN || containerState == State.CLOSING) { |
| return; |
| } |
| |
| final ContainerProtos.Result result; |
| switch (containerState) { |
| case QUASI_CLOSED: |
| result = CLOSED_CONTAINER_IO; |
| break; |
| case CLOSED: |
| result = CLOSED_CONTAINER_IO; |
| break; |
| case UNHEALTHY: |
| result = CONTAINER_UNHEALTHY; |
| break; |
| case INVALID: |
| result = INVALID_CONTAINER_STATE; |
| break; |
| default: |
| result = CONTAINER_INTERNAL_ERROR; |
| } |
| String msg = "Requested operation not allowed as ContainerState is " + |
| containerState; |
| throw new StorageContainerException(msg, result); |
| } |
| |
| @Override |
| public Container importContainer(final long containerID, |
| final long maxSize, final String originPipelineId, |
| final String originNodeId, final InputStream rawContainerStream, |
| final TarContainerPacker packer) |
| throws IOException { |
| |
| // TODO: Add layout version! |
| KeyValueContainerData containerData = |
| new KeyValueContainerData(containerID, |
| maxSize, originPipelineId, originNodeId); |
| |
| KeyValueContainer container = new KeyValueContainer(containerData, |
| conf); |
| |
| populateContainerPathFields(container, maxSize); |
| container.importContainerData(rawContainerStream, packer); |
| sendICR(container); |
| return container; |
| |
| } |
| |
| @Override |
| public void exportContainer(final Container container, |
| final OutputStream outputStream, |
| final TarContainerPacker packer) |
| throws IOException{ |
| container.readLock(); |
| try { |
| final KeyValueContainer kvc = (KeyValueContainer) container; |
| kvc.exportContainerData(outputStream, packer); |
| } finally { |
| container.readUnlock(); |
| } |
| } |
| |
| @Override |
| public void markContainerForClose(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| // Move the container to CLOSING state only if it's OPEN |
| if (container.getContainerState() == State.OPEN) { |
| container.markContainerForClose(); |
| sendICR(container); |
| } |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void markContainerUnhealthy(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| if (container.getContainerState() != State.UNHEALTHY) { |
| try { |
| container.markContainerUnhealthy(); |
| } catch (IOException ex) { |
| // explicitly catch IOException here since the this operation |
| // will fail if the Rocksdb metadata is corrupted. |
| long id = container.getContainerData().getContainerID(); |
| LOG.warn("Unexpected error while marking container " + id |
| + " as unhealthy", ex); |
| } finally { |
| sendICR(container); |
| } |
| } |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void quasiCloseContainer(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| final State state = container.getContainerState(); |
| // Quasi close call is idempotent. |
| if (state == State.QUASI_CLOSED) { |
| return; |
| } |
| // The container has to be in CLOSING state. |
| if (state != State.CLOSING) { |
| ContainerProtos.Result error = |
| state == State.INVALID ? INVALID_CONTAINER_STATE : |
| CONTAINER_INTERNAL_ERROR; |
| throw new StorageContainerException( |
| "Cannot quasi close container #" + container.getContainerData() |
| .getContainerID() + " while in " + state + " state.", error); |
| } |
| container.quasiClose(); |
| sendICR(container); |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void closeContainer(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| final State state = container.getContainerState(); |
| // Close call is idempotent. |
| if (state == State.CLOSED) { |
| return; |
| } |
| if (state == State.UNHEALTHY) { |
| throw new StorageContainerException( |
| "Cannot close container #" + container.getContainerData() |
| .getContainerID() + " while in " + state + " state.", |
| ContainerProtos.Result.CONTAINER_UNHEALTHY); |
| } |
| // The container has to be either in CLOSING or in QUASI_CLOSED state. |
| if (state != State.CLOSING && state != State.QUASI_CLOSED) { |
| ContainerProtos.Result error = |
| state == State.INVALID ? INVALID_CONTAINER_STATE : |
| CONTAINER_INTERNAL_ERROR; |
| throw new StorageContainerException( |
| "Cannot close container #" + container.getContainerData() |
| .getContainerID() + " while in " + state + " state.", error); |
| } |
| container.close(); |
| sendICR(container); |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
  /**
   * Deletes the given container by delegating to {@link #deleteInternal}.
   *
   * @param container the container to delete
   * @param force if true, delete even when the container is still open;
   *              if false, deletion of an open container is rejected
   * @throws IOException if the delete is rejected or fails
   */
  @Override
  public void deleteContainer(Container container, boolean force)
      throws IOException {
    deleteInternal(container, force);
  }
| |
| private void deleteInternal(Container container, boolean force) |
| throws StorageContainerException { |
| container.writeLock(); |
| try { |
| // If force is false, we check container state. |
| if (!force) { |
| // Check if container is open |
| if (container.getContainerData().isOpen()) { |
| throw new StorageContainerException( |
| "Deletion of Open Container is not allowed.", |
| DELETE_ON_OPEN_CONTAINER); |
| } |
| } |
| long containerId = container.getContainerData().getContainerID(); |
| containerSet.removeContainer(containerId); |
| } finally { |
| container.writeUnlock(); |
| } |
| // Avoid holding write locks for disk operations |
| container.delete(); |
| } |
| } |