blob: b0fcf420df377ad0d76086426a5537b234af2756 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/** Tests for {@link PartitionRequestServerHandler}. */
public class PartitionRequestServerHandlerTest extends TestLogger {
/**
* Tests that {@link PartitionRequestServerHandler} responds {@link ErrorResponse} with wrapped
* {@link PartitionNotFoundException} after receiving invalid {@link PartitionRequest}.
*/
@Test
public void testResponsePartitionNotFoundException() {
final PartitionRequestServerHandler serverHandler =
new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
new PartitionRequestQueue());
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
final ResultPartitionID partitionId = new ResultPartitionID();
// Write the message of partition request to server
channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2));
channel.runPendingTasks();
// Read the response message after handling partition request
final Object msg = channel.readOutbound();
assertThat(msg, instanceOf(ErrorResponse.class));
final ErrorResponse err = (ErrorResponse) msg;
assertThat(err.cause, instanceOf(PartitionNotFoundException.class));
final ResultPartitionID actualPartitionId =
((PartitionNotFoundException) err.cause).getPartitionId();
assertThat(partitionId, is(actualPartitionId));
}
@Test
public void testResumeConsumption() {
final InputChannelID inputChannelID = new InputChannelID();
final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
final TestViewReader testViewReader =
new TestViewReader(inputChannelID, 2, partitionRequestQueue);
final PartitionRequestServerHandler serverHandler =
new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestQueue);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
partitionRequestQueue.notifyReaderCreated(testViewReader);
// Write the message of resume consumption to server
channel.writeInbound(new ResumeConsumption(inputChannelID));
channel.runPendingTasks();
assertTrue(testViewReader.consumptionResumed);
}
@Test
public void testAcknowledgeAllRecordsProcessed() throws IOException {
InputChannelID inputChannelID = new InputChannelID();
ResultPartition resultPartition =
PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) ->
resultPartition.createSubpartitionView(index, availabilityListener);
// Creates the netty network handler stack.
PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
final PartitionRequestServerHandler serverHandler =
new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestQueue);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler, partitionRequestQueue);
// Creates and registers the view to netty.
NetworkSequenceViewReader viewReader =
new CreditBasedSequenceNumberingViewReader(
inputChannelID, 2, partitionRequestQueue);
viewReader.requestSubpartitionView(partitionProvider, resultPartition.getPartitionId(), 0);
partitionRequestQueue.notifyReaderCreated(viewReader);
// Write the message to acknowledge all records are processed to server
resultPartition.notifyEndOfData();
CompletableFuture<Void> allRecordsProcessedFuture =
resultPartition.getAllDataProcessedFuture();
assertFalse(allRecordsProcessedFuture.isDone());
channel.writeInbound(new NettyMessage.AckAllUserRecordsProcessed(inputChannelID));
channel.runPendingTasks();
assertTrue(allRecordsProcessedFuture.isDone());
assertFalse(allRecordsProcessedFuture.isCompletedExceptionally());
}
@Test
public void testNewBufferSize() {
final InputChannelID inputChannelID = new InputChannelID();
final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
final TestViewReader testViewReader =
new TestViewReader(inputChannelID, 2, partitionRequestQueue);
final PartitionRequestServerHandler serverHandler =
new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestQueue);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
partitionRequestQueue.notifyReaderCreated(testViewReader);
// Write the message of new buffer size to server
channel.writeInbound(new NettyMessage.NewBufferSize(666, inputChannelID));
channel.runPendingTasks();
assertEquals(666, testViewReader.bufferSize);
}
@Test
public void testReceivingNewBufferSizeBeforeReaderIsCreated() {
final InputChannelID inputChannelID = new InputChannelID();
final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
final TestViewReader testViewReader =
new TestViewReader(inputChannelID, 2, partitionRequestQueue);
final PartitionRequestServerHandler serverHandler =
new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestQueue);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
// Write the message of new buffer size to server without prepared reader.
channel.writeInbound(new NettyMessage.NewBufferSize(666, inputChannelID));
channel.runPendingTasks();
// If error happens outbound messages would be not empty.
assertTrue(channel.outboundMessages().toString(), channel.outboundMessages().isEmpty());
// New buffer size should be silently ignored because it is possible situation.
assertEquals(-1, testViewReader.bufferSize);
}
private static class TestViewReader extends CreditBasedSequenceNumberingViewReader {
private boolean consumptionResumed = false;
private int bufferSize = -1;
TestViewReader(
InputChannelID receiverId, int initialCredit, PartitionRequestQueue requestQueue) {
super(receiverId, initialCredit, requestQueue);
}
@Override
public void resumeConsumption() {
consumptionResumed = true;
}
@Override
public void notifyNewBufferSize(int newBufferSize) {
bufferSize = newBufferSize;
}
}
}