/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ECKeyOutputStream handles EC writes by writing the data into the underlying
* block output streams chunk by chunk.
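* <p>
* For example, with an rs-3-2-1024k replication config, each full stripe
* consists of three 1024k data cells followed by two 1024k parity cells,
* and each cell goes to a different block stream of the block group.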
*/
public final class ECKeyOutputStream extends KeyOutputStream {
private final OzoneClientConfig config;
private final ECChunkBuffers ecChunkBufferCache;
private final int ecChunkSize;
private final int numDataBlks;
private final int numParityBlks;
private final ByteBufferPool bufferPool;
private final RawErasureEncoder encoder;
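// Outcome of writing one full stripe (data cells plus parity cells) to the
// current block group.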
private enum StripeWriteStatus {
SUCCESS,
FAILED
}
public static final Logger LOG =
LoggerFactory.getLogger(ECKeyOutputStream.class);
private boolean closed;
// amount of data actually written to the underlying block streams so far
private long offset;
// amount of data the caller has written into this stream so far
private long writeOffset;
private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
@VisibleForTesting
public List<BlockOutputStreamEntry> getStreamEntries() {
return blockOutputStreamEntryPool.getStreamEntries();
}
@VisibleForTesting
public XceiverClientFactory getXceiverClientFactory() {
return blockOutputStreamEntryPool.getXceiverClientFactory();
}
@VisibleForTesting
public List<OmKeyLocationInfo> getLocationInfoList() {
return blockOutputStreamEntryPool.getLocationInfoList();
}
private ECKeyOutputStream(Builder builder) {
this.config = builder.getClientConfig();
this.bufferPool = builder.getByteBufferPool();
// For EC, the cell/chunk size and the buffer size can be the same for now.
ecChunkSize = builder.getReplicationConfig().getEcChunkSize();
this.config.setStreamBufferMaxSize(ecChunkSize);
this.config.setStreamBufferFlushSize(ecChunkSize);
this.config.setStreamBufferSize(ecChunkSize);
this.numDataBlks = builder.getReplicationConfig().getData();
this.numParityBlks = builder.getReplicationConfig().getParity();
ecChunkBufferCache = new ECChunkBuffers(
ecChunkSize, numDataBlks, numParityBlks, bufferPool);
OmKeyInfo info = builder.getOpenHandler().getKeyInfo();
blockOutputStreamEntryPool =
new ECBlockOutputStreamEntryPool(config,
builder.getOmClient(), builder.getRequestID(),
builder.getReplicationConfig(),
builder.getMultipartUploadID(), builder.getMultipartNumber(),
builder.isMultipartKey(),
info, builder.isUnsafeByteBufferConversionEnabled(),
builder.getXceiverManager(), builder.getOpenHandler().getId());
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
builder.getReplicationConfig());
}
/**
* When a key is opened, it is possible that some blocks have already been
* allocated to it for this open session. To make use of these blocks, we need
* to add them to the stream entries. However, a key's version also includes
* blocks from previous versions, and those old blocks must not be added to
* the stream entries, because they should not be picked for writes. To ensure
* this, the following method adds only the blocks created in this particular
* open version to the stream entries.
*
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
* @throws IOException
*/
public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
long openVersion) throws IOException {
blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
}
/**
* Tries to write the byte sequence b[off:off+len) to the underlying EC block
* streams.
*
* @param b byte data
* @param off starting offset
* @param len length to write
* @throws IOException
*/
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkNotClosed();
if (b == null) {
throw new NullPointerException();
}
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|| ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
try {
int writtenLen = 0;
while (writtenLen < len) {
writtenLen += handleWrite(b, off + writtenLen, len - writtenLen);
}
} catch (Exception e) {
markStreamClosed();
throw new IOException(e.getMessage(), e);
}
writeOffset += len;
}
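/**
* Rewrites the current (failed) stripe to a freshly allocated block group:
* rolls the write offset back, discards the pre-allocated blocks of the
* failed pipeline, and then replays the cached data cells and the parity.
*/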
private StripeWriteStatus rewriteStripeToNewBlockGroup() throws IOException {
// Rollback the length/offset updated as part of this failed stripe write.
final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
offset -= Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
final ECBlockOutputStreamEntry failedStreamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
failedStreamEntry.resetToFirstEntry();
failedStreamEntry.resetToAckedPosition();
// All pre-allocated blocks from the same pipeline
// should be dropped to eliminate worthless retries.
blockOutputStreamEntryPool.discardPreallocatedBlocks(-1,
failedStreamEntry.getPipeline().getId());
// Let's close the current entry.
failedStreamEntry.close();
// Let's rewrite the last stripe, so that it will be written to a new block
// group.
// TODO: we can improve this to handle partial stripe failures; in that
// case, we just need to write only the available buffers.
blockOutputStreamEntryPool.allocateBlockIfNeeded();
final ECBlockOutputStreamEntry currentStreamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
for (int i = 0; i < numDataBlks; i++) {
if (dataBuffers[i].limit() > 0) {
handleOutputStreamWrite(i, dataBuffers[i].limit(), false);
}
currentStreamEntry.useNextBlockStream();
}
return handleParityWrites();
}
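/**
* Computes the parity cells for the buffered stripe and writes the stripe
* out, retrying it on a new block group up to the configured retry limit.
*/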
private void encodeAndWriteParityCells() throws IOException {
generateParityCells();
if (handleParityWrites() == StripeWriteStatus.FAILED) {
retryStripeWrite(config.getMaxECStripeWriteRetries());
}
}
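/**
* Logs a compact per-replica status map for the failed operation, e.g.
* "S S F S S", where the i-th token is F if the stream with replication
* index i failed and S otherwise, followed by per-stream failure details.
*/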
private void logStreamError(List<ECBlockOutputStream> failedStreams,
String operation) {
Set<Integer> failedStreamIndexSet =
failedStreams.stream().map(ECBlockOutputStream::getReplicationIndex)
.collect(Collectors.toSet());
String failedStreamsString = IntStream.range(1,
numDataBlks + numParityBlks + 1)
.mapToObj(index -> failedStreamIndexSet.contains(index)
? "F" : "S")
.collect(Collectors.joining(" "));
LOG.warn("{} failed: {}", operation, failedStreamsString);
for (ECBlockOutputStream stream : failedStreams) {
LOG.warn("Failure for replica index: {}, DatanodeDetails: {}",
stream.getReplicationIndex(), stream.getDatanodeDetails(),
stream.getIoException());
}
}
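/**
* Writes the parity cells of the current stripe and then calls putBlock on
* all streams of the block group. Any write or putBlock failure marks the
* whole stripe as failed so that it can be rewritten to a new block group.
*/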
private StripeWriteStatus handleParityWrites() throws IOException {
writeParityCells();
ECBlockOutputStreamEntry streamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
List<ECBlockOutputStream> failedStreams =
streamEntry.streamsWithWriteFailure();
if (!failedStreams.isEmpty()) {
if (LOG.isDebugEnabled()) {
logStreamError(failedStreams, "EC stripe write");
}
excludePipelineAndFailedDN(streamEntry.getPipeline(), failedStreams);
return StripeWriteStatus.FAILED;
}
// By this time, we should have finished the full stripe, so let's call
// executePutBlock for all streams.
// TODO: we should alter the putBlock calls to share the CRC with each
// stream.
final boolean isLastStripe = streamEntry.getRemaining() <= 0 ||
ecChunkBufferCache.getLastDataCell().limit() < ecChunkSize;
streamEntry.executePutBlock(isLastStripe, streamEntry.getCurrentPosition());
failedStreams = streamEntry.streamsWithPutBlockFailure();
if (!failedStreams.isEmpty()) {
if (LOG.isDebugEnabled()) {
logStreamError(failedStreams, "Put block");
}
excludePipelineAndFailedDN(streamEntry.getPipeline(), failedStreams);
return StripeWriteStatus.FAILED;
}
streamEntry.updateBlockGroupToAckedPosition(
streamEntry.getCurrentPosition());
ecChunkBufferCache.clear();
if (streamEntry.getRemaining() <= 0) {
streamEntry.close();
} else {
streamEntry.resetToFirstEntry();
}
return StripeWriteStatus.SUCCESS;
}
private void excludePipelineAndFailedDN(Pipeline pipeline,
List<ECBlockOutputStream> failedStreams) {
// Exclude the failed pipeline
blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId());
// If the failure is NOT caused by a container-level problem (e.g. the
// container being full), we assume it is caused by a DN failure and exclude
// the failed DN.
failedStreams.stream()
.filter(s -> !checkIfContainerToExclude(
HddsClientUtils.checkForException(s.getIoException())))
.forEach(s -> blockOutputStreamEntryPool.getExcludeList()
.addDatanode(s.getDatanodeDetails()));
}
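// Only a ContainerNotOpenException counts as a container-level problem
// here; such failures exclude the pipeline but not the datanodes.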
@Override
protected boolean checkIfContainerToExclude(Throwable t) {
return super.checkIfContainerToExclude(t)
&& t instanceof ContainerNotOpenException;
}
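/**
* Encodes the buffered data cells into the parity buffers. A partial final
* stripe is zero-padded to the parity cell size before encoding, and the
* padding is removed afterwards so that only the real data is (re)written.
*/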
private void generateParityCells() throws IOException {
final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
// parityCellSize = min(ecChunkSize, stripeSize)
// = min(cellSize, sum(dataBuffers positions))
// = min(dataBuffers[0].limit(), dataBuffers[0].position())
// = dataBuffers[0].position()
final int parityCellSize = dataBuffers[0].position();
int firstNonFullIndex = dataBuffers.length;
int firstNonFullLength = 0;
for (int i = 0; i < dataBuffers.length; i++) {
if (dataBuffers[i].position() != ecChunkSize) {
firstNonFullIndex = i;
firstNonFullLength = dataBuffers[i].position();
break;
}
}
for (int i = firstNonFullIndex + 1; i < dataBuffers.length; i++) {
Preconditions.checkState(dataBuffers[i].position() == 0,
"Illegal stripe state: cell %s is not full while cell %s has data",
firstNonFullIndex, i);
}
// Add padding to dataBuffers for encoding if the stripe is not full.
for (int i = firstNonFullIndex; i < dataBuffers.length; i++) {
padBufferToLimit(dataBuffers[i], parityCellSize);
}
for (ByteBuffer b : parityBuffers) {
b.limit(parityCellSize);
}
for (ByteBuffer b : dataBuffers) {
b.flip();
}
encoder.encode(dataBuffers, parityBuffers);
// Remove padding from dataBuffers before (re)writing the data cells.
if (firstNonFullIndex < dataBuffers.length) {
dataBuffers[firstNonFullIndex].limit(firstNonFullLength);
}
for (int i = firstNonFullIndex + 1; i < dataBuffers.length; i++) {
dataBuffers[i].limit(0);
}
}
private void writeParityCells() {
// Move the stream entry cursor to parity block index
blockOutputStreamEntryPool
.getCurrentStreamEntry().forceToFirstParityBlock();
ByteBuffer[] parityCells = ecChunkBufferCache.getParityBuffers();
for (int i = 0; i < numParityBlks; i++) {
handleOutputStreamWrite(numDataBlks + i, parityCells[i].limit(), true);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
}
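/**
* Buffers up to one cell worth of data into the current data cell. When the
* cell becomes full, it is written to the corresponding block stream; when
* the last data cell of the stripe becomes full, the parity cells are
* computed and written as well.
*
* @return the number of bytes consumed from b
*/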
private int handleWrite(byte[] b, int off, int len) throws IOException {
blockOutputStreamEntryPool.allocateBlockIfNeeded();
int currIdx = blockOutputStreamEntryPool
.getCurrentStreamEntry().getCurrentStreamIdx();
int bufferRem = ecChunkBufferCache.dataBuffers[currIdx].remaining();
final int writeLen = Math.min(len, Math.min(bufferRem, ecChunkSize));
int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, writeLen);
// if this cell is full, send data to the OutputStream
if (pos == ecChunkSize) {
handleOutputStreamWrite(currIdx, pos, false);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
// if this is the last data cell in the stripe,
// compute and write the parity cells
if (currIdx == numDataBlks - 1) {
encodeAndWriteParityCells();
}
}
return writeLen;
}
private void handleOutputStreamWrite(int currIdx, int len, boolean isParity) {
ByteBuffer bytesToWrite = isParity ?
ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
ecChunkBufferCache.getDataBuffers()[currIdx];
try {
// Since it's a full cell, let's write all content from the buffer.
// We write at most one cell at a time in EC, so len can never be bigger
// than the cell buffer size, as the asserts below verify.
assert len <= ecChunkSize : " The len: " + len + ". EC chunk size: "
+ ecChunkSize;
assert len <= bytesToWrite
.limit() : " The len: " + len + ". Chunk buffer limit: "
+ bytesToWrite.limit();
writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
bytesToWrite.array(), len, 0, isParity);
} catch (Exception e) {
markStreamAsFailed(e);
}
}
private long writeToOutputStream(ECBlockOutputStreamEntry current,
byte[] b, int writeLen, int off, boolean isParity)
throws IOException {
try {
if (!isParity) {
// If an exception occurs while writing, this length is rolled back as
// part of rewriteStripeToNewBlockGroup.
offset += writeLen;
}
current.write(b, off, writeLen);
} catch (IOException ioe) {
LOG.debug("Exception while writing the cell buffers. The writeLen: {}."
+ " The block internal index is: {}", writeLen,
current.getCurrentStreamIdx(), ioe);
handleException(current, ioe);
}
return writeLen;
}
private void handleException(BlockOutputStreamEntry streamEntry,
IOException exception) throws IOException {
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
boolean containerExclusionException = checkIfContainerToExclude(t);
if (containerExclusionException) {
blockOutputStreamEntryPool.getExcludeList()
.addPipeline(streamEntry.getPipeline().getId());
}
markStreamAsFailed(exception);
}
private void markStreamClosed() {
blockOutputStreamEntryPool.cleanup();
closed = true;
}
private void markStreamAsFailed(Exception e) {
blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
}
@Override
public void flush() {
LOG.debug("ECKeyOutputStream does not support flush.");
}
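// Closes the current stream entry; a close-time IOException is routed
// through handleException (which may exclude the pipeline) and the close
// is then retried.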
private void closeCurrentStreamEntry()
throws IOException {
if (!blockOutputStreamEntryPool.isEmpty()) {
while (true) {
try {
BlockOutputStreamEntry entry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
if (entry != null) {
try {
entry.close();
} catch (IOException ioe) {
handleException(entry, ioe);
continue;
}
}
return;
} catch (Exception e) {
markStreamClosed();
throw e;
}
}
}
}
/**
* Commits the key to OM; this adds the written blocks as the new key blocks.
*
* @throws IOException
*/
@Override
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
try {
// If the stripe buffer is not empty, encode and flush the stripe.
if (ecChunkBufferCache.getFirstDataCell().position() > 0) {
final int index = blockOutputStreamEntryPool.getCurrentStreamEntry()
.getCurrentStreamIdx();
ByteBuffer lastCell = ecChunkBufferCache.getDataBuffers()[index];
// Finish writing the current partial cached chunk
if (lastCell.position() % ecChunkSize != 0) {
handleOutputStreamWrite(index, lastCell.position(), false);
}
encodeAndWriteParityCells();
}
closeCurrentStreamEntry();
Preconditions.checkArgument(writeOffset == offset,
"Expected writeOffset(" + writeOffset
+ ") to equal offset(" + offset + ")");
blockOutputStreamEntryPool.commitKey(offset);
} finally {
blockOutputStreamEntryPool.cleanup();
}
ecChunkBufferCache.release();
}
private void retryStripeWrite(int times) throws IOException {
for (int i = 0; i < times; i++) {
if (rewriteStripeToNewBlockGroup() == StripeWriteStatus.SUCCESS) {
return;
}
}
throw new IOException("Completed max allowed retries " +
times + " on stripe failures.");
}
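/**
* Zero-fills {@code buf} from its current position up to {@code limit} and
* advances the position to {@code limit}. Assumes a heap-backed buffer,
* since it writes through {@code buf.array()}.
*/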
public static void padBufferToLimit(ByteBuffer buf, int limit) {
int pos = buf.position();
if (pos >= limit) {
return;
}
Arrays.fill(buf.array(), pos, limit, (byte)0);
buf.position(limit);
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
return blockOutputStreamEntryPool.getCommitUploadPartInfo();
}
@VisibleForTesting
public ExcludeList getExcludeList() {
return blockOutputStreamEntryPool.getExcludeList();
}
/**
* Builder class of ECKeyOutputStream.
*/
public static class Builder extends KeyOutputStream.Builder {
private ECReplicationConfig replicationConfig;
private ByteBufferPool byteBufferPool;
@Override
public ECReplicationConfig getReplicationConfig() {
return replicationConfig;
}
public ECKeyOutputStream.Builder setReplicationConfig(
ECReplicationConfig replConfig) {
this.replicationConfig = replConfig;
return this;
}
public ByteBufferPool getByteBufferPool() {
return byteBufferPool;
}
public ECKeyOutputStream.Builder setByteBufferPool(
ByteBufferPool bufferPool) {
this.byteBufferPool = bufferPool;
return this;
}
@Override
public ECKeyOutputStream build() {
return new ECKeyOutputStream(this);
}
}
/**
* Verify that the output stream is open. Non-blocking; this gives the last
* observed state of the {@link #closed} field.
*
* @throws IOException if the stream is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+ blockOutputStreamEntryPool.getKeyName());
}
}
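/**
* Caches one stripe worth of cells: the data buffers followed by the parity
* buffers, each limited to one EC cell size and obtained from the shared
* {@link ByteBufferPool}.
*/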
private static class ECChunkBuffers {
private final ByteBuffer[] dataBuffers;
private final ByteBuffer[] parityBuffers;
private final int cellSize;
private final ByteBufferPool byteBufferPool;
ECChunkBuffers(int cellSize, int numData, int numParity,
ByteBufferPool byteBufferPool) {
this.cellSize = cellSize;
dataBuffers = new ByteBuffer[numData];
parityBuffers = new ByteBuffer[numParity];
this.byteBufferPool = byteBufferPool;
allocateBuffers(dataBuffers, this.cellSize);
allocateBuffers(parityBuffers, this.cellSize);
}
private ByteBuffer[] getDataBuffers() {
return dataBuffers;
}
private ByteBuffer[] getParityBuffers() {
return parityBuffers;
}
private ByteBuffer getFirstDataCell() {
return dataBuffers[0];
}
private ByteBuffer getLastDataCell() {
return dataBuffers[dataBuffers.length - 1];
}
private int addToDataBuffer(int i, byte[] b, int off, int len) {
final ByteBuffer buf = dataBuffers[i];
final int pos = buf.position() + len;
Preconditions.checkState(pos <= cellSize,
"Position(" + pos + ") is greater than the cellSize("
+ cellSize + ").");
buf.put(b, off, len);
return pos;
}
private void clear() {
clearBuffers(dataBuffers);
clearBuffers(parityBuffers);
}
private void release() {
releaseBuffers(dataBuffers);
releaseBuffers(parityBuffers);
}
private void allocateBuffers(ByteBuffer[] buffers, int bufferSize) {
for (int i = 0; i < buffers.length; i++) {
buffers[i] = byteBufferPool.getBuffer(false, bufferSize);
buffers[i].limit(bufferSize);
}
}
private void clearBuffers(ByteBuffer[] buffers) {
for (int i = 0; i < buffers.length; i++) {
buffers[i].clear();
buffers[i].limit(cellSize);
}
}
private void releaseBuffers(ByteBuffer[] buffers) {
for (int i = 0; i < buffers.length; i++) {
if (buffers[i] != null) {
byteBufferPool.putBuffer(buffers[i]);
buffers[i] = null;
}
}
}
}
}