blob: e69f5b133b0d94d32988874e1035df0f445b05ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.types.IntValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/** Integration tests for reusing a persisted intermediate dataset across jobs. */
class JobIntermediateDatasetReuseTest {

    private static final Logger LOG =
            LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);

    /** Number of records each {@link Sender} subtask emits and each {@link Receiver} expects. */
    private static final int NUM_RECORDS_PER_SUBTASK = 100;

    @Test
    void testClusterPartitionReuse() throws Exception {
        internalTestClusterPartitionReuse(
                1, 1, jobResult -> Assertions.assertThat(jobResult.isSuccess()).isTrue());
    }

    @Test
    void testClusterPartitionReuseMultipleParallelism() throws Exception {
        internalTestClusterPartitionReuse(
                64, 64, jobResult -> Assertions.assertThat(jobResult.isSuccess()).isTrue());
    }

    @Test
    void testClusterPartitionReuseWithMoreConsumerParallelismThrowException() throws Exception {
        internalTestClusterPartitionReuse(
                1,
                2,
                jobResult -> {
                    Assertions.assertThat(jobResult.isSuccess()).isFalse();
                    Assertions.assertThat(getClusterDatasetCorruptedException(jobResult))
                            .isNotNull();
                });
    }

    @Test
    void testClusterPartitionReuseWithLessConsumerParallelismThrowException() throws Exception {
        internalTestClusterPartitionReuse(
                2,
                1,
                jobResult -> {
                    Assertions.assertThat(jobResult.isSuccess()).isFalse();
                    Assertions.assertThat(getClusterDatasetCorruptedException(jobResult))
                            .isNotNull();
                });
    }

    /**
     * Runs a producer job that writes a {@code BLOCKING_PERSISTENT} intermediate dataset. It then
     * runs a consumer job that reads that dataset by ID and passes the consumer's {@link JobResult}
     * to {@code jobResultVerification}. The producer job itself must succeed.
     *
     * @param producerParallelism parallelism of both vertices in the producing job
     * @param consumerParallelism parallelism of the receiver vertex in the consuming job
     * @param jobResultVerification assertions to run against the consuming job's result
     */
    private void internalTestClusterPartitionReuse(
            int producerParallelism,
            int consumerParallelism,
            Consumer<JobResult> jobResultVerification)
            throws Exception {
        final TestingMiniClusterConfiguration miniClusterConfiguration =
                TestingMiniClusterConfiguration.newBuilder().build();

        try (TestingMiniCluster miniCluster =
                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
            miniCluster.start();

            final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();

            final JobGraph firstJobGraph =
                    createFirstJobGraph(producerParallelism, intermediateDataSetID);
            Assertions.assertThat(submitJobAndWaitForResult(miniCluster, firstJobGraph).isSuccess())
                    .isTrue();

            final JobGraph secondJobGraph =
                    createSecondJobGraph(consumerParallelism, intermediateDataSetID);
            jobResultVerification.accept(submitJobAndWaitForResult(miniCluster, secondJobGraph));
        }
    }

    @Test
    void testClusterPartitionReuseWithTMFail() throws Exception {
        final TestingMiniClusterConfiguration miniClusterConfiguration =
                TestingMiniClusterConfiguration.newBuilder().build();

        try (TestingMiniCluster miniCluster =
                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
            miniCluster.start();

            final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
            final JobGraph firstJobGraph = createFirstJobGraph(1, intermediateDataSetID);
            Assertions.assertThat(submitJobAndWaitForResult(miniCluster, firstJobGraph).isSuccess())
                    .isTrue();

            // Wait for TaskManager 0 to actually finish shutting down before starting a
            // replacement. The consumer below is expected to find the persisted dataset
            // corrupted, and not waiting on the termination future races with that.
            miniCluster.terminateTaskManager(0).get();
            miniCluster.startTaskManager();

            final JobGraph secondJobGraph = createSecondJobGraph(1, intermediateDataSetID);
            final ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1024, 1000));
            secondJobGraph.setExecutionConfig(executionConfig);

            final JobResult failedResult = submitJobAndWaitForResult(miniCluster, secondJobGraph);
            Assertions.assertThat(failedResult.isSuccess()).isFalse();
            final ClusterDatasetCorruptedException exception =
                    getClusterDatasetCorruptedException(failedResult);
            // startsWith(...) checks the same thing as get(0).equals(...), but reports an empty
            // list as an assertion failure instead of an IndexOutOfBoundsException.
            Assertions.assertThat(exception.getCorruptedClusterDatasetIds())
                    .startsWith(intermediateDataSetID);

            // Re-run the producer under a fresh JobID to regenerate the dataset. After that, the
            // consumer (also under a fresh JobID) is expected to succeed.
            firstJobGraph.setJobID(new JobID());
            Assertions.assertThat(submitJobAndWaitForResult(miniCluster, firstJobGraph).isSuccess())
                    .isTrue();

            secondJobGraph.setJobID(new JobID());
            Assertions.assertThat(
                            submitJobAndWaitForResult(miniCluster, secondJobGraph).isSuccess())
                    .isTrue();
        }
    }

    /**
     * Submits {@code jobGraph} to {@code miniCluster} and blocks until its {@link JobResult} is
     * available.
     *
     * @param miniCluster the running cluster to submit to
     * @param jobGraph the job to run
     * @return the final result of the job
     */
    private static JobResult submitJobAndWaitForResult(
            TestingMiniCluster miniCluster, JobGraph jobGraph) throws Exception {
        miniCluster.submitJob(jobGraph).get();
        final CompletableFuture<JobResult> jobResultFuture =
                miniCluster.requestJobResult(jobGraph.getJobID());
        return jobResultFuture.get();
    }

    /**
     * Extracts the {@link ClusterDatasetCorruptedException} from the failure cause chain of
     * {@code jobResult}.
     *
     * @param jobResult result of a failed job; must carry a serialized throwable
     * @return the first {@link ClusterDatasetCorruptedException} found in the cause chain, never
     *     {@code null}
     * @throws AssertionError if the result has no failure, or if no cause in the chain is a
     *     {@link ClusterDatasetCorruptedException}. The deserialized failure is attached as the
     *     cause so the actual error is still visible in the test report.
     */
    private static ClusterDatasetCorruptedException getClusterDatasetCorruptedException(
            JobResult jobResult) {
        Assertions.assertThat(jobResult.getSerializedThrowable()).isPresent();
        final Throwable failure =
                jobResult
                        .getSerializedThrowable()
                        .get()
                        .deserializeError(Thread.currentThread().getContextClassLoader());

        for (Throwable current = failure; current != null; current = current.getCause()) {
            if (current instanceof ClusterDatasetCorruptedException) {
                return (ClusterDatasetCorruptedException) current;
            }
        }
        throw new AssertionError(
                "Expected a ClusterDatasetCorruptedException in the cause chain of the job failure",
                failure);
    }

    /**
     * Builds a job with a single {@link Receiver} vertex. The vertex takes no regular input edge
     * and consumes the previously persisted dataset {@code intermediateDataSetID}.
     */
    private JobGraph createSecondJobGraph(
            int parallelism, IntermediateDataSetID intermediateDataSetID) {
        final JobVertex receiver = new JobVertex("Receiver 2", null);
        receiver.setParallelism(parallelism);
        receiver.setInvokableClass(Receiver.class);
        receiver.addIntermediateDataSetIdToConsume(intermediateDataSetID);

        return new JobGraph(null, "Second Job", receiver);
    }

    /**
     * Builds a {@link Sender} -> {@link Receiver} job whose POINTWISE edge produces a {@code
     * BLOCKING_PERSISTENT} result with the given {@code intermediateDataSetID}. Both vertices use
     * {@code parallelism}.
     */
    private JobGraph createFirstJobGraph(
            int parallelism, IntermediateDataSetID intermediateDataSetID) {
        final JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(parallelism);
        sender.setInvokableClass(Sender.class);

        final JobVertex receiver = new JobVertex("Receiver");
        receiver.setParallelism(parallelism);
        receiver.setInvokableClass(Receiver.class);

        receiver.connectNewDataSetAsInput(
                sender,
                DistributionPattern.POINTWISE,
                ResultPartitionType.BLOCKING_PERSISTENT,
                intermediateDataSetID,
                false);

        return new JobGraph(null, "First Job", sender, receiver);
    }

    /**
     * Basic sender {@link AbstractInvokable}. It emits {@code NUM_RECORDS_PER_SUBTASK}
     * consecutive integers, starting at its own subtask index, to its first output.
     */
    public static class Sender extends AbstractInvokable {

        public Sender(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            final int index = getIndexInSubtaskGroup();
            final RecordWriter<IntValue> writer =
                    new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));

            try {
                for (int i = index; i < index + NUM_RECORDS_PER_SUBTASK; ++i) {
                    writer.emit(new IntValue(i));
                    LOG.debug("Sender({}) emit {}", index, i);
                }
                writer.flushAll();
            } finally {
                writer.close();
            }
        }
    }

    /**
     * Basic receiver {@link AbstractInvokable} that verifies the records sent by the {@link
     * Sender}. It expects exactly {@code NUM_RECORDS_PER_SUBTASK} consecutive integers, starting
     * at its own subtask index, and then end of input.
     */
    public static class Receiver extends AbstractInvokable {

        public Receiver(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            final int index = getIndexInSubtaskGroup();
            final RecordReader<IntValue> reader =
                    new RecordReader<>(
                            getEnvironment().getInputGate(0),
                            IntValue.class,
                            getEnvironment().getTaskManagerInfo().getTmpDirectories());

            for (int i = index; i < index + NUM_RECORDS_PER_SUBTASK; ++i) {
                final int value = reader.next().getValue();
                LOG.debug("Receiver({}) received {}", index, value);
                Assertions.assertThat(value).isEqualTo(i);
            }

            // No records beyond the expected range may arrive.
            Assertions.assertThat(reader.next()).isNull();
        }
    }
}