/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* A queue of readers serving partition requests, which listens for channel writability changed
* events before writing and flushing {@link Buffer} instances.
*/
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);
private final ChannelFutureListener writeListener =
new WriteAndFlushNextMessageIfPossibleListener();
/** The readers which are already enqueued and available for transferring data. */
private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>();
/** All the readers created for the consumers' partition requests. */
private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders =
new ConcurrentHashMap<>();
private boolean fatalError;
private ChannelHandlerContext ctx;
@Override
public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
if (this.ctx == null) {
this.ctx = ctx;
}
super.channelRegistered(ctx);
}
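/**
 * Notifies that the given reader has data available. The actual enqueueing is scheduled as a
 * user event on the channel's event loop so that all queue mutations happen on the Netty
 * thread.
 */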
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
// The notification might come from the same thread. For the initial writes this
// might happen before the reader has set its reference to the view, because
// creating the queue and the initial notification happen in the same method call.
// This can be resolved by separating the creation of the view from the point at
// which notifications are allowed.
// TODO This could potentially have a bad performance impact as in the
// worst case (network consumes faster than the producer) each buffer
// will trigger a separate event loop task being scheduled.
ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
/**
* Tries to enqueue the reader after receiving a credit notification from the consumer or a
* non-empty reader notification from the producer.
*
* <p>NOTE: Only one thread would trigger the actual enqueue after checking the reader's
* availability, so there is no race condition here.
*/
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
if (reader.isRegisteredAsAvailable()) {
return;
}
ResultSubpartitionView.AvailabilityWithBacklog availabilityWithBacklog =
reader.getAvailabilityAndBacklog();
if (!availabilityWithBacklog.isAvailable()) {
int backlog = availabilityWithBacklog.getBacklog();
if (backlog > 0 && reader.needAnnounceBacklog()) {
announceBacklog(reader, backlog);
}
return;
}
// Queue an available reader for consumption. If the queue is empty,
// we try to trigger the actual write. Otherwise, this will be handled by
// the writeAndFlushNextMessageIfPossible calls.
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
/**
* Accesses internal state to verify reader registration in the unit tests.
*
* <p><strong>Do not use anywhere else!</strong>
*
* @return readers which are enqueued and available for transferring data
*/
@VisibleForTesting
ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
return availableReaders;
}
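/** Registers a newly created reader under its receiver ID so it can be looked up later. */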
public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
allReaders.put(reader.getReceiverId(), reader);
}
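/**
 * Requests cancellation for the given receiver. The corresponding reader is removed and
 * released via a user event on the event loop (see {@link #userEventTriggered}).
 */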
public void cancel(InputChannelID receiverId) {
ctx.pipeline().fireUserEventTriggered(receiverId);
}
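/** Closes the channel, if one has been established, and releases all registered readers. */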
public void close() throws IOException {
if (ctx != null) {
ctx.channel().close();
}
releaseAllResources();
}
/**
* Adds unannounced credits from the consumer or resumes data consumption after an exactly-once
* checkpoint and enqueues the corresponding reader for this consumer (if not enqueued yet).
*
* @param receiverId The input channel id to identify the consumer.
* @param operation The operation to be performed (add credit or resume data consumption).
*/
void addCreditOrResumeConsumption(
InputChannelID receiverId, Consumer<NetworkSequenceViewReader> operation)
throws Exception {
if (fatalError) {
return;
}
NetworkSequenceViewReader reader = obtainReader(receiverId);
operation.accept(reader);
enqueueAvailableReader(reader);
}
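/** Forwards the notification that all records for the given receiver have been processed. */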
void acknowledgeAllRecordsProcessed(InputChannelID receiverId) {
if (fatalError) {
return;
}
obtainReader(receiverId).acknowledgeAllRecordsProcessed();
}
void notifyNewBufferSize(InputChannelID receiverId, int newBufferSize) {
if (fatalError) {
return;
}
// It is possible to receive a new buffer size before the reader has been created, since the
// downstream task may calculate the buffer size from the data of a single channel but sends
// the new buffer size to all upstream channels, even those that are not ready yet. In this
// case, just ignore the new buffer size.
NetworkSequenceViewReader reader = allReaders.get(receiverId);
if (reader != null) {
reader.notifyNewBufferSize(newBufferSize);
}
}
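/** Returns the reader registered for the given receiver ID, failing if none exists. */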
NetworkSequenceViewReader obtainReader(InputChannelID receiverId) {
NetworkSequenceViewReader reader = allReaders.get(receiverId);
if (reader == null) {
throw new IllegalStateException(
"No reader for receiverId = " + receiverId + " exists.");
}
return reader;
}
/**
* Announces remaining backlog to the consumer after the available data notification or data
* consumption resumption.
*/
private void announceBacklog(NetworkSequenceViewReader reader, int backlog) {
checkArgument(backlog > 0, "Backlog must be positive.");
NettyMessage.BacklogAnnouncement announcement =
new NettyMessage.BacklogAnnouncement(backlog, reader.getReceiverId());
ctx.channel()
.writeAndFlush(announcement)
.addListener(
(ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
onChannelFutureFailure(future);
}
});
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
// The user event triggered event loop callback is used for thread-safe
// hand over of reader queues and cancelled producers.
if (msg instanceof NetworkSequenceViewReader) {
enqueueAvailableReader((NetworkSequenceViewReader) msg);
} else if (msg.getClass() == InputChannelID.class) {
// Release the partition view that got a cancel request.
InputChannelID toCancel = (InputChannelID) msg;
// remove reader from queue of available readers
availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
// remove reader from queue of all readers and release its resource
final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
if (toRelease != null) {
releaseViewReader(toRelease);
}
} else {
ctx.fireUserEventTriggered(msg);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
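/**
 * Writes and flushes the next buffer of the next available reader, as long as the channel is
 * writable. A reader that still has data after its buffer was taken is re-enqueued; the write
 * listener triggers the next write once the current one has completed.
 */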
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError || !channel.isWritable()) {
return;
}
// The logic here is very similar to the combined input gate and local
// input channel logic. You can think of this class acting as the input
// gate and the consumed views as the local input channels.
BufferAndAvailability next = null;
try {
while (true) {
NetworkSequenceViewReader reader = pollAvailableReader();
// No queue with available data. We allow this here, because
// of the write callbacks that are executed after each write.
if (reader == null) {
return;
}
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg =
new ErrorResponse(
new ProducerFailedException(cause), reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// The reader was just removed from the available reader queue.
// We re-add it to the queue if it still has data available.
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
BufferResponse msg =
new BufferResponse(
next.buffer(),
next.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
// Write and flush and wait until this is done before
// trying to continue with the next buffer.
channel.writeAndFlush(msg).addListener(writeListener);
return;
}
}
} catch (Throwable t) {
if (next != null) {
next.buffer().recycleBuffer();
}
throw new IOException(t.getMessage(), t);
}
}
private void registerAvailableReader(NetworkSequenceViewReader reader) {
availableReaders.add(reader);
reader.setRegisteredAsAvailable(true);
}
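/** Polls the next available reader, if any, and clears its availability flag. */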
@Nullable
private NetworkSequenceViewReader pollAvailableReader() {
NetworkSequenceViewReader reader = availableReaders.poll();
if (reader != null) {
reader.setRegisteredAsAvailable(false);
}
return reader;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
releaseAllResources();
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
handleException(ctx.channel(), cause);
}
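/**
 * Marks the queue as failed, releases all readers and, if the channel is still active, sends
 * an {@link ErrorResponse} to the consumer before closing the connection.
 */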
private void handleException(Channel channel, Throwable cause) throws IOException {
LOG.error("Encountered error while consuming partitions", cause);
fatalError = true;
releaseAllResources();
if (channel.isActive()) {
channel.writeAndFlush(new ErrorResponse(cause))
.addListener(ChannelFutureListener.CLOSE);
}
}
private void releaseAllResources() throws IOException {
// note: this is only ever executed by one thread: the Netty IO thread!
for (NetworkSequenceViewReader reader : allReaders.values()) {
releaseViewReader(reader);
}
availableReaders.clear();
allReaders.clear();
}
private void releaseViewReader(NetworkSequenceViewReader reader) throws IOException {
reader.setRegisteredAsAvailable(false);
reader.releaseAllResources();
}
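/**
 * Delegates a failed channel future to {@link #handleException}, assuming a user cancellation
 * if the future carries no cause.
 */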
private void onChannelFutureFailure(ChannelFuture future) throws Exception {
if (future.cause() != null) {
handleException(future.channel(), future.cause());
} else {
handleException(
future.channel(), new IllegalStateException("Sending cancelled by user."));
}
}
// This listener is called after an element of the currently served reader has been
// flushed. If successful, the listener triggers further processing of the
// queues.
private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (future.isSuccess()) {
writeAndFlushNextMessageIfPossible(future.channel());
} else {
onChannelFutureFailure(future);
}
} catch (Throwable t) {
handleException(future.channel(), t);
}
}
}
}