/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.dispatcher;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingRetryStrategies;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.util.concurrent.FutureUtils;

import org.hamcrest.CoreMatchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertTrue;

/** An integration test covering various cleanup scenarios of the {@link Dispatcher} component. */
public class DispatcherCleanupITCase extends AbstractDispatcherTest {
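
/** RPC endpoints that need to be terminated in {@link #tearDown()}. */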
private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue<>();

@Before
public void setUp() throws Exception {
super.setUp();
haServices.setCheckpointRecoveryFactory(
new PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
(maxCheckpoints,
previous,
sharedStateRegistryFactory,
ioExecutor,
restoreMode) -> {
if (previous != null) {
// The first job's cleanup still succeeded for the
// CompletedCheckpointStore because the JobGraph cleanup happens
// after the JobManagerRunner has been closed.
assertTrue(previous.getShutdownStatus().isPresent());
assertTrue(previous.getAllCheckpoints().isEmpty());
return new EmbeddedCompletedCheckpointStore(
maxCheckpoints,
previous.getAllCheckpoints(),
sharedStateRegistryFactory.create(
ioExecutor,
previous.getAllCheckpoints(),
restoreMode));
}
return new EmbeddedCompletedCheckpointStore(
maxCheckpoints,
Collections.emptyList(),
sharedStateRegistryFactory.create(
ioExecutor,
Collections.emptyList(),
RestoreMode.DEFAULT));
}));
}

@After
public void tearDown() {
while (!toTerminate.isEmpty()) {
final RpcEndpoint endpoint = toTerminate.poll();
try {
RpcUtils.terminateRpcEndpoint(endpoint);
} catch (Exception e) {
// Ignore.
}
}
}
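
/**
 * Verifies that a failing global cleanup of the {@link JobGraphStore} is retried until it
 * eventually succeeds, leaving the JobGraph removed from the store and the job recorded in the
 * {@link JobResultStore}.
 */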
@Test
public void testCleanupThroughRetries() throws Exception {
final JobGraph jobGraph = createJobGraph();
final JobID jobId = jobGraph.getJobID();
// Construct a JobGraphStore whose global cleanup fails several times before succeeding.
final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
final OneShotLatch successfulCleanupLatch = new OneShotLatch();
final int numberOfErrors = 5;
final RuntimeException temporaryError =
new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
final AtomicInteger failureCount = new AtomicInteger(numberOfErrors);
final JobGraphStore jobGraphStore =
TestingJobGraphStore.newBuilder()
.setGlobalCleanupFunction(
(ignoredJobId, ignoredExecutor) -> {
actualGlobalCleanupCallCount.incrementAndGet();
if (failureCount.getAndDecrement() > 0) {
return FutureUtils.completedExceptionally(temporaryError);
}
successfulCleanupLatch.trigger();
return FutureUtils.completedVoidFuture();
})
.build();
jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
haServices.setJobGraphStore(jobGraphStore);
// Construct leader election service.
final TestingLeaderElectionService leaderElectionService =
new TestingLeaderElectionService();
haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService);
// Start the dispatcher with enough retries on cleanup to compensate for the induced failures.
final JobManagerRunnerRegistry jobManagerRunnerRegistry =
new DefaultJobManagerRunnerRegistry(2);
final Dispatcher dispatcher =
createTestingDispatcherBuilder()
.setResourceCleanerFactory(
new DispatcherResourceCleanerFactory(
ForkJoinPool.commonPool(),
TestingRetryStrategies.createWithNumberOfRetries(
numberOfErrors),
jobManagerRunnerRegistry,
haServices.getJobGraphStore(),
blobServer,
haServices,
UnregisteredMetricGroups
.createUnregisteredJobManagerMetricGroup()))
.build(rpcService);
dispatcher.start();
toTerminate.add(dispatcher);
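
// Grant leadership so that the job's JobMaster can be started.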
leaderElectionService.isLeader(UUID.randomUUID());
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
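
// The cleanup is expected to succeed eventually, after the configured number of retries.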
successfulCleanupLatch.await();
assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors + 1));
assertThat(
"The JobGraph should be removed from JobGraphStore.",
haServices.getJobGraphStore().getJobIds(),
IsEmptyCollection.empty());
CommonTestUtils.waitUntilCondition(
() -> haServices.getJobResultStore().hasJobResultEntry(jobId));
}
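
/**
 * Verifies that the cleanup of a job cannot be cancelled: cancelling the job fails with a
 * {@link JobCancellationFailedException} while the local cleanup is still in progress, and the
 * job result is eventually marked as clean once that cleanup completes.
 */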
@Test
public void testCleanupNotCancellable() throws Exception {
final JobGraph jobGraph = createJobGraph();
final JobID jobId = jobGraph.getJobID();
final JobResultStore jobResultStore = new EmbeddedJobResultStore();
jobResultStore.createDirtyResult(
new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobId)));
haServices.setJobResultStore(jobResultStore);
// Set up a JobManagerRunner registry whose local cleanup blocks until
// jobManagerRunnerCleanupFuture is completed.
final CompletableFuture<Void> jobManagerRunnerCleanupFuture = new CompletableFuture<>();
final AtomicReference<JobManagerRunner> jobManagerRunnerEntry = new AtomicReference<>();
final JobManagerRunnerRegistry jobManagerRunnerRegistry =
TestingJobManagerRunnerRegistry.newSingleJobBuilder(jobManagerRunnerEntry)
.withLocalCleanupAsyncFunction(
(actualJobId, executor) -> jobManagerRunnerCleanupFuture)
.build();
final Dispatcher dispatcher =
createTestingDispatcherBuilder()
.setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
.build(rpcService);
dispatcher.start();
toTerminate.add(dispatcher);
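
// Wait until the Dispatcher has created and registered a JobManagerRunner for the job.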
CommonTestUtils.waitUntilCondition(() -> jobManagerRunnerEntry.get() != null);
assertThat(
"The JobResultStore should have this job still marked as dirty.",
haServices.getJobResultStore().hasDirtyJobResultEntry(jobId),
CoreMatchers.is(true));
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
try {
dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
Assert.fail("Should fail because cancelling the cleanup is not allowed.");
} catch (ExecutionException e) {
assertThat(e, FlinkMatchers.containsCause(JobCancellationFailedException.class));
}
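
// Complete the blocked local cleanup so that the cleanup can finish and the job result gets
// marked as clean.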
jobManagerRunnerCleanupFuture.complete(null);
CommonTestUtils.waitUntilCondition(
() -> haServices.getJobResultStore().hasCleanJobResultEntry(jobId));
}
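
/**
 * Verifies that a cleanup that failed on the first dispatcher (which runs without cleanup
 * retries) leaves the job marked as dirty and is completed by a second dispatcher that recovers
 * the dirty {@link JobResult}.
 */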
@Test
public void testCleanupAfterLeadershipChange() throws Exception {
final JobGraph jobGraph = createJobGraph();
final JobID jobId = jobGraph.getJobID();
// Construct job graph store.
final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
final OneShotLatch firstCleanupTriggered = new OneShotLatch();
final CompletableFuture<JobID> successfulJobGraphCleanup = new CompletableFuture<>();
final JobGraphStore jobGraphStore =
TestingJobGraphStore.newBuilder()
.setGlobalCleanupFunction(
(actualJobId, ignoredExecutor) -> {
final int callCount =
actualGlobalCleanupCallCount.getAndIncrement();
firstCleanupTriggered.trigger();
if (callCount < 1) {
return FutureUtils.completedExceptionally(
new RuntimeException(
"Expected RuntimeException: Unable to remove job graph."));
}
successfulJobGraphCleanup.complete(actualJobId);
return FutureUtils.completedVoidFuture();
})
.build();
jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
haServices.setJobGraphStore(jobGraphStore);
// Construct leader election service.
final TestingLeaderElectionService leaderElectionService =
new TestingLeaderElectionService();
haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService);
// Start the dispatcher with retries on cleanup disabled.
configuration.set(
CleanupOptions.CLEANUP_STRATEGY,
CleanupOptions.NONE_PARAM_VALUES.iterator().next());
final Dispatcher dispatcher = createTestingDispatcherBuilder().build(rpcService);
dispatcher.start();
toTerminate.add(dispatcher);
leaderElectionService.isLeader(UUID.randomUUID());
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
firstCleanupTriggered.await();
assertThat(
"The cleanup should have been triggered only once.",
actualGlobalCleanupCallCount.get(),
equalTo(1));
assertThat(
"The cleanup should not have reached the successful cleanup code path.",
successfulJobGraphCleanup.isDone(),
equalTo(false));
assertThat(
"The JobGraph is still stored in the JobGraphStore.",
haServices.getJobGraphStore().getJobIds(),
equalTo(Collections.singleton(jobId)));
assertThat(
"The JobResultStore should have this job marked as dirty.",
haServices.getJobResultStore().getDirtyResults().stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet()),
equalTo(Collections.singleton(jobId)));
// Run a second dispatcher that recovers our finished (but still dirty) job.
final Dispatcher secondDispatcher =
createTestingDispatcherBuilder()
.setRecoveredDirtyJobs(haServices.getJobResultStore().getDirtyResults())
.build(rpcService);
secondDispatcher.start();
toTerminate.add(secondDispatcher);
leaderElectionService.isLeader(UUID.randomUUID());
CommonTestUtils.waitUntilCondition(
() -> haServices.getJobResultStore().getDirtyResults().isEmpty());
assertThat(
"The JobGraph is not stored in the JobGraphStore.",
haServices.getJobGraphStore().getJobIds(),
IsEmptyCollection.empty());
assertTrue(
"The JobResultStore has the job listed as clean.",
haServices.getJobResultStore().hasJobResultEntry(jobId));
assertThat(successfulJobGraphCleanup.get(), equalTo(jobId));
assertThat(actualGlobalCleanupCallCount.get(), equalTo(2));
}
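
/**
 * Drives the tasks of the given job through their execution states (including taking a
 * checkpoint) until the dispatcher reports the job as {@link JobStatus#FINISHED}.
 */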
private void waitForJobToFinish(
TestingLeaderElectionService leaderElectionService,
DispatcherGateway dispatcherGateway,
JobID jobId)
throws Exception {
final JobMasterGateway jobMasterGateway =
connectToLeadingJobMaster(leaderElectionService).get();
try (final JobMasterTester tester =
new JobMasterTester(rpcService, jobId, jobMasterGateway)) {
final CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture =
tester.deployVertices(2);
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
tester.transitionTo(descriptorsFuture.get(), ExecutionState.INITIALIZING).get();
tester.transitionTo(descriptorsFuture.get(), ExecutionState.RUNNING).get();
tester.getCheckpointFuture(1L).get();
tester.transitionTo(descriptorsFuture.get(), ExecutionState.FINISHED).get();
}
awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
}
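
/**
 * Creates a streaming {@link JobGraph} with two non-parallel {@link NoOpInvokable} vertices and
 * checkpointing enabled.
 */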
private JobGraph createJobGraph() {
final JobVertex firstVertex = new JobVertex("first");
firstVertex.setInvokableClass(NoOpInvokable.class);
firstVertex.setParallelism(1);
final JobVertex secondVertex = new JobVertex("second");
secondVertex.setInvokableClass(NoOpInvokable.class);
secondVertex.setParallelism(1);
final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
CheckpointCoordinatorConfiguration.builder()
.setCheckpointInterval(20L)
.setMinPauseBetweenCheckpoints(20L)
.setCheckpointTimeout(10_000L)
.build();
final JobCheckpointingSettings checkpointingSettings =
new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
return JobGraphBuilder.newStreamingJobGraphBuilder()
.addJobVertex(firstVertex)
.addJobVertex(secondVertex)
.setJobCheckpointingSettings(checkpointingSettings)
.build();
}
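
/** Connects to the {@link JobMasterGateway} of the currently leading JobMaster. */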
private static CompletableFuture<JobMasterGateway> connectToLeadingJobMaster(
TestingLeaderElectionService leaderElectionService) {
return leaderElectionService
.getConfirmationFuture()
.thenCompose(
leaderConnectionInfo ->
rpcService.connect(
leaderConnectionInfo.getAddress(),
JobMasterId.fromUuidOrNull(
leaderConnectionInfo.getLeaderSessionId()),
JobMasterGateway.class));
}
}