/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.runtime;

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.serialization.types.ByteArrayType;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* Tests the bug reported in FLINK-13100.
*
* <p>The implementation of {@link org.apache.flink.runtime.io.network.partition.BoundedData.Reader#nextBuffer()}
* for {@link BoundedBlockingSubpartitionType#FILE} assumes that a buffer is always available, throwing an
* IOException otherwise, and that a pool of two buffers is sufficient (the first buffer is expected to have
* been recycled before the third one is requested). But while a flush operation is pending (the socket channel
* is not writable while the netty thread is calling {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}),
* the first buffer fetched from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData}
* has not yet been recycled when the second buffer is fetched to trigger the next read-ahead, which breaks
* the assumption above.
*/
@Category(AlsoRunWithSchedulerNG.class)
public class FileBufferReaderITCase extends TestLogger {
private static final int parallelism = 8;
private static final int numRecords = 100_000;
private static final byte[] dataSource = new byte[1024];

@BeforeClass
public static void setup() {
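// the JVM already zero-initializes the array; the loop just makes the payload content explicit,
// since only the record size (1 KiB) matters for this test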
for (int i = 0; i < dataSource.length; i++) {
dataSource[i] = 0;
}
}

@Test
public void testSequentialReading() throws Exception {
// setup
final Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT, "0");
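// select the FILE variant of the bounded blocking subpartition so that reads go through FileChannelBoundedData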
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file");
final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(parallelism)
.setNumSlotsPerTaskManager(1)
.build();
try (final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
miniCluster.start();
final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
final JobGraph jobGraph = createJobGraph();
final CompletableFuture<JobSubmissionResult> submitFuture = client.submitJob(jobGraph);
// wait for the submission to succeed
final JobSubmissionResult result = submitFuture.get();
final CompletableFuture<JobResult> resultFuture = client.requestJobResult(result.getJobID());
final JobResult jobResult = resultFuture.get();
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
}
}

private static JobGraph createJobGraph() {
final SlotSharingGroup group1 = new SlotSharingGroup();
final SlotSharingGroup group2 = new SlotSharingGroup();
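// separate slot sharing groups keep source and sink tasks in different slots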
final JobVertex source = new JobVertex("source");
source.setInvokableClass(TestSourceInvokable.class);
source.setParallelism(parallelism);
source.setSlotSharingGroup(group1);
final JobVertex sink = new JobVertex("sink");
sink.setInvokableClass(TestSinkInvokable.class);
sink.setParallelism(parallelism);
sink.setSlotSharingGroup(group2);
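// an ALL_TO_ALL blocking edge makes every sink subtask consume a bounded (file-backed) partition from every source subtask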
sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
final JobGraph jobGraph = new JobGraph(source, sink);
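// with lazy scheduling the sinks are deployed only after the sources have finished producing their blocking results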
jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
return jobGraph;
}

/**
* Basic source {@link AbstractInvokable} which sends the elements to the
* {@link TestSinkInvokable}.
*/
public static final class TestSourceInvokable extends AbstractInvokable {
/**
* Create an Invokable task and set its environment.
*
* @param environment The environment assigned to this invokable.
*/
public TestSourceInvokable(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
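// send the same 1 KiB payload numRecords times, flushing after every emit so records are
// handed to the network stack one by one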
final RecordWriter<ByteArrayType> writer = new RecordWriterBuilder<ByteArrayType>().build(getEnvironment().getWriter(0));
final ByteArrayType bytes = new ByteArrayType(dataSource);
int counter = 0;
while (counter++ < numRecords) {
try {
writer.emit(bytes);
writer.flushAll();
} finally {
writer.clearBuffers();
}
}
}
}

/**
* Basic sink {@link AbstractInvokable} which verifies the sent elements
* from the {@link TestSourceInvokable}.
*/
public static final class TestSinkInvokable extends AbstractInvokable {
private int numReceived = 0;

/**
* Create an Invokable task and set its environment.
*
* @param environment The environment assigned to this invokable.
*/
public TestSinkInvokable(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
final RecordReader<ByteArrayType> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
ByteArrayType.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());
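// drain the input and count the records; the tally is verified against numRecords below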
while (reader.hasNext()) {
reader.next();
numReceived++;
}
assertThat(numReceived, is(numRecords));
}
}
}