/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
/**
* An asynchronous HDFS output stream implementation which fans out data to all datanodes and only
* supports writing files with a single block.
* <p>
* Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create instances.
* The main usage of this class is implementing the WAL, so we only expose a few HDFS
* configurations in that method. And we place it here under the io package because we want to
* keep it independent of the WAL implementation, and thus easier to move to the HDFS project
* eventually.
* <p>
* Note that, although we support pipelined flush, i.e., writing new data and then flushing before
* the previous flush succeeds, the implementation is not thread safe, so you should not call its
* methods concurrently.
* <p>
* Advantages compared to DFSOutputStream:
* <ol>
* <li>The fan-out mechanism: data is sent to all datanodes in parallel rather than relayed
* through a pipeline, which reduces latency.</li>
* <li>Fail-fast on datanode connection errors, so the WAL implementation can open a new writer
* ASAP.</li>
* <li>We could benefit from netty's ByteBuf management mechanism.</li>
* </ol>
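* <p>
* A rough usage sketch; the creation parameters are elided here, see
* {@link FanOutOneBlockAsyncDFSOutputHelper} for the actual factory method:
*
* <pre>
* // created via FanOutOneBlockAsyncDFSOutputHelper.createOutput(...)
* FanOutOneBlockAsyncDFSOutput out = ...;
* out.write(payload);
* out.flush(false).whenComplete((ackedLength, error) -> {
*   // on success, ackedLength is the block length acked by all datanodes
* });
* out.close(); // or recoverAndClose(reporter) after a failure
* </pre>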
*/
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// The HDFS MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we
// set a smaller limit for the data size.
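// For example, with the default 512 byte checksum chunks and 4 byte CRC32C checksums, 12MB of
// data carries roughly 96KB of checksums plus a small packet header, staying well below 16MB.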
private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
private final Configuration conf;
private final DistributedFileSystem dfs;
private final DFSClient client;
private final ClientProtocol namenode;
private final String clientName;
private final String src;
private final long fileId;
private final ExtendedBlock block;
private final DatanodeInfo[] locations;
private final Encryptor encryptor;
private final List<Channel> datanodeList;
private final DataChecksum summer;
private final int maxDataLen;
private final ByteBufAllocator alloc;
private static final class Callback {
private final CompletableFuture<Long> future;
private final long ackedLength;
// should be backed by a thread safe collection
private final Set<ChannelId> unfinishedReplicas;
public Callback(CompletableFuture<Long> future, long ackedLength,
Collection<Channel> replicas) {
this.future = future;
this.ackedLength = ackedLength;
if (replicas.isEmpty()) {
this.unfinishedReplicas = Collections.emptySet();
} else {
this.unfinishedReplicas =
Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
}
}
}
private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
private volatile long ackedBlockLength = 0L;
// this could be different from the acked block length because a packet cannot start in the
// middle of a chunk.
private long nextPacketOffsetInBlock = 0L;
// the length of the trailing partial chunk; the packet start offset must be aligned with the
// checksum chunk size, so we have to resend the data of the last partial chunk.
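// For example, with 512 byte chunks, flushing 700 bytes from offset 0 leaves
// nextPacketOffsetInBlock = 512 and trailingPartialChunkLength = 188; those 188 bytes are resent
// at the start of the next packet.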
private int trailingPartialChunkLength = 0;
private long nextPacketSeqno = 0L;
private ByteBuf buf;
private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();
// State for connections to DN
private enum State {
STREAMING, CLOSING, BROKEN, CLOSED
}
private volatile State state;
// all lock-free to make it run faster
private void completed(Channel channel) {
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
// if the current callback's unfinished replicas do not contain this channel then we have already
// acked this one; keep iterating to find the first one we have not acked yet.
if (c.unfinishedReplicas.remove(channel.id())) {
if (c.unfinishedReplicas.isEmpty()) {
// We need to remove the entry before completing the future. It is possible that after we
// complete the future, the upper layer calls close immediately before we remove the entry
// from waitingAckQueue, leading to an IllegalStateException. Also set ackedBlockLength first,
// otherwise we may use a wrong length to commit the block. This may lead to multiple removes
// and assigns, but that is OK: the semantics of iter.remove is to remove the entry returned
// by the previous next, so if the entry has already been removed it is a no-op, and for the
// assign the values are the same, so no problem.
iter.remove();
ackedBlockLength = c.ackedLength;
// the future.complete check is to confirm that we are the only one who grabbed the work,
// otherwise just give up and return.
if (c.future.complete(c.ackedLength)) {
// also wake up flush requests which have the same length.
while (iter.hasNext()) {
Callback maybeDummyCb = iter.next();
if (maybeDummyCb.ackedLength == c.ackedLength) {
iter.remove();
maybeDummyCb.future.complete(c.ackedLength);
} else {
break;
}
}
}
}
return;
}
}
}
// This usually does not happen, which means it is not on the critical path, so make it
// synchronized to keep the implementation simple, as there are multiple state changes and
// checks.
private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
if (state == State.BROKEN || state == State.CLOSED) {
return;
}
if (state == State.CLOSING) {
Callback c = waitingAckQueue.peekFirst();
if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
// nothing to do, the endBlock request has already finished.
return;
}
}
// disable further write, and fail all pending ack.
state = State.BROKEN;
Throwable error = errorSupplier.get();
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
// find the first flush request which we have not acked yet, and fail it and all the requests
// after it.
if (!c.unfinishedReplicas.contains(channel.id())) {
continue;
}
for (;;) {
c.future.completeExceptionally(error);
if (!iter.hasNext()) {
break;
}
c = iter.next();
}
break;
}
datanodeList.forEach(ch -> ch.close());
}
@Sharable
private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {
private final int timeoutMs;
public AckHandler(int timeoutMs) {
this.timeoutMs = timeoutMs;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
Status reply = getStatus(ack);
if (reply != Status.SUCCESS) {
failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
block + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (PipelineAck.isRestartOOBStatus(reply)) {
failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
block + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
return;
}
completed(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (state == State.CLOSED) {
return;
}
failed(ctx.channel(),
() -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
failed(ctx.channel(), () -> cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == READER_IDLE) {
failed(ctx.channel(),
() -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
} else if (e.state() == WRITER_IDLE) {
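// Send an empty heartbeat packet to keep the connection alive: packet length 4 (only the length
// field itself), offset 0, the reserved heartbeat seqno, not the last packet, zero data bytes.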
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
int len = heartbeat.getSerializedSize();
ByteBuf buf = alloc.buffer(len);
heartbeat.putInBuffer(buf.nioBuffer(0, len));
buf.writerIndex(len);
ctx.channel().writeAndFlush(buf);
}
return;
}
super.userEventTriggered(ctx, evt);
}
}
private void setupReceiver(int timeoutMs) {
AckHandler ackHandler = new AckHandler(timeoutMs);
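// Pipeline per datanode connection: idle-state detection drives read timeouts and heartbeats,
// the varint32 frame decoder splits the byte stream into protobuf frames, the protobuf decoder
// parses each frame into a PipelineAckProto, and AckHandler reacts to every ack.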
for (Channel ch : datanodeList) {
ch.pipeline().addLast(
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
new ProtobufVarint32FrameDecoder(),
new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
ch.config().setAutoRead(true);
}
}
FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
DataChecksum summer, ByteBufAllocator alloc) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
this.namenode = namenode;
this.fileId = fileId;
this.clientName = clientName;
this.src = src;
this.block = locatedBlock.getBlock();
this.locations = locatedBlock.getLocations();
this.encryptor = encryptor;
this.datanodeList = datanodeList;
this.summer = summer;
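// round the data limit down to a multiple of the checksum chunk size, so every full packet ends
// on a chunk boundary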
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
this.alloc = alloc;
this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
}
@Override
public void writeInt(int i) {
buf.ensureWritable(4);
buf.writeInt(i);
}
@Override
public void write(ByteBuffer bb) {
buf.ensureWritable(bb.remaining());
buf.writeBytes(bb);
}
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) {
buf.ensureWritable(len);
buf.writeBytes(b, off, len);
}
@Override
public int buffered() {
return buf.readableBytes();
}
@Override
public DatanodeInfo[] getPipeline() {
return locations;
}
private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
long nextPacketOffsetInBlock, boolean syncBlock) {
int dataLen = dataBuf.readableBytes();
int chunkLen = summer.getBytesPerChecksum();
int trailingPartialChunkLen = dataLen % chunkLen;
int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
int checksumLen = numChecks * summer.getChecksumSize();
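// for example, with 4 byte CRC32C checksums over 512 byte chunks, dataLen = 1000 yields
// numChecks = 2 (one full chunk plus a trailing partial chunk) and checksumLen = 8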
ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
checksumBuf.writerIndex(checksumLen);
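// the on-wire layout of each packet is: header, then all checksum bytes, then the data bytes;
// the packet length below is 4 + checksumLen + dataLen because it counts its own length field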
PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
nextPacketSeqno, false, dataLen, syncBlock);
int headerLen = header.getSerializedSize();
ByteBuf headerBuf = alloc.buffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
waitingAckQueue.addLast(c);
// recheck the state after pushing the callback to the queue
if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
future.completeExceptionally(new IOException("stream already broken"));
// it's the one we have just pushed or just a no-op
waitingAckQueue.removeFirst();
return;
}
// TODO: we should perhaps measure time taken per DN here;
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
datanodeList.forEach(ch -> {
ch.write(headerBuf.retainedDuplicate());
ch.write(checksumBuf.retainedDuplicate());
ch.writeAndFlush(dataBuf.retainedDuplicate());
});
checksumBuf.release();
headerBuf.release();
dataBuf.release();
nextPacketSeqno++;
}
private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
if (state != State.STREAMING) {
future.completeExceptionally(new IOException("stream already broken"));
return;
}
int dataLen = buf.readableBytes();
if (dataLen == trailingPartialChunkLength) {
// no new data
long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
Callback lastFlush = waitingAckQueue.peekLast();
if (lastFlush != null) {
Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
waitingAckQueue.addLast(c);
// recheck here if we have already removed the previous callback from the queue
if (waitingAckQueue.peekFirst() == c) {
// all previous callbacks have been removed
// notice that this does not mean we will always win here, because the background thread may
// have already started to mark this future as completed in the completed or failed methods
// but has not removed it from the queue yet. That is also why the removeFirst call below may
// be a no-op.
if (state != State.STREAMING) {
future.completeExceptionally(new IOException("stream already broken"));
} else {
future.complete(lengthAfterFlush);
}
// it's the one we have just pushed or just a no-op
waitingAckQueue.removeFirst();
}
} else {
// we must have acked all the data, so the ackedBlockLength must be the same as
// lengthAfterFlush
future.complete(lengthAfterFlush);
}
return;
}
if (encryptor != null) {
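// the first trailingPartialChunkLength bytes were already encrypted (and sent) by the previous
// flush, so they are copied as-is below; only the newly appended plaintext is encrypted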
ByteBuf encryptBuf = alloc.directBuffer(dataLen);
buf.readBytes(encryptBuf, trailingPartialChunkLength);
int toEncryptLength = dataLen - trailingPartialChunkLength;
try {
encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
} catch (IOException e) {
encryptBuf.release();
future.completeExceptionally(e);
return;
}
encryptBuf.writerIndex(dataLen);
buf.release();
buf = encryptBuf;
}
if (dataLen > maxDataLen) {
// We need to write out the data in multiple packets, as the maximum packet size allowed is 16MB.
long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
for (int remaining = dataLen;;) {
if (remaining < maxDataLen) {
flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
syncBlock);
break;
} else {
flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
nextSubPacketOffsetInBlock, syncBlock);
remaining -= maxDataLen;
nextSubPacketOffsetInBlock += maxDataLen;
}
}
} else {
flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
}
trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
.ensureWritable(trailingPartialChunkLength);
if (trailingPartialChunkLength != 0) {
buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
trailingPartialChunkLength);
}
buf.release();
this.buf = newBuf;
nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
}
/**
* Flush the buffer out to datanodes.
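* <p>
* An illustrative blocking-wait pattern, as a sketch only:
*
* <pre>
* long acked = out.flush(true).get(); // waits until every datanode acks the packet
* </pre>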
* @param syncBlock will call hsync if true, otherwise hflush.
* @return A CompletableFuture that holds the acked length after flushing.
*/
@Override
public CompletableFuture<Long> flush(boolean syncBlock) {
CompletableFuture<Long> future = new CompletableFuture<>();
flush0(future, syncBlock);
return future;
}
private void endBlock() throws IOException {
Preconditions.checkState(waitingAckQueue.isEmpty(),
"should call flush first before calling close");
if (state != State.STREAMING) {
throw new IOException("stream already broken");
}
state = State.CLOSING;
long finalizedLength = ackedBlockLength;
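// an empty packet with lastPacketInBlock = true asks the datanodes to finalize the block at the
// acked length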
PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
buf.release();
buf = null;
int headerLen = header.getSerializedSize();
ByteBuf headerBuf = alloc.directBuffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
CompletableFuture<Long> future = new CompletableFuture<>();
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
headerBuf.release();
try {
future.get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new IOException(cause);
}
}
/**
* The close method to use when an error has occurred. Currently we just end the file lease and
* call recoverFileLease.
*/
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
/**
* End the current block and complete the file at the namenode. You should call
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
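* <p>
* A typical caller pattern might look like this sketch:
*
* <pre>
* try {
*   out.close();
* } catch (IOException e) {
*   out.recoverAndClose(null);
*   throw e;
* }
* </pre>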
*/
@Override
public void close() throws IOException {
endBlock();
state = State.CLOSED;
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
}
@Override
public boolean isBroken() {
return state == State.BROKEN;
}
}