| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.container.keyvalue; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| |
| import org.apache.hadoop.hdds.client.BlockID; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.conf.StorageUnit; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; |
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; |
| import org.apache.hadoop.hdds.scm.ByteStringConversion; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; |
| import org.apache.hadoop.ozone.OzoneConfigKeys; |
| import org.apache.hadoop.ozone.common.ChunkBuffer; |
| import org.apache.hadoop.ozone.container.common.helpers.BlockData; |
| import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; |
| import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; |
| import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; |
| import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; |
| import org.apache.hadoop.ozone.container.common.impl.ContainerData; |
| import org.apache.hadoop.ozone.container.common.impl.ContainerSet; |
| import org.apache.hadoop.ozone.container.common.interfaces.Container; |
| import org.apache.hadoop.ozone.container.common.interfaces.Handler; |
| import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; |
| import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; |
| import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage; |
| import org.apache.hadoop.ozone.container.common.volume.HddsVolume; |
| import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; |
| import org.apache.hadoop.ozone.container.common.volume.VolumeSet; |
| import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; |
| import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; |
| import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; |
| import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; |
| import org.apache.hadoop.util.AutoCloseableLock; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION; |
| import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockResponseSuccess; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess; |
| import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; |
| import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Handler for KeyValue Container type. |
| */ |
| public class KeyValueHandler extends Handler { |
| |
  private static final Logger LOG = LoggerFactory.getLogger(
      KeyValueHandler.class);

  // Always ContainerType.KeyValueContainer for this handler.
  private final ContainerType containerType;
  // Manages block metadata (put/get/delete block operations).
  private final BlockManager blockManager;
  // Manages chunk data on disk (read/write/delete chunk operations).
  private final ChunkManager chunkManager;
  // Policy used to pick an HddsVolume when creating a container.
  private final VolumeChoosingPolicy volumeChoosingPolicy;
  // Maximum container size in bytes, from OZONE_SCM_CONTAINER_SIZE.
  private final long maxContainerSize;
  // Converts chunk ByteBuffers to protobuf ByteString; may use the unsafe
  // zero-copy conversion depending on configuration.
  private final Function<ByteBuffer, ByteString> byteBufferToByteString;

  // A lock that is held during container creation.
  private final AutoCloseableLock containerCreationLock;
| |
| public KeyValueHandler(ConfigurationSource config, String datanodeId, |
| ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics, |
| Consumer<ContainerReplicaProto> icrSender) { |
| super(config, datanodeId, contSet, volSet, metrics, icrSender); |
| containerType = ContainerType.KeyValueContainer; |
| blockManager = new BlockManagerImpl(config); |
| chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager); |
| try { |
| volumeChoosingPolicy = conf.getClass( |
| HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy |
| .class, VolumeChoosingPolicy.class).newInstance(); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| maxContainerSize = (long) config.getStorageSize( |
| ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, |
| ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); |
| // this handler lock is used for synchronizing createContainer Requests, |
| // so using a fair lock here. |
| containerCreationLock = new AutoCloseableLock(new ReentrantLock(true)); |
| |
| boolean isUnsafeByteBufferConversionEnabled = |
| conf.getBoolean( |
| OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, |
| OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); |
| |
| byteBufferToByteString = |
| ByteStringConversion |
| .createByteBufferConversion(isUnsafeByteBufferConversionEnabled); |
| } |
| |
| @VisibleForTesting |
| public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() { |
| return volumeChoosingPolicy; |
| } |
| |
  @Override
  public void stop() {
    // Intentionally a no-op: this handler starts no threads or other
    // background resources of its own, so there is nothing to shut down.
  }
| |
| @Override |
| public ContainerCommandResponseProto handle( |
| ContainerCommandRequestProto request, Container container, |
| DispatcherContext dispatcherContext) { |
| |
| return KeyValueHandler |
| .dispatchRequest(this, request, (KeyValueContainer) container, |
| dispatcherContext); |
| } |
| |
| @VisibleForTesting |
| static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| Type cmdType = request.getCmdType(); |
| |
| switch(cmdType) { |
| case CreateContainer: |
| return handler.handleCreateContainer(request, kvContainer); |
| case ReadContainer: |
| return handler.handleReadContainer(request, kvContainer); |
| case UpdateContainer: |
| return handler.handleUpdateContainer(request, kvContainer); |
| case DeleteContainer: |
| return handler.handleDeleteContainer(request, kvContainer); |
| case ListContainer: |
| return handler.handleUnsupportedOp(request); |
| case CloseContainer: |
| return handler.handleCloseContainer(request, kvContainer); |
| case PutBlock: |
| return handler.handlePutBlock(request, kvContainer, dispatcherContext); |
| case GetBlock: |
| return handler.handleGetBlock(request, kvContainer); |
| case DeleteBlock: |
| return handler.handleDeleteBlock(request, kvContainer); |
| case ListBlock: |
| return handler.handleUnsupportedOp(request); |
| case ReadChunk: |
| return handler.handleReadChunk(request, kvContainer, dispatcherContext); |
| case DeleteChunk: |
| return handler.handleDeleteChunk(request, kvContainer); |
| case WriteChunk: |
| return handler.handleWriteChunk(request, kvContainer, dispatcherContext); |
| case ListChunk: |
| return handler.handleUnsupportedOp(request); |
| case CompactChunk: |
| return handler.handleUnsupportedOp(request); |
| case PutSmallFile: |
| return handler |
| .handlePutSmallFile(request, kvContainer, dispatcherContext); |
| case GetSmallFile: |
| return handler.handleGetSmallFile(request, kvContainer); |
| case GetCommittedBlockLength: |
| return handler.handleGetCommittedBlockLength(request, kvContainer); |
| default: |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| public ChunkManager getChunkManager() { |
| return this.chunkManager; |
| } |
| |
| @VisibleForTesting |
| public BlockManager getBlockManager() { |
| return this.blockManager; |
| } |
| |
  /**
   * Handles Create Container Request. If successful, adds the container to
   * ContainerSet and sends an ICR to the SCM.
   *
   * The container-creation fair lock serializes concurrent create requests;
   * an already-existing container is tolerated (idempotent replay).
   */
  ContainerCommandResponseProto handleCreateContainer(
      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
    if (!request.hasCreateContainer()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Malformed Create Container request. trace ID: {}",
            request.getTraceID());
      }
      return malformedRequest(request);
    }
    // Create Container request should be passed a null container as the
    // container would be created here.
    Preconditions.checkArgument(kvContainer == null);

    long containerID = request.getContainerID();

    ChunkLayOutVersion layoutVersion =
        ChunkLayOutVersion.getConfiguredVersion(conf);
    KeyValueContainerData newContainerData = new KeyValueContainerData(
        containerID, layoutVersion, maxContainerSize, request.getPipelineID(),
        getDatanodeId());
    // TODO: Add support to add metadataList to ContainerData. Add metadata
    // to container during creation.
    KeyValueContainer newContainer = new KeyValueContainer(
        newContainerData, conf);

    // Only send an ICR when this request actually created the container;
    // a replayed create for an existing container must not re-report.
    boolean created = false;
    try (AutoCloseableLock l = containerCreationLock.acquire()) {
      if (containerSet.getContainer(containerID) == null) {
        newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
        created = containerSet.addContainer(newContainer);
      } else {
        // The create container request for an already existing container can
        // arrive in case the ContainerStateMachine reapplies the transaction
        // on datanode restart. Just log a debug msg here.
        LOG.debug("Container already exists. container Id {}", containerID);
      }
    } catch (StorageContainerException ex) {
      return ContainerUtils.logAndReturnError(LOG, ex, request);
    }

    if (created) {
      try {
        sendICR(newContainer);
      } catch (StorageContainerException ex) {
        return ContainerUtils.logAndReturnError(LOG, ex, request);
      }
    }
    return getSuccessResponse(request);
  }
