blob: b15ca3f6c85fc542fdfd0edf6835a42446afb5e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.client.BlockID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.writeChunkAsync;
/**
* An {@link OutputStream} used by the REST service in combination with the
* SCMClient to write the value of a key to a sequence
* of container chunks. Writes are buffered locally and periodically written to
* the container as a new chunk. In order to preserve the semantics that
* replacement of a pre-existing key is atomic, each instance of the stream has
* an internal unique identifier. This unique identifier and a monotonically
* increasing chunk index form a composite key that is used as the chunk name.
* After all data is written, a putKey call creates or updates the corresponding
* container key, and this call includes the full list of chunks that make up
* the key data. The list of chunks is updated all at once. Therefore, a
* concurrent reader never can see an intermediate state in which different
* chunks of data from different versions of the key data are interleaved.
* This class encapsulates all state management for buffering and writing
* through to the container.
*/
public class BlockOutputStream extends OutputStream {
public static final Logger LOG =
LoggerFactory.getLogger(BlockOutputStream.class);
private volatile BlockID blockID;
private final BlockData.Builder containerBlockData;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private final ContainerProtos.ChecksumType checksumType;
private final int bytesPerChecksum;
private int chunkIndex;
private int chunkSize;
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
// request will fail upfront.
private AtomicReference<IOException> ioException;
private ExecutorService responseExecutor;
// the effective length of data flushed so far
private long totalDataFlushedLength;
// effective data write attempted so far for the block
private long writtenDataLength;
// List containing buffers for which the putBlock call will
// update the length in the datanodes. This list will just maintain
// references to the buffers in the BufferPool which will be cleared
// when the watchForCommit acknowledges a putBlock logIndex has been
// committed on all datanodes. This list will be a place holder for buffers
// which got written between successive putBlock calls.
private List<ByteBuffer> bufferList;
// This object will maintain the commitIndexes and byteBufferList in order
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private final CommitWatcher commitWatcher;
private List<DatanodeDetails> failedServers;
/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param xceiverClientManager client manager that controls client
* @param pipeline pipeline where block will be written
* @param chunkSize chunk size
* @param bufferPool pool of buffers
* @param streamBufferFlushSize flush size
* @param streamBufferMaxSize max size of the currentBuffer
* @param watchTimeout watch timeout
* @param checksumType checksum type
* @param bytesPerChecksum Bytes per checksum
*/
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize,
long watchTimeout, BufferPool bufferPool, ChecksumType checksumType,
int bytesPerChecksum)
throws IOException {
this.blockID = blockID;
this.chunkSize = chunkSize;
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
this.containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.chunkIndex = 0;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.bufferPool = bufferPool;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
ioException = new AtomicReference<>(null);
}
public BlockID getBlockID() {
return blockID;
}
public long getTotalAckDataLength() {
return commitWatcher.getTotalAckDataLength();
}
public long getWrittenDataLength() {
return writtenDataLength;
}
public List<DatanodeDetails> getFailedServers() {
return failedServers;
}
@VisibleForTesting
public XceiverClientSpi getXceiverClient() {
return xceiverClient;
}
@VisibleForTesting
public long getTotalDataFlushedLength() {
return totalDataFlushedLength;
}
@VisibleForTesting
public BufferPool getBufferPool() {
return bufferPool;
}
public IOException getIoException() {
return ioException.get();
}
@VisibleForTesting
public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
return commitWatcher.getCommitIndex2flushedDataMap();
}
@Override
public void write(int b) throws IOException {
checkOpen();
byte[] buf = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkOpen();
if (b == null) {
throw new NullPointerException();
}
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|| ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return;
}
while (len > 0) {
int writeLen;
// Allocate a buffer if needed. The buffer will be allocated only
// once as needed and will be reused again for multiple blockOutputStream
// entries.
ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
int pos = currentBuffer.position();
writeLen =
Math.min(chunkSize - pos % chunkSize, len);
currentBuffer.put(b, off, writeLen);
if (!currentBuffer.hasRemaining()) {
writeChunk(currentBuffer);
}
off += writeLen;
len -= writeLen;
writtenDataLength += writeLen;
if (shouldFlush()) {
updateFlushLength();
executePutBlock();
}
// Data in the bufferPool can not exceed streamBufferMaxSize
if (isBufferPoolFull()) {
handleFullBuffer();
}
}
}
private boolean shouldFlush() {
return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
}
private void updateFlushLength() {
totalDataFlushedLength += writtenDataLength - totalDataFlushedLength;
}
private boolean isBufferPoolFull() {
return bufferPool.computeBufferData() == streamBufferMaxSize;
}
/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
* @param len length of data to write
* @throws IOException if error occurred
*/
// In this case, the data is already cached in the currentBuffer.
public void writeOnRetry(long len) throws IOException {
if (len == 0) {
return;
}
int count = 0;
Preconditions.checkArgument(len <= streamBufferMaxSize);
while (len > 0) {
long writeLen;
writeLen = Math.min(chunkSize, len);
if (writeLen == chunkSize) {
writeChunk(bufferPool.getBuffer(count));
}
len -= writeLen;
count++;
writtenDataLength += writeLen;
// we should not call isBufferFull/shouldFlush here.
// The buffer might already be full as whole data is already cached in
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
if (writtenDataLength % streamBufferFlushSize == 0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
updateFlushLength();
executePutBlock();
}
if (writtenDataLength == streamBufferMaxSize) {
handleFullBuffer();
}
}
}
/**
* This is a blocking call. It will wait for the flush till the commit index
* at the head of the commitIndex2flushedDataMap gets replicated to all or
* majority.
* @throws IOException
*/
private void handleFullBuffer() throws IOException {
try {
checkOpen();
if (!commitWatcher.getFutureMap().isEmpty()) {
waitOnFlushFutures();
}
} catch (InterruptedException | ExecutionException e) {
setIoException(e);
adjustBuffersOnException();
throw getIoException();
}
watchForCommit(true);
}
// It may happen that once the exception is encountered , we still might
// have successfully flushed up to a certain index. Make sure the buffers
// only contain data which have not been sufficiently replicated
private void adjustBuffersOnException() {
commitWatcher.releaseBuffersOnException();
}
/**
* calls watchForCommit API of the Ratis Client. For Standalone client,
* it is a no op.
* @param bufferFull flag indicating whether bufferFull condition is hit or
* its called as part flush/close
* @return minimum commit index replicated to all nodes
* @throws IOException IOException in case watch gets timed out
*/
private void watchForCommit(boolean bufferFull) throws IOException {
checkOpen();
try {
XceiverClientReply reply = bufferFull ?
commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
Pipeline pipe = xceiverClient.getPipeline();
LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
blockID, pipe, dnList);
failedServers.addAll(dnList);
}
}
} catch (IOException ioe) {
setIoException(ioe);
throw getIoException();
}
}
private CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock()
throws IOException {
checkOpen();
long flushPos = totalDataFlushedLength;
Preconditions.checkNotNull(bufferList);
List<ByteBuffer> byteBufferList = bufferList;
bufferList = null;
Preconditions.checkNotNull(byteBufferList);
CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> flushFuture;
try {
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, containerBlockData.build());
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
validateResponse(e);
} catch (IOException sce) {
throw new CompletionException(sce);
}
// if the ioException is not set, putBlock is successful
if (getIoException() == null) {
BlockID responseBlockID = BlockID.getFromProtobuf(
e.getPutBlock().getCommittedBlockLength().getBlockID());
Preconditions.checkState(blockID.getContainerBlockID()
.equals(responseBlockID.getContainerBlockID()));
// updates the bcsId of the block
blockID = responseBlockID;
if (LOG.isDebugEnabled()) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ commitWatcher.getCommitInfoMapSize() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
+ " blockID " + blockID + " bufferPool size" + bufferPool
.getSize() + " currentBufferIndex " + bufferPool
.getCurrentBufferIndex());
}
// for standalone protocol, logIndex will always be 0.
commitWatcher
.updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
if (LOG.isDebugEnabled()) {
LOG.debug(
"putBlock failed for blockID " + blockID + " with exception " + e
.getLocalizedMessage());
}
CompletionException ce = new CompletionException(e);
setIoException(ce);
throw ce;
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
}
commitWatcher.getFutureMap().put(flushPos, flushFuture);
return flushFuture;
}
@Override
public void flush() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0) {
try {
handleFlush();
} catch (InterruptedException | ExecutionException e) {
// just set the exception here as well in order to maintain sanctity of
// ioException field
setIoException(e);
adjustBuffersOnException();
throw getIoException();
}
}
}
private void writeChunk(ByteBuffer buffer)
throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
// be added to the bufferList. Once putBlock gets executed, this list will
// be marked null. Hence, during first writeChunk call after every putBlock
// call or during the first call to writeChunk here, the list will be null.
if (bufferList == null) {
bufferList = new ArrayList<>();
}
bufferList.add(buffer);
// Please note : We are not flipping the slice when we write since
// the slices are pointing the currentBuffer start and end as needed for
// the chunk write. Also please note, Duplicate does not create a
// copy of data, it only creates metadata that points to the data
// stream.
ByteBuffer chunk = buffer.duplicate();
chunk.position(0);
chunk.limit(buffer.position());
writeChunkToContainer(chunk);
}
private void handleFlush()
throws IOException, InterruptedException, ExecutionException {
checkOpen();
// flush the last chunk data residing on the currentBuffer
if (totalDataFlushedLength < writtenDataLength) {
ByteBuffer currentBuffer = bufferPool.getCurrentBuffer();
Preconditions.checkArgument(currentBuffer.position() > 0);
if (currentBuffer.position() != chunkSize) {
writeChunk(currentBuffer);
}
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
updateFlushLength();
executePutBlock();
}
waitOnFlushFutures();
watchForCommit(false);
// just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded
// irrespective of whether the commitIndex2flushedDataMap is empty
// or not, ensure there is no exception set
checkOpen();
}
@Override
public void close() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0) {
try {
handleFlush();
} catch (InterruptedException | ExecutionException e) {
setIoException(e);
adjustBuffersOnException();
throw getIoException();
} finally {
cleanup(false);
}
// TODO: Turn the below buffer empty check on when Standalone pipeline
// is removed in the write path in tests
// Preconditions.checkArgument(buffer.position() == 0);
// bufferPool.checkBufferPoolEmpty();
}
}
private void waitOnFlushFutures()
throws InterruptedException, ExecutionException {
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
commitWatcher.getFutureMap().values().toArray(
new CompletableFuture[commitWatcher.getFutureMap().size()]));
// wait for all the transactions to complete
combinedFuture.get();
}
private void validateResponse(
ContainerProtos.ContainerCommandResponseProto responseProto)
throws IOException {
try {
// if the ioException is already set, it means a prev request has failed
// just throw the exception. The current operation will fail with the
// original error
IOException exception = getIoException();
if (exception != null) {
throw exception;
}
ContainerProtocolCalls.validateContainerResponse(responseProto);
} catch (StorageContainerException sce) {
LOG.error("Unexpected Storage Container Exception: ", sce);
setIoException(sce);
throw sce;
}
}
private void setIoException(Exception e) {
if (getIoException() == null) {
IOException exception = new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
ioException.compareAndSet(null, exception);
}
}
public void cleanup(boolean invalidateClient) {
if (xceiverClientManager != null) {
xceiverClientManager.releaseClient(xceiverClient, invalidateClient);
}
xceiverClientManager = null;
xceiverClient = null;
commitWatcher.cleanup();
if (bufferList != null) {
bufferList.clear();
}
bufferList = null;
responseExecutor.shutdown();
}
/**
* Checks if the stream is open or exception has occured.
* If not, throws an exception.
*
* @throws IOException if stream is closed
*/
private void checkOpen() throws IOException {
if (isClosed()) {
throw new IOException("BlockOutputStream has been closed.");
} else if (getIoException() != null) {
adjustBuffersOnException();
throw getIoException();
}
}
public boolean isClosed() {
return xceiverClient == null;
}
/**
* Writes buffered data as a new chunk to the container and saves chunk
* information to be used later in putKey call.
*
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneChecksumException if there is an error while computing
* checksum
*/
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
int effectiveChunkSize = chunk.remaining();
ByteString data = bufferPool.byteStringConversion().apply(chunk);
Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
ChecksumData checksumData = checksum.computeChecksum(chunk);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk_" + ++chunkIndex)
.setOffset(0)
.setLen(effectiveChunkSize)
.setChecksumData(checksumData.getProtoBufMessage())
.build();
try {
XceiverClientReply asyncReply =
writeChunkAsync(xceiverClient, chunkInfo, blockID, data);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
future.thenApplyAsync(e -> {
try {
validateResponse(e);
} catch (IOException sce) {
future.completeExceptionally(sce);
}
return e;
}, responseExecutor).exceptionally(e -> {
if (LOG.isDebugEnabled()) {
LOG.debug(
"writing chunk failed " + chunkInfo.getChunkName() + " blockID "
+ blockID + " with exception " + e.getLocalizedMessage());
}
CompletionException ce = new CompletionException(e);
setIoException(ce);
throw ce;
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"writing chunk " + chunkInfo.getChunkName() + " blockID " + blockID
+ " length " + effectiveChunkSize);
}
containerBlockData.addChunks(chunkInfo);
}
@VisibleForTesting
public void setXceiverClient(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
}
}