/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
/**
* Handles the chunk writes for one EC internal block. In EC, each internal
* block is written to a single datanode, so the pipeline backing this stream
* contains exactly one node.
*/
public class ECBlockOutputStream extends BlockOutputStream {
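// Minimal usage sketch (illustrative only; factory, pipeline, bufferPool,
// config, token, metrics, streamBufferArgs and executor are placeholders
// supplied by the caller, not defined in this file):
//
//   ECBlockOutputStream out = new ECBlockOutputStream(blockID, factory,
//       pipeline, bufferPool, config, token, metrics, streamBufferArgs,
//       () -> executor);
//   out.write(data, 0, data.length);                    // async chunk write
//   out.executePutBlock(true, false, blockGroupLength); // commit block meta
//   out.getCurrentPutBlkResponseFuture().get();         // await the ack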
private final DatanodeDetails datanodeDetails;
private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
currentChunkRspFuture = null;
private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
putBlkRspFuture = null;
/**
* Creates a new ECBlockOutputStream.
*
* @param blockID block ID
* @param xceiverClientManager factory that supplies the xceiver client
* @param pipeline pipeline where the block will be written
* @param bufferPool pool of buffers
* @param config client configuration
* @param token block token for authorizing writes
* @param clientMetrics metrics for container client operations
* @param streamBufferArgs stream buffer arguments
* @param executorServiceSupplier supplier of the executor used to handle
* asynchronous responses
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In an EC stream, the pipeline contains exactly one node.
this.datanodeDetails = pipeline.getClosestNode();
}
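/**
* Writes the given bytes to the container as a single chunk and records the
* chunk write response future, retrievable via
* {@link #getCurrentChunkResponseFuture()}.
*/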
@Override
public void write(byte[] b, int off, int len) throws IOException {
this.currentChunkRspFuture =
writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
updateWrittenDataLength(len);
}
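/**
* Writes the given buffer to the container as a single chunk and returns the
* chunk write response future. Unlike {@link #write(byte[], int, int)}, the
* future is not recorded and the written data length is not updated.
*/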
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
ByteBuffer buff) throws IOException {
return writeChunkToContainer(ChunkBuffer.wrap(buff));
}
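/**
* Executes putBlock, propagating the stripe checksum where required. Only
* the first data replica and the parity replicas persist the stripe
* checksum; any other data replica delegates straight to
* {@link #executePutBlock(boolean, boolean, long)}. The stripe checksum is
* looked up from the blockData entries in reverse order, since the parity
* entries always carry it, and is copied onto this replica's chunk list.
*/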
public CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force, long blockGroupLength, BlockData[] blockData)
throws IOException {
ECReplicationConfig repConfig = (ECReplicationConfig)
getPipeline().getReplicationConfig();
int totalNodes = repConfig.getRequiredNodes();
int parity = repConfig.getParity();
// Write the stripe checksum only to the parity replicas and the first data replica.
if (getReplicationIndex() > 1 &&
getReplicationIndex() <= (totalNodes - parity)) {
return executePutBlock(close, force, blockGroupLength);
}
BlockData checksumBlockData = null;
BlockID blockID = null;
// Traverse in reverse, since the parity replicas always carry the stripe checksum.
for (int i = blockData.length - 1; i >= 0; i--) {
BlockData bd = blockData[i];
if (bd == null) {
continue;
}
if (blockID == null) {
// store the BlockID for logging
blockID = bd.getBlockID();
}
List<ChunkInfo> chunks = bd.getChunks();
if (chunks != null && !chunks.isEmpty()) {
if (chunks.get(0).hasStripeChecksum()) {
checksumBlockData = bd;
break;
} else {
ChunkInfo chunk = chunks.get(0);
LOG.debug("The first chunk in block with index {} does not have stripeChecksum. BlockID: {}, Block " +
"size: {}. Chunk length: {}, Chunk offset: {}, hasChecksumData: {}, chunks size: {}.", i,
bd.getBlockID(), bd.getSize(), chunk.getLen(), chunk.getOffset(), chunk.hasChecksumData(), chunks.size());
}
}
}
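// If a stripe checksum was found, copy it onto this replica's chunk list
// before the putBlock call; otherwise log a warning.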
if (checksumBlockData != null) {
List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
Preconditions.checkArgument(
currentChunks.size() == checksumBlockDataChunks.size(),
"The chunk list has " + currentChunks.size()
+ " entries, but the checksum chunks has "
+ checksumBlockDataChunks.size()
+ " entries. They should be equal in size.");
List<ChunkInfo> newChunkList = new ArrayList<>();
for (int i = 0; i < currentChunks.size(); i++) {
ChunkInfo chunkInfo = currentChunks.get(i);
ChunkInfo checksumChunk = checksumBlockDataChunks.get(i);
ChunkInfo.Builder builder = ChunkInfo.newBuilder(chunkInfo);
if (chunkInfo.hasChecksumData()) {
builder.setStripeChecksum(checksumChunk.getStripeChecksum());
}
ChunkInfo newInfo = builder.build();
newChunkList.add(newInfo);
}
getContainerBlockData().clearChunks();
getContainerBlockData().addAllChunks(newChunkList);
} else {
LOG.warn("Could not find a stripe checksum in any index for blockData with BlockID {}, blockData length {} and " +
"blockGroupLength {}.", blockID, blockData.length, blockGroupLength);
}
return executePutBlock(close, force, blockGroupLength);
}
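/**
* Executes putBlock, first stamping the given stripe checksum onto the last
* chunk when this stream writes the first data replica or a parity replica.
*/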
public CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force, long blockGroupLength, ByteString checksum)
throws IOException {
ECReplicationConfig repConfig = (ECReplicationConfig)
getPipeline().getReplicationConfig();
int totalNodes = repConfig.getRequiredNodes();
int parity = repConfig.getParity();
// Update the stripe checksum only on the parity replicas and the first data replica.
if (!(getReplicationIndex() > 1 &&
getReplicationIndex() <= (totalNodes - parity))) {
updateChecksum(checksum);
}
return executePutBlock(close, force, blockGroupLength);
}
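/**
* Executes putBlock after recording the block group length in the putBlock
* metadata.
*/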
public CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force, long blockGroupLength) throws IOException {
updateBlockGroupLengthInPutBlockMeta(blockGroupLength);
return executePutBlock(close, force);
}
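/**
* Replaces any existing block group length entry in the putBlock metadata
* with the given value.
*/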
private void updateBlockGroupLengthInPutBlockMeta(final long blockGroupLen) {
ContainerProtos.KeyValue keyValue = ContainerProtos.KeyValue.newBuilder()
.setKey(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK)
.setValue(String.valueOf(blockGroupLen)).build();
List<ContainerProtos.KeyValue> metadataList =
getContainerBlockData().getMetadataList().stream().filter(kv -> !Objects
.equals(kv.getKey(), OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK))
.collect(Collectors.toList());
metadataList.add(keyValue);
getContainerBlockData().clearMetadata(); // Clears old meta.
getContainerBlockData().addAllMetadata(metadataList); // Add updated meta.
}
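/**
* Sets the given stripe checksum on the last chunk of the block data, if
* any chunks exist.
*/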
private void updateChecksum(ByteString checksum) {
int size = getContainerBlockData().getChunksCount();
if (size > 0) {
ChunkInfo oldInfo = getContainerBlockData().getChunks(size - 1);
ChunkInfo newInfo = ChunkInfo.newBuilder(oldInfo)
.setStripeChecksum(checksum).build();
getContainerBlockData().removeChunks(size - 1);
getContainerBlockData().addChunks(newInfo);
}
}
/**
* @param close whether putBlock is happening as part of closing the stream
* @param force true if no data was written since the most recent putBlock
* and the stream is being closed
* @return a future that completes with the putBlock response
*/
public CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force) throws IOException {
checkOpen();
CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> flushFuture = null;
try {
ContainerProtos.BlockData blockData = getContainerBlockData().build();
XceiverClientReply asyncReply =
putBlockAsync(getXceiverClient(), blockData, close, getTokenString());
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
validateResponse(e);
} catch (IOException sce) {
throw new CompletionException(sce);
}
// if the ioException is not set, putBlock is successful
if (getIoException() == null) {
BlockID responseBlockID = BlockID.getFromProtobuf(
e.getPutBlock().getCommittedBlockLength().getBlockID());
Preconditions.checkState(getBlockID().getContainerBlockID()
.equals(responseBlockID.getContainerBlockID()));
}
return e;
}, getResponseExecutor()).exceptionally(e -> {
if (LOG.isDebugEnabled()) {
LOG.debug("putBlock failed for blockID {} with exception {}",
getBlockID(), e.getLocalizedMessage());
}
CompletionException ce = new CompletionException(e);
setIoException(ce);
throw ce;
});
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
this.putBlkRspFuture = flushFuture;
return flushFuture;
}
/**
* @return The current chunk writer response future.
*/
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
getCurrentChunkResponseFuture() {
return this.currentChunkRspFuture;
}
/**
* @return The current putBlock response future.
*/
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
getCurrentPutBlkResponseFuture() {
return this.putBlkRspFuture;
}
/**
* Gets the target data node used in the current stream.
* @return DatanodeDetails
*/
public DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
}
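/**
* Validates the container response. A validation failure is recorded via
* {@code setIoException} rather than thrown, so the original error surfaces
* on the current or a subsequent operation.
*/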
@Override
void validateResponse(
ContainerProtos.ContainerCommandResponseProto responseProto)
throws IOException {
try {
// If the ioException is already set, a previous request has failed;
// skip validation and let the current operation fail with the
// original error.
IOException exception = getIoException();
if (exception != null) {
return;
}
ContainerProtocolCalls.validateContainerResponse(responseProto);
} catch (IOException sce) {
setIoException(sce);
}
}
}