| |
| private void populateContainerPathFields(KeyValueContainer container) |
| throws IOException { |
| volumeSet.readLock(); |
| try { |
| HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet |
| .getVolumesList(), container.getContainerData().getMaxSize()); |
| String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); |
| container.populatePathFields(scmID, containerVolume, hddsVolumeDir); |
| } finally { |
| volumeSet.readUnlock(); |
| } |
| } |
| |
| /** |
| * Handles Read Container Request. Returns the ContainerData as response. |
| */ |
| ContainerCommandResponseProto handleReadContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| if (!request.hasReadContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Read Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| KeyValueContainerData containerData = kvContainer.getContainerData(); |
| return getReadContainerResponse( |
| request, containerData.getProtoBufMessage()); |
| } |
| |
| |
| /** |
| * Handles Update Container Request. If successful, the container metadata |
| * is updated. |
| */ |
| ContainerCommandResponseProto handleUpdateContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasUpdateContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Update Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| boolean forceUpdate = request.getUpdateContainer().getForceUpdate(); |
| List<KeyValue> keyValueList = |
| request.getUpdateContainer().getMetadataList(); |
| Map<String, String> metadata = new HashMap<>(); |
| for (KeyValue keyValue : keyValueList) { |
| metadata.put(keyValue.getKey(), keyValue.getValue()); |
| } |
| |
| try { |
| if (!metadata.isEmpty()) { |
| kvContainer.update(metadata, forceUpdate); |
| } |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } |
| return getSuccessResponse(request); |
| } |
| |
| /** |
| * Handles Delete Container Request. |
| * Open containers cannot be deleted. |
| * Holds writeLock on ContainerSet till the container is removed from |
| * containerMap. On disk deletion of container files will happen |
| * asynchronously without the lock. |
| */ |
| ContainerCommandResponseProto handleDeleteContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasDeleteContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Delete container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| boolean forceDelete = request.getDeleteContainer().getForceDelete(); |
| try { |
| deleteInternal(kvContainer, forceDelete); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } |
| return getSuccessResponse(request); |
| } |
| |
| /** |
| * Handles Close Container Request. An open container is closed. |
| * Close Container call is idempotent. |
| */ |
| ContainerCommandResponseProto handleCloseContainer( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasCloseContainer()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Update Container request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| try { |
| markContainerForClose(kvContainer); |
| closeContainer(kvContainer); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Close Container failed", ex, |
| IO_EXCEPTION), request); |
| } |
| |
| return getSuccessResponse(request); |
| } |
| |
  /**
   * Handle Put Block operation. Calls BlockManager to process the request.
   *
   * When the request carries no eof flag, or eof is true, the block's chunk
   * writes are finalized and the container key count is incremented; a
   * partial (eof=false) put only updates block metadata.
   */
  ContainerCommandResponseProto handlePutBlock(
      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
      DispatcherContext dispatcherContext) {

    if (!request.hasPutBlock()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Malformed Put Key request. trace ID: {}",
            request.getTraceID());
      }
      return malformedRequest(request);
    }

    final ContainerProtos.BlockData blockDataProto;
    try {
      checkContainerOpen(kvContainer);

      ContainerProtos.BlockData data = request.getPutBlock().getBlockData();
      BlockData blockData = BlockData.getFromProtoBuf(data);
      Preconditions.checkNotNull(blockData);

      // Absent eof is treated the same as eof=true: the block is complete.
      boolean incrKeyCount = false;
      if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
        chunkManager.finishWriteChunks(kvContainer, blockData);
        incrKeyCount = true;
      }

      // Use the Ratis log index as the block commit sequence id; 0 when
      // there is no dispatcher context (e.g. standalone pipeline).
      long bcsId =
          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
      blockData.setBlockCommitSequenceId(bcsId);
      blockManager.putBlock(kvContainer, blockData, incrKeyCount);

      blockDataProto = blockData.getProtoBufMessage();

      final long numBytes = blockDataProto.getSerializedSize();
      metrics.incContainerBytesStats(Type.PutBlock, numBytes);
    } catch (StorageContainerException ex) {
      return ContainerUtils.logAndReturnError(LOG, ex, request);
    } catch (IOException ex) {
      return ContainerUtils.logAndReturnError(LOG,
          new StorageContainerException("Put Key failed", ex, IO_EXCEPTION),
          request);
    }

    return putBlockResponseSuccess(request, blockDataProto);
  }
| |
| /** |
| * Handle Get Block operation. Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handleGetBlock( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasGetBlock()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Get Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| ContainerProtos.BlockData responseData; |
| try { |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getGetBlock().getBlockID()); |
| responseData = blockManager.getBlock(kvContainer, blockID) |
| .getProtoBufMessage(); |
| final long numBytes = responseData.getSerializedSize(); |
| metrics.incContainerBytesStats(Type.GetBlock, numBytes); |
| |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Get Key failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return getBlockDataResponse(request, responseData); |
| } |
| |
| /** |
| * Handles GetCommittedBlockLength operation. |
| * Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handleGetCommittedBlockLength( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| if (!request.hasGetCommittedBlockLength()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Get Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| long blockLength; |
| try { |
| BlockID blockID = BlockID |
| .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID()); |
| blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("GetCommittedBlockLength failed", ex, |
| IO_EXCEPTION), request); |
| } |
| |
| return getBlockLengthResponse(request, blockLength); |
| } |
| |
| /** |
| * Handle Delete Block operation. Calls BlockManager to process the request. |
| */ |
| ContainerCommandResponseProto handleDeleteBlock( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasDeleteBlock()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Delete Key request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getDeleteBlock().getBlockID()); |
| |
| blockManager.deleteBlock(kvContainer, blockID); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return getBlockResponseSuccess(request); |
| } |
| |
| /** |
| * Handle Read Chunk operation. Calls ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleReadChunk( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| if (!request.hasReadChunk()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Read Chunk request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout if that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| ChunkBuffer data; |
| try { |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getReadChunk().getBlockID()); |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk() |
| .getChunkData()); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| if (dispatcherContext == null) { |
| dispatcherContext = new DispatcherContext.Builder().build(); |
| } |
| |
| data = chunkManager |
| .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext); |
| metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen()); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| Preconditions.checkNotNull(data, "Chunk data is null"); |
| |
| ByteString byteString = data.toByteString(byteBufferToByteString); |
| return getReadChunkResponse(request, byteString); |
| } |
| |
| /** |
| * Throw an exception if the container is unhealthy. |
| * |
| * @throws StorageContainerException if the container is unhealthy. |
| * @param kvContainer |
| */ |
| @VisibleForTesting |
| void checkContainerIsHealthy(KeyValueContainer kvContainer) |
| throws StorageContainerException { |
| kvContainer.readLock(); |
| try { |
| if (kvContainer.getContainerData().getState() == State.UNHEALTHY) { |
| throw new StorageContainerException( |
| "The container replica is unhealthy.", |
| CONTAINER_UNHEALTHY); |
| } |
| } finally { |
| kvContainer.readUnlock(); |
| } |
| } |
| |
| /** |
| * Handle Delete Chunk operation. Calls ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleDeleteChunk( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasDeleteChunk()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Delete Chunk request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockID blockID = BlockID.getFromProtobuf( |
| request.getDeleteChunk().getBlockID()); |
| ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk() |
| .getChunkData(); |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| chunkManager.deleteChunk(kvContainer, blockID, chunkInfo); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Delete Chunk failed", ex, |
| IO_EXCEPTION), request); |
| } |
| |
| return getSuccessResponse(request); |
| } |
| |
| /** |
| * Handle Write Chunk operation. Calls ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleWriteChunk( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| if (!request.hasWriteChunk()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Write Chunk request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| try { |
| checkContainerOpen(kvContainer); |
| |
| WriteChunkRequestProto writeChunk = request.getWriteChunk(); |
| BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID()); |
| ContainerProtos.ChunkInfo chunkInfoProto = writeChunk.getChunkData(); |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| ChunkBuffer data = null; |
| if (dispatcherContext == null) { |
| dispatcherContext = new DispatcherContext.Builder().build(); |
| } |
| WriteChunkStage stage = dispatcherContext.getStage(); |
| if (stage == WriteChunkStage.WRITE_DATA || |
| stage == WriteChunkStage.COMBINED) { |
| data = |
| ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList()); |
| } |
| |
| chunkManager |
| .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); |
| |
| // We should increment stats after writeChunk |
| if (stage == WriteChunkStage.WRITE_DATA|| |
| stage == WriteChunkStage.COMBINED) { |
| metrics.incContainerBytesStats(Type.WriteChunk, writeChunk |
| .getChunkData().getLen()); |
| } |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), |
| request); |
| } |
| |
| return getSuccessResponse(request); |
| } |
| |
| /** |
| * Handle Put Small File operation. Writes the chunk and associated key |
| * using a single RPC. Calls BlockManager and ChunkManager to process the |
| * request. |
| */ |
| ContainerCommandResponseProto handlePutSmallFile( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer, |
| DispatcherContext dispatcherContext) { |
| |
| if (!request.hasPutSmallFile()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Put Small File request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| PutSmallFileRequestProto putSmallFileReq = request.getPutSmallFile(); |
| final ContainerProtos.BlockData blockDataProto; |
| try { |
| checkContainerOpen(kvContainer); |
| |
| BlockData blockData = BlockData.getFromProtoBuf( |
| putSmallFileReq.getBlock().getBlockData()); |
| Preconditions.checkNotNull(blockData); |
| |
| ContainerProtos.ChunkInfo chunkInfoProto = putSmallFileReq.getChunkInfo(); |
| ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); |
| Preconditions.checkNotNull(chunkInfo); |
| |
| ChunkBuffer data = ChunkBuffer.wrap( |
| putSmallFileReq.getData().asReadOnlyByteBufferList()); |
| if (dispatcherContext == null) { |
| dispatcherContext = new DispatcherContext.Builder().build(); |
| } |
| |
| BlockID blockID = blockData.getBlockID(); |
| |
| // chunks will be committed as a part of handling putSmallFile |
| // here. There is no need to maintain this info in openContainerBlockMap. |
| chunkManager |
| .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); |
| chunkManager.finishWriteChunks(kvContainer, blockData); |
| |
| List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); |
| chunks.add(chunkInfoProto); |
| blockData.setChunks(chunks); |
| blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex()); |
| |
| blockManager.putBlock(kvContainer, blockData); |
| |
| blockDataProto = blockData.getProtoBufMessage(); |
| metrics.incContainerBytesStats(Type.PutSmallFile, chunkInfo.getLen()); |
| } catch (StorageContainerException ex) { |
| return ContainerUtils.logAndReturnError(LOG, ex, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Read Chunk failed", ex, |
| PUT_SMALL_FILE_ERROR), request); |
| } |
| |
| return getPutFileResponseSuccess(request, blockDataProto); |
| } |
| |
| /** |
| * Handle Get Small File operation. Gets a data stream using a key. This |
| * helps in reducing the RPC overhead for small files. Calls BlockManager and |
| * ChunkManager to process the request. |
| */ |
| ContainerCommandResponseProto handleGetSmallFile( |
| ContainerCommandRequestProto request, KeyValueContainer kvContainer) { |
| |
| if (!request.hasGetSmallFile()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Malformed Get Small File request. trace ID: {}", |
| request.getTraceID()); |
| } |
| return malformedRequest(request); |
| } |
| |
| // The container can become unhealthy after the lock is released. |
| // The operation will likely fail/timeout in that happens. |
| try { |
| checkContainerIsHealthy(kvContainer); |
| } catch (StorageContainerException sce) { |
| return ContainerUtils.logAndReturnError(LOG, sce, request); |
| } |
| |
| GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile(); |
| |
| try { |
| BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock() |
| .getBlockID()); |
| BlockData responseData = blockManager.getBlock(kvContainer, blockID); |
| |
| ContainerProtos.ChunkInfo chunkInfo = null; |
| ByteString dataBuf = ByteString.EMPTY; |
| DispatcherContext dispatcherContext = |
| new DispatcherContext.Builder().build(); |
| for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) { |
| // if the block is committed, all chunks must have been committed. |
| // Tmp chunk files won't exist here. |
| ChunkBuffer data = chunkManager.readChunk(kvContainer, blockID, |
| ChunkInfo.getFromProtoBuf(chunk), dispatcherContext); |
| ByteString current = data.toByteString(byteBufferToByteString); |
| dataBuf = dataBuf.concat(current); |
| chunkInfo = chunk; |
| } |
| metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size()); |
| return getGetSmallFileResponseSuccess(request, dataBuf, chunkInfo); |
| } catch (StorageContainerException e) { |
| return ContainerUtils.logAndReturnError(LOG, e, request); |
| } catch (IOException ex) { |
| return ContainerUtils.logAndReturnError(LOG, |
| new StorageContainerException("Write Chunk failed", ex, |
| GET_SMALL_FILE_ERROR), request); |
| } |
| } |
| |
| /** |
| * Handle unsupported operation. |
| */ |
| ContainerCommandResponseProto handleUnsupportedOp( |
| ContainerCommandRequestProto request) { |
| // TODO : remove all unsupported operations or handle them. |
| return unsupportedRequest(request); |
| } |
| |
| /** |
| * Check if container is open. Throw exception otherwise. |
| * @param kvContainer |
| * @throws StorageContainerException |
| */ |
| private void checkContainerOpen(KeyValueContainer kvContainer) |
| throws StorageContainerException { |
| |
| final State containerState = kvContainer.getContainerState(); |
| |
| /* |
| * In a closing state, follower will receive transactions from leader. |
| * Once the leader is put to closing state, it will reject further requests |
| * from clients. Only the transactions which happened before the container |
| * in the leader goes to closing state, will arrive here even the container |
| * might already be in closing state here. |
| */ |
| if (containerState == State.OPEN || containerState == State.CLOSING) { |
| return; |
| } |
| |
| final ContainerProtos.Result result; |
| switch (containerState) { |
| case QUASI_CLOSED: |
| result = CLOSED_CONTAINER_IO; |
| break; |
| case CLOSED: |
| result = CLOSED_CONTAINER_IO; |
| break; |
| case UNHEALTHY: |
| result = CONTAINER_UNHEALTHY; |
| break; |
| case INVALID: |
| result = INVALID_CONTAINER_STATE; |
| break; |
| default: |
| result = CONTAINER_INTERNAL_ERROR; |
| } |
| String msg = "Requested operation not allowed as ContainerState is " + |
| containerState; |
| throw new StorageContainerException(msg, result); |
| } |
| |
| @Override |
| public Container importContainer(ContainerData originalContainerData, |
| final InputStream rawContainerStream, |
| final TarContainerPacker packer) |
| throws IOException { |
| |
| KeyValueContainerData containerData = |
| new KeyValueContainerData(originalContainerData); |
| |
| KeyValueContainer container = new KeyValueContainer(containerData, |
| conf); |
| |
| populateContainerPathFields(container); |
| container.importContainerData(rawContainerStream, packer); |
| sendICR(container); |
| return container; |
| |
| } |
| |
| @Override |
| public void exportContainer(final Container container, |
| final OutputStream outputStream, |
| final TarContainerPacker packer) |
| throws IOException{ |
| final KeyValueContainer kvc = (KeyValueContainer) container; |
| kvc.exportContainerData(outputStream, packer); |
| } |
| |
| @Override |
| public void markContainerForClose(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| // Move the container to CLOSING state only if it's OPEN |
| if (container.getContainerState() == State.OPEN) { |
| container.markContainerForClose(); |
| sendICR(container); |
| } |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void markContainerUnhealthy(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| if (container.getContainerState() != State.UNHEALTHY) { |
| try { |
| container.markContainerUnhealthy(); |
| } catch (IOException ex) { |
| // explicitly catch IOException here since the this operation |
| // will fail if the Rocksdb metadata is corrupted. |
| long id = container.getContainerData().getContainerID(); |
| LOG.warn("Unexpected error while marking container " + id |
| + " as unhealthy", ex); |
| } finally { |
| sendICR(container); |
| } |
| } |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void quasiCloseContainer(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| final State state = container.getContainerState(); |
| // Quasi close call is idempotent. |
| if (state == State.QUASI_CLOSED) { |
| return; |
| } |
| // The container has to be in CLOSING state. |
| if (state != State.CLOSING) { |
| ContainerProtos.Result error = |
| state == State.INVALID ? INVALID_CONTAINER_STATE : |
| CONTAINER_INTERNAL_ERROR; |
| throw new StorageContainerException( |
| "Cannot quasi close container #" + container.getContainerData() |
| .getContainerID() + " while in " + state + " state.", error); |
| } |
| container.quasiClose(); |
| sendICR(container); |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void closeContainer(Container container) |
| throws IOException { |
| container.writeLock(); |
| try { |
| final State state = container.getContainerState(); |
| // Close call is idempotent. |
| if (state == State.CLOSED) { |
| return; |
| } |
| if (state == State.UNHEALTHY) { |
| throw new StorageContainerException( |
| "Cannot close container #" + container.getContainerData() |
| .getContainerID() + " while in " + state + " state.", |
| ContainerProtos.Result.CONTAINER_UNHEALTHY); |
| } |
| // The container has to be either in CLOSING or in QUASI_CLOSED state. |
| if (state != State.CLOSING && state != State.QUASI_CLOSED) { |
| ContainerProtos.Result error = |
| state == State.INVALID ? INVALID_CONTAINER_STATE : |
| CONTAINER_INTERNAL_ERROR; |
| throw new StorageContainerException( |
| "Cannot close container #" + container.getContainerData() |
| .getContainerID() + " while in " + state + " state.", error); |
| } |
| container.close(); |
| sendICR(container); |
| } finally { |
| container.writeUnlock(); |
| } |
| } |
| |
| @Override |
| public void deleteContainer(Container container, boolean force) |
| throws IOException { |
| deleteInternal(container, force); |
| } |
| |
| @Override |
| public void deleteBlock(Container container, BlockData blockData) |
| throws IOException { |
| chunkManager.deleteChunks(container, blockData); |
| for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) { |
| ChunkInfo info = ChunkInfo.getFromProtoBuf(chunkInfo); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("block {} chunk {} deleted", blockData.getBlockID(), info); |
| } |
| } |
| } |
| |
  /**
   * Removes the container from the container set and deletes its on-disk
   * data, then reports the deletion.
   *
   * Locking note: the write lock guards only the state check and the
   * removal from {@code containerSet}; the disk deletion and the report
   * deliberately happen after the lock is released (see comment below).
   *
   * @param container the container to delete
   * @param force when false, deletion of an open container is rejected
   * @throws StorageContainerException if the container is open and
   *         {@code force} is false, or if removal/deletion fails
   */
  private void deleteInternal(Container container, boolean force)
      throws StorageContainerException {
    container.writeLock();
    try {
      // If force is false, we check container state.
      if (!force) {
        // Check if container is open
        if (container.getContainerData().isOpen()) {
          throw new StorageContainerException(
              "Deletion of Open Container is not allowed.",
              DELETE_ON_OPEN_CONTAINER);
        }
      }
      // Unregister first so no new requests are routed to this container.
      long containerId = container.getContainerData().getContainerID();
      containerSet.removeContainer(containerId);
    } finally {
      container.writeUnlock();
    }
    // Avoid holding write locks for disk operations
    container.delete();
    container.getContainerData().setState(State.DELETED);
    sendICR(container);
  }
| } |