blob: fd503c344d9fbd63e72691e3adaee42d63094734 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Maintains a list of BlockOutputStream. Writes are based on offset.
 *
 * Note that this may write to multiple containers in one write call. In case
 * the first container succeeded but later ones failed, the succeeded writes
 * are not rolled back.
 *
 * TODO : this class does not currently support multi-threaded access.
 */
public class KeyOutputStream extends OutputStream {

  /**
   * Defines the stream action while calling {@link #handleFlushOrClose}.
   */
  enum StreamAction {
    FLUSH, CLOSE, FULL
  }

  public static final Logger LOG =
      LoggerFactory.getLogger(KeyOutputStream.class);

  // Declared volatile so that checkNotClosed() observes the latest close
  // state; the javadoc of checkNotClosed() already documents this field as
  // volatile, so the declaration now matches the documented contract.
  private volatile boolean closed;
  // Encryption info of the key; null when the bucket is not encrypted.
  // Final: assigned exactly once, in the constructors.
  private final FileEncryptionInfo feInfo;
  // Maps an exception class to the retry policy applied when the write path
  // encounters that exception.
  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
  // Number of retries performed so far while handling the current failure.
  private int retryCount;
  // Total number of bytes accepted from the caller so far.
  private long offset;
  // Pool managing the underlying per-block output stream entries.
  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;

  /**
   * A constructor for testing purpose only.
   */
  @VisibleForTesting
  public KeyOutputStream() {
    closed = false;
    // No encryption info in the test path (same as the previous implicit
    // default of null).
    this.feInfo = null;
    this.retryPolicyMap = HddsClientUtils.getExceptionList()
        .stream()
        .collect(Collectors.toMap(Function.identity(),
            e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
    retryCount = 0;
    offset = 0;
    blockOutputStreamEntryPool = new BlockOutputStreamEntryPool();
  }

  @VisibleForTesting
  public List<BlockOutputStreamEntry> getStreamEntries() {
    return blockOutputStreamEntryPool.getStreamEntries();
  }

  @VisibleForTesting
  public XceiverClientManager getXceiverClientManager() {
    return blockOutputStreamEntryPool.getXceiverClientManager();
  }

  @VisibleForTesting
  public List<OmKeyLocationInfo> getLocationInfoList() {
    return blockOutputStreamEntryPool.getLocationInfoList();
  }

  @VisibleForTesting
  public int getRetryCount() {
    return retryCount;
  }

  @SuppressWarnings("parameternumber")
  public KeyOutputStream(OpenKeySession handler,
      XceiverClientManager xceiverClientManager,
      OzoneManagerProtocol omClient, int chunkSize,
      String requestId, ReplicationFactor factor, ReplicationType type,
      long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
      ChecksumType checksumType, int bytesPerChecksum,
      String uploadID, int partNumber, boolean isMultipart,
      int maxRetryCount, long retryInterval) {
    OmKeyInfo info = handler.getKeyInfo();
    blockOutputStreamEntryPool =
        new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
            type, bufferFlushSize, bufferMaxSize, size, watchTimeout,
            checksumType, bytesPerChecksum, uploadID, partNumber, isMultipart,
            info, xceiverClientManager, handler.getId());
    // Retrieve the file encryption key info, null if file is not in
    // encrypted bucket.
    this.feInfo = info.getFileEncryptionInfo();
    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
        maxRetryCount, retryInterval);
    this.retryCount = 0;
  }

  /**
   * When a key is opened, it is possible that there are some blocks already
   * allocated to it for this open session. In this case, to make use of these
   * blocks, we need to add these blocks to stream entries. But, a key's version
   * also includes blocks from previous versions, we need to avoid adding these
   * old blocks to stream entries, because these old blocks should not be picked
   * for write. To do this, the following method checks that, only those
   * blocks created in this particular open version are added to stream entries.
   *
   * @param version the set of blocks that are pre-allocated.
   * @param openVersion the version corresponding to the pre-allocation.
   * @throws IOException
   */
  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
      long openVersion) throws IOException {
    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
  }

  /**
   * Writes a single byte by delegating to {@link #write(byte[], int, int)}.
   *
   * @param b the byte to write (low-order 8 bits used)
   * @throws IOException if the write fails or the stream is closed
   */
  @Override
  public void write(int b) throws IOException {
    byte[] buf = new byte[1];
    buf[0] = (byte) b;
    write(buf, 0, 1);
  }

  /**
   * Try to write the bytes sequence b[off:off+len) to streams.
   *
   * NOTE: Throws exception if the data could not fit into the remaining space.
   * In which case nothing will be written.
   * TODO:May need to revisit this behaviour.
   *
   * @param b byte data
   * @param off starting offset
   * @param len length to write
   * @throws IOException
   */
  @Override
  public void write(byte[] b, int off, int len)
      throws IOException {
    checkNotClosed();
    if (b == null) {
      throw new NullPointerException();
    }
    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
        || ((off + len) < 0)) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return;
    }
    handleWrite(b, off, len, false);
  }

  /**
   * Writes data across one or more block streams, allocating new blocks as
   * the current one fills up. On an IOException from the underlying stream
   * the failure is routed to {@link #handleException}, which may retry the
   * buffered data on a new block.
   *
   * @param b byte data; ignored (may be null) on the retry path, where data
   *          is replayed from the internal buffers instead
   * @param off starting offset into b
   * @param len number of bytes to write; long because the retry path can
   *            carry the combined length of several buffers
   * @param retry true when re-driving buffered data after a failure
   * @throws IOException if the write ultimately fails; the stream is marked
   *         closed before the exception propagates
   */
  private void handleWrite(byte[] b, int off, long len, boolean retry)
      throws IOException {
    while (len > 0) {
      try {
        BlockOutputStreamEntry current =
            blockOutputStreamEntryPool.allocateBlockIfNeeded();
        // length(len) will be in int range if the call is happening through
        // write API of blockOutputStream. Length can be in long range if it
        // comes via Exception path.
        int writeLen = Math.min((int) len, (int) current.getRemaining());
        long currentPos = current.getWrittenDataLength();
        try {
          if (retry) {
            current.writeOnRetry(len);
          } else {
            current.write(b, off, writeLen);
            offset += writeLen;
          }
        } catch (IOException ioe) {
          // for the current iteration, totalDataWritten - currentPos gives the
          // amount of data already written to the buffer
          // In the retryPath, the total data to be written will always be equal
          // to or less than the max length of the buffer allocated.
          // The len specified here is the combined sum of the data length of
          // the buffers
          Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool
              .getStreamBufferMaxSize());
          int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
          writeLen = retry ? (int) len : dataWritten;
          // In retry path, the data written is already accounted in offset.
          if (!retry) {
            offset += writeLen;
          }
          LOG.debug("writeLen {}, total len {}", writeLen, len);
          handleException(current, ioe);
        }
        if (current.getRemaining() <= 0) {
          // since the current block is already written close the stream.
          handleFlushOrClose(StreamAction.FULL);
        }
        len -= writeLen;
        off += writeLen;
      } catch (Exception e) {
        // Any unexpected failure poisons the stream: mark it closed so
        // further writes are rejected, then propagate.
        markStreamClosed();
        throw e;
      }
    }
  }

  /**
   * It performs following actions :
   * a. Updates the committed length at datanode for the current stream in
   * datanode.
   * b. Reads the data from the underlying buffer and writes it the next stream.
   *
   * @param streamEntry StreamEntry
   * @param exception actual exception that occurred
   * @throws IOException Throws IOException if Write fails
   */
  private void handleException(BlockOutputStreamEntry streamEntry,
      IOException exception) throws IOException {
    Throwable t = HddsClientUtils.checkForException(exception);
    Preconditions.checkNotNull(t);
    boolean retryFailure = checkForRetryFailure(t);
    boolean containerExclusionException = false;
    if (!retryFailure) {
      containerExclusionException = checkIfContainerToExclude(t);
    }
    Pipeline pipeline = streamEntry.getPipeline();
    PipelineID pipelineId = pipeline.getId();
    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
    //set the correct length for the current stream
    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
    long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
    if (containerExclusionException) {
      LOG.debug(
          "Encountered exception {}. The last committed block length is {}, "
              + "uncommitted data length is {} retry count {}", exception,
          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
    } else {
      LOG.warn(
          "Encountered exception {} on the pipeline {}. "
              + "The last committed block length is {}, "
              + "uncommitted data length is {} retry count {}", exception,
          pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
    }
    Preconditions.checkArgument(
        bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize());
    Preconditions.checkArgument(
        offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
    long containerId = streamEntry.getBlockID().getContainerID();
    Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
    Preconditions.checkNotNull(failedServers);
    ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList();
    if (!failedServers.isEmpty()) {
      excludeList.addDatanodes(failedServers);
    }
    // if the container needs to be excluded , add the container to the
    // exclusion list , otherwise add the pipeline to the exclusion list
    if (containerExclusionException) {
      excludeList.addConatinerId(ContainerID.valueof(containerId));
    } else {
      excludeList.addPipeline(pipelineId);
    }
    // just clean up the current stream.
    streamEntry.cleanup(retryFailure);
    // discard all subsequent blocks the containers and pipelines which
    // are in the exclude list so that, the very next retry should never
    // write data on the closed container/pipeline
    if (containerExclusionException) {
      // discard subsequent pre allocated blocks from the streamEntries list
      // from the closed container
      blockOutputStreamEntryPool
          .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
              null);
    } else {
      // In case there is timeoutException or Watch for commit happening over
      // majority or the client connection failure to the leader in the
      // pipeline, just discard all the pre allocated blocks on this pipeline.
      // Next block allocation will happen with excluding this specific pipeline
      // This will ensure if 2 way commit happens , it cannot span over multiple
      // blocks
      blockOutputStreamEntryPool
          .discardPreallocatedBlocks(-1, pipelineId);
    }
    if (bufferedDataLen > 0) {
      // If the data is still cached in the underlying stream, we need to
      // allocate new block and write this data in the datanode.
      handleRetry(exception, bufferedDataLen);
      // reset the retryCount after handling the exception
      retryCount = 0;
    }
  }

  /**
   * Marks this stream closed and releases the resources held by the
   * underlying stream entry pool. Called on unrecoverable failures.
   */
  private void markStreamClosed() {
    blockOutputStreamEntryPool.cleanup();
    closed = true;
  }

  /**
   * Consults the retry policy for the given exception and, if a retry is
   * granted, replays the buffered data via
   * {@link #handleWrite(byte[], int, long, boolean)} on a newly allocated
   * block.
   *
   * @param exception the failure that triggered the retry
   * @param len number of buffered bytes to replay
   * @throws IOException if the policy decides to fail, the thread is
   *         interrupted, or the replayed write fails again
   */
  private void handleRetry(IOException exception, long len) throws IOException {
    RetryPolicy retryPolicy = retryPolicyMap
        .get(HddsClientUtils.checkForException(exception).getClass());
    if (retryPolicy == null) {
      // Fall back to the catch-all policy registered for Exception.
      retryPolicy = retryPolicyMap.get(Exception.class);
    }
    RetryPolicy.RetryAction action;
    try {
      action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
    } catch (Exception e) {
      throw e instanceof IOException ? (IOException) e : new IOException(e);
    }
    if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
      String msg = "";
      if (action.reason != null) {
        msg = "Retry request failed. " + action.reason;
        LOG.error(msg, exception);
      }
      throw new IOException(msg, exception);
    }
    // Throw the exception if the thread is interrupted
    if (Thread.currentThread().isInterrupted()) {
      LOG.warn("Interrupted while trying for retry");
      throw exception;
    }
    Preconditions.checkArgument(
        action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
    if (action.delayMillis > 0) {
      try {
        Thread.sleep(action.delayMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can
        // observe the interruption (it is cleared when the exception is
        // thrown by Thread.sleep).
        Thread.currentThread().interrupt();
        throw (IOException) new InterruptedIOException(
            "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
            .initCause(e);
      }
    }
    retryCount++;
    // Parameterized logging avoids the string concatenation cost when TRACE
    // is disabled.
    LOG.trace("Retrying Write request. Already tried {} time(s); retry "
        + "policy is {}", retryCount, retryPolicy);
    handleWrite(null, 0, len, true);
  }

  /**
   * Checks if the provided exception signifies retry failure in ratis client.
   * In case of retry failure, ratis client throws RaftRetryFailureException
   * and all succeeding operations are failed with AlreadyClosedException.
   */
  private boolean checkForRetryFailure(Throwable t) {
    return t instanceof RaftRetryFailureException
        || t instanceof AlreadyClosedException;
  }

  // Every container specific exception from datatnode will be seen as
  // StorageContainerException
  private boolean checkIfContainerToExclude(Throwable t) {
    return t instanceof StorageContainerException;
  }

  /**
   * Flushes the current stream entry so buffered data reaches the datanodes.
   *
   * @throws IOException if the stream is closed or the flush fails
   */
  @Override
  public void flush() throws IOException {
    checkNotClosed();
    handleFlushOrClose(StreamAction.FLUSH);
  }

  /**
   * Close or Flush the latest outputStream depending upon the action.
   * This function gets called when while write is going on, the current stream
   * gets full or explicit flush or close request is made by client. when the
   * stream gets full and we try to close the stream , we might end up hitting
   * an exception in the exception handling path, we write the data residing in
   * in the buffer pool to a new Block. In cases, as such, when the data gets
   * written to new stream , it will be at max half full. In such cases, we
   * should just write the data and not close the stream as the block won't be
   * completely full.
   *
   * @param op Flag which decides whether to call close or flush on the
   *           outputStream.
   * @throws IOException In case, flush or close fails with exception.
   */
  private void handleFlushOrClose(StreamAction op) throws IOException {
    if (blockOutputStreamEntryPool.isEmpty()) {
      return;
    }
    while (true) {
      try {
        BlockOutputStreamEntry entry =
            blockOutputStreamEntryPool.getCurrentStreamEntry();
        if (entry != null) {
          try {
            Collection<DatanodeDetails> failedServers =
                entry.getFailedServers();
            // failed servers can be null in case there is no data written in
            // the stream
            if (failedServers != null && !failedServers.isEmpty()) {
              blockOutputStreamEntryPool.getExcludeList()
                  .addDatanodes(failedServers);
            }
            switch (op) {
            case CLOSE:
              entry.close();
              break;
            case FULL:
              if (entry.getRemaining() == 0) {
                entry.close();
              }
              break;
            case FLUSH:
              entry.flush();
              break;
            default:
              throw new IOException("Invalid Operation");
            }
          } catch (IOException ioe) {
            // The failure may be recoverable by replaying buffered data on a
            // new block; retry the action on the (possibly new) current entry.
            handleException(entry, ioe);
            continue;
          }
        }
        break;
      } catch (Exception e) {
        markStreamClosed();
        throw e;
      }
    }
  }

  /**
   * Commit the key to OM, this will add the blocks as the new key blocks.
   *
   * @throws IOException
   */
  @Override
  public void close() throws IOException {
    if (closed) {
      return;
    }
    closed = true;
    // No catch-and-rethrow needed here: any IOException propagates as-is,
    // and cleanup always runs.
    try {
      handleFlushOrClose(StreamAction.CLOSE);
      blockOutputStreamEntryPool.commitKey(offset);
    } finally {
      blockOutputStreamEntryPool.cleanup();
    }
  }

  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
  }

  public FileEncryptionInfo getFileEncryptionInfo() {
    return feInfo;
  }

  @VisibleForTesting
  public ExcludeList getExcludeList() {
    return blockOutputStreamEntryPool.getExcludeList();
  }

  /**
   * Builder class of KeyOutputStream.
   */
  public static class Builder {
    private OpenKeySession openHandler;
    private XceiverClientManager xceiverManager;
    private OzoneManagerProtocol omClient;
    private int chunkSize;
    private String requestID;
    private ReplicationType type;
    private ReplicationFactor factor;
    private long streamBufferFlushSize;
    private long streamBufferMaxSize;
    private long blockSize;
    private long watchTimeout;
    private ChecksumType checksumType;
    private int bytesPerChecksum;
    private String multipartUploadID;
    private int multipartNumber;
    private boolean isMultipartKey;
    private int maxRetryCount;
    private long retryInterval;

    public Builder setMultipartUploadID(String uploadID) {
      this.multipartUploadID = uploadID;
      return this;
    }

    public Builder setMultipartNumber(int partNumber) {
      this.multipartNumber = partNumber;
      return this;
    }

    public Builder setHandler(OpenKeySession handler) {
      this.openHandler = handler;
      return this;
    }

    public Builder setXceiverClientManager(XceiverClientManager manager) {
      this.xceiverManager = manager;
      return this;
    }

    public Builder setOmClient(OzoneManagerProtocol client) {
      this.omClient = client;
      return this;
    }

    public Builder setChunkSize(int size) {
      this.chunkSize = size;
      return this;
    }

    public Builder setRequestID(String id) {
      this.requestID = id;
      return this;
    }

    public Builder setType(ReplicationType replicationType) {
      this.type = replicationType;
      return this;
    }

    public Builder setFactor(ReplicationFactor replicationFactor) {
      this.factor = replicationFactor;
      return this;
    }

    public Builder setStreamBufferFlushSize(long size) {
      this.streamBufferFlushSize = size;
      return this;
    }

    public Builder setStreamBufferMaxSize(long size) {
      this.streamBufferMaxSize = size;
      return this;
    }

    public Builder setBlockSize(long size) {
      this.blockSize = size;
      return this;
    }

    public Builder setWatchTimeout(long timeout) {
      this.watchTimeout = timeout;
      return this;
    }

    public Builder setChecksumType(ChecksumType cType) {
      this.checksumType = cType;
      return this;
    }

    public Builder setBytesPerChecksum(int bytes) {
      this.bytesPerChecksum = bytes;
      return this;
    }

    public Builder setIsMultipartKey(boolean isMultipart) {
      this.isMultipartKey = isMultipart;
      return this;
    }

    public Builder setMaxRetryCount(int maxCount) {
      this.maxRetryCount = maxCount;
      return this;
    }

    public Builder setRetryInterval(long retryIntervalInMS) {
      this.retryInterval = retryIntervalInMS;
      return this;
    }

    public KeyOutputStream build() {
      return new KeyOutputStream(openHandler, xceiverManager, omClient,
          chunkSize, requestID, factor, type, streamBufferFlushSize,
          streamBufferMaxSize, blockSize, watchTimeout, checksumType,
          bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
          maxRetryCount, retryInterval);
    }
  }

  /**
   * Verify that the output stream is open. Non blocking; this gives
   * the last state of the volatile {@link #closed} field.
   * @throws IOException if the connection is closed.
   */
  private void checkNotClosed() throws IOException {
    if (closed) {
      throw new IOException(
          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
              + blockOutputStreamEntryPool.getKeyName());
    }
  }
}