blob: ac4693565e4594835165b626336aacb5091b49c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
/** Tests the resource cleanup by the {@link Dispatcher}. */
public class DispatcherResourceCleanupTest extends TestLogger {
@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule public ExpectedException expectedException = ExpectedException.none();
@Rule
public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
new TestingFatalErrorHandlerResource();
private static final Time timeout = Time.seconds(10L);
private static TestingRpcService rpcService;
private JobID jobId;
private JobGraph jobGraph;
private TestingDispatcher dispatcher;
private DispatcherGateway dispatcherGateway;
private BlobServer blobServer;
private CompletableFuture<JobID> localCleanupFuture;
private CompletableFuture<JobID> globalCleanupFuture;
@BeforeClass
public static void setupClass() {
rpcService = new TestingRpcService();
}
@Before
public void setup() throws Exception {
jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
jobId = jobGraph.getJobID();
globalCleanupFuture = new CompletableFuture<>();
localCleanupFuture = new CompletableFuture<>();
blobServer =
BlobUtils.createBlobServer(
new Configuration(),
Reference.owned(temporaryFolder.newFolder()),
new TestingBlobStoreBuilder().createTestingBlobStore());
}
private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
return startDispatcherAndSubmitJob(0);
}
private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(
int numBlockingJobManagerRunners) throws Exception {
return startDispatcherAndSubmitJob(
createTestingDispatcherBuilder(), numBlockingJobManagerRunners);
}
private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(
TestingDispatcher.Builder dispatcherBuilder, int numBlockingJobManagerRunners)
throws Exception {
final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactoryNG =
new TestingJobMasterServiceLeadershipRunnerFactory(numBlockingJobManagerRunners);
startDispatcher(dispatcherBuilder, testingJobManagerRunnerFactoryNG);
submitJobAndWait();
return testingJobManagerRunnerFactoryNG;
}
private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
startDispatcher(createTestingDispatcherBuilder(), jobManagerRunnerFactory);
}
private void startDispatcher(
TestingDispatcher.Builder dispatcherBuilder,
JobManagerRunnerFactory jobManagerRunnerFactory)
throws Exception {
dispatcher =
dispatcherBuilder
.setJobManagerRunnerFactory(jobManagerRunnerFactory)
.build(rpcService);
dispatcher.start();
dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
}
private TestingDispatcher.Builder createTestingDispatcherBuilder() {
final JobManagerRunnerRegistry jobManagerRunnerRegistry =
new DefaultJobManagerRunnerRegistry(2);
return TestingDispatcher.builder()
.setBlobServer(blobServer)
.setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
.setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler())
.setResourceCleanerFactory(
TestingResourceCleanerFactory.builder()
// JobManagerRunnerRegistry needs to be added explicitly
// because cleaning it will trigger the closeAsync latch
// provided by TestingJobManagerRunner
.withLocallyCleanableResource(jobManagerRunnerRegistry)
.withGloballyCleanableResource(
(jobId, ignoredExecutor) -> {
globalCleanupFuture.complete(jobId);
return FutureUtils.completedVoidFuture();
})
.withLocallyCleanableResource(
(jobId, ignoredExecutor) -> {
localCleanupFuture.complete(jobId);
return FutureUtils.completedVoidFuture();
})
.build());
}
@After
public void teardown() throws Exception {
if (dispatcher != null) {
dispatcher.close();
}
if (blobServer != null) {
blobServer.close();
}
}
@AfterClass
public static void teardownClass() throws ExecutionException, InterruptedException {
if (rpcService != null) {
rpcService.stopService().get();
}
}
@Test
public void testGlobalCleanupWhenJobFinished() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob();
// complete the job
finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
assertGlobalCleanupTriggered(jobId);
}
@Test
public void testGlobalCleanupWhenJobCanceled() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob();
// complete the job
cancelJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
assertGlobalCleanupTriggered(jobId);
}
private CompletableFuture<Acknowledge> submitJob() {
return dispatcherGateway.submitJob(jobGraph, timeout);
}
private void submitJobAndWait() {
submitJob().join();
}
@Test
public void testLocalCleanupWhenJobNotFinished() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob();
// job not finished
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
suspendJob(testingJobManagerRunner);
assertLocalCleanupTriggered(jobId);
}
@Test
public void testGlobalCleanupWhenJobSubmissionFails() throws Exception {
startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
final CompletableFuture<Acknowledge> submissionFuture = submitJob();
try {
submissionFuture.get();
fail("Job submission was expected to fail.");
} catch (ExecutionException ee) {
assertThat(ee, containsCause(JobSubmissionException.class));
}
assertGlobalCleanupTriggered(jobId);
}
@Test
public void testLocalCleanupWhenClosingDispatcher() throws Exception {
startDispatcherAndSubmitJob();
dispatcher.closeAsync().get();
assertLocalCleanupTriggered(jobId);
}
@Test
public void testGlobalCleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
final TestingJobManagerRunner testingJobManagerRunner =
TestingJobManagerRunner.newBuilder()
.setBlockingTermination(true)
.setJobId(jobId)
.build();
final Queue<JobManagerRunner> jobManagerRunners =
new ArrayDeque<>(Arrays.asList(testingJobManagerRunner));
startDispatcher(new QueueJobManagerRunnerFactory(jobManagerRunners));
submitJobAndWait();
final CompletableFuture<Void> dispatcherTerminationFuture = dispatcher.closeAsync();
testingJobManagerRunner.getCloseAsyncCalledLatch().await();
testingJobManagerRunner.completeResultFuture(
new ExecutionGraphInfo(
new ArchivedExecutionGraphBuilder()
.setJobID(jobId)
.setState(JobStatus.FINISHED)
.build()));
testingJobManagerRunner.completeTerminationFuture();
// check that no exceptions have been thrown
dispatcherTerminationFuture.get();
assertGlobalCleanupTriggered(jobId);
}
@Test
public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
final OneShotLatch markAsDirtyLatch = new OneShotLatch();
final TestingDispatcher.Builder dispatcherBuilder =
createTestingDispatcherBuilder()
.setJobResultStore(
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(
ignoredJobResultEntry -> {
try {
markAsDirtyLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.build());
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(dispatcherBuilder, 0);
finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
assertThatNoCleanupWasTriggered();
markAsDirtyLatch.trigger();
assertGlobalCleanupTriggered(jobId);
}
@Test
public void testJobBeingMarkedAsCleanAfterCleanup() throws Exception {
final CompletableFuture<JobID> markAsCleanFuture = new CompletableFuture<>();
final JobResultStore jobResultStore =
TestingJobResultStore.builder()
.withMarkResultAsCleanConsumer(markAsCleanFuture::complete)
.build();
final OneShotLatch localCleanupLatch = new OneShotLatch();
final OneShotLatch globalCleanupLatch = new OneShotLatch();
final TestingResourceCleanerFactory resourceCleanerFactory =
TestingResourceCleanerFactory.builder()
.withLocallyCleanableResource(
(ignoredJobId, ignoredExecutor) -> {
try {
localCleanupLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return FutureUtils.completedVoidFuture();
})
.withGloballyCleanableResource(
(ignoredJobId, ignoredExecutor) -> {
try {
globalCleanupLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return FutureUtils.completedVoidFuture();
})
.build();
final TestingDispatcher.Builder dispatcherBuilder =
createTestingDispatcherBuilder()
.setJobResultStore(jobResultStore)
.setResourceCleanerFactory(resourceCleanerFactory);
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(dispatcherBuilder, 0);
finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
assertThat(markAsCleanFuture.isDone(), is(false));
localCleanupLatch.trigger();
assertThat(markAsCleanFuture.isDone(), is(false));
globalCleanupLatch.trigger();
assertThat(markAsCleanFuture.get(), is(jobId));
}
/**
* Tests that the previous JobManager needs to be completely terminated before a new job with
* the same {@link JobID} is started.
*/
@Test
public void testJobSubmissionUnderSameJobId() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(1);
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
suspendJob(testingJobManagerRunner);
// wait until termination JobManagerRunner closeAsync has been called.
// this is necessary to avoid race conditions with completion of the 1st job and the
// submission of the 2nd job (DuplicateJobSubmissionException).
testingJobManagerRunner.getCloseAsyncCalledLatch().await();
final CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, timeout);
try {
submissionFuture.get(10L, TimeUnit.MILLISECONDS);
fail(
"The job submission future should not complete until the previous JobManager "
+ "termination future has been completed.");
} catch (TimeoutException ignored) {
// expected
} finally {
testingJobManagerRunner.completeTerminationFuture();
}
assertThat(submissionFuture.get(), equalTo(Acknowledge.get()));
}
/**
* Tests that a duplicate job submission won't delete any job meta data (submitted job graphs,
* blobs, etc.).
*/
@Test
public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exception {
final TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG =
startDispatcherAndSubmitJob();
final CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, timeout);
try {
try {
submissionFuture.get();
fail("Expected a DuplicateJobSubmissionFailure.");
} catch (ExecutionException ee) {
assertThat(
ExceptionUtils.findThrowable(ee, DuplicateJobSubmissionException.class)
.isPresent(),
is(true));
}
assertThatNoCleanupWasTriggered();
} finally {
finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
}
assertGlobalCleanupTriggered(jobId);
}
private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.FINISHED);
}
private void suspendJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.SUSPENDED);
}
private void cancelJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.CANCELED);
}
private void terminateJobWithState(
TestingJobManagerRunner takeCreatedJobManagerRunner, JobStatus state) {
takeCreatedJobManagerRunner.completeResultFuture(
new ExecutionGraphInfo(
new ArchivedExecutionGraphBuilder()
.setJobID(jobId)
.setState(state)
.build()));
}
private void assertThatNoCleanupWasTriggered() {
assertThat(globalCleanupFuture.isDone(), is(false));
assertThat(localCleanupFuture.isDone(), is(false));
}
@Test
public void testDispatcherTerminationTerminatesRunningJobMasters() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob();
dispatcher.closeAsync().get();
final TestingJobManagerRunner jobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
assertThat(jobManagerRunner.getTerminationFuture().isDone(), is(true));
}
/** Tests that terminating the Dispatcher will wait for all JobMasters to be terminated. */
@Test
public void testDispatcherTerminationWaitsForJobMasterTerminations() throws Exception {
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(1);
final CompletableFuture<Void> dispatcherTerminationFuture = dispatcher.closeAsync();
try {
dispatcherTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
fail("We should not terminate before all running JobMasters have terminated.");
} catch (TimeoutException ignored) {
// expected
} finally {
jobManagerRunnerFactory.takeCreatedJobManagerRunner().completeTerminationFuture();
}
dispatcherTerminationFuture.get();
}
private void assertLocalCleanupTriggered(JobID jobId)
throws ExecutionException, InterruptedException, TimeoutException {
assertThat(localCleanupFuture.get(), equalTo(jobId));
assertThat(globalCleanupFuture.isDone(), is(false));
}
private void assertGlobalCleanupTriggered(JobID jobId)
throws ExecutionException, InterruptedException, TimeoutException {
assertThat(localCleanupFuture.isDone(), is(false));
assertThat(globalCleanupFuture.get(), equalTo(jobId));
}
@Test
public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
final JobResultStore jobResultStore =
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(
jobResult -> {
throw new IOException("Expected IOException.");
})
.build();
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(
createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
ArchivedExecutionGraph executionGraph =
new ArchivedExecutionGraphBuilder()
.setJobID(jobId)
.setState(JobStatus.FINISHED)
.build();
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
final CompletableFuture<? extends Throwable> errorFuture =
this.testingFatalErrorHandlerResource.getFatalErrorHandler().getErrorFuture();
assertThat(errorFuture.get(), IsInstanceOf.instanceOf(FlinkException.class));
testingFatalErrorHandlerResource.getFatalErrorHandler().clearError();
}
@Test
public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws Exception {
final CompletableFuture<JobResultEntry> dirtyJobFuture = new CompletableFuture<>();
final JobResultStore jobResultStore =
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(dirtyJobFuture::complete)
.withMarkResultAsCleanConsumer(
jobId -> {
throw new IOException("Expected IOException.");
})
.build();
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(
createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0);
ArchivedExecutionGraph executionGraph =
new ArchivedExecutionGraphBuilder()
.setJobID(jobId)
.setState(JobStatus.FINISHED)
.build();
final TestingJobManagerRunner testingJobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
final CompletableFuture<? extends Throwable> errorFuture =
this.testingFatalErrorHandlerResource.getFatalErrorHandler().getErrorFuture();
try {
final Throwable unexpectedError = errorFuture.get(100, TimeUnit.MILLISECONDS);
fail(
"No error should have been reported but an "
+ unexpectedError.getClass()
+ " was handled.");
} catch (TimeoutException e) {
// expected
}
assertThat(dirtyJobFuture.get().getJobId(), is(jobId));
}
/** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */
@Test
public void testFailingJobManagerRunnerCleanup() throws Exception {
final FlinkException testException = new FlinkException("Test exception.");
final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
new BlockingJobManagerRunnerFactory(
() -> {
final Optional<Exception> maybeException = queue.take();
if (maybeException.isPresent()) {
throw maybeException.get();
}
});
startDispatcher(blockingJobManagerRunnerFactory);
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
// submit and fail during job master runner construction
queue.offer(Optional.of(testException));
try {
dispatcherGateway.submitJob(jobGraph, Time.minutes(1)).get();
fail("A FlinkException is expected");
} catch (Throwable expectedException) {
assertThat(expectedException, containsCause(FlinkException.class));
assertThat(expectedException, containsMessage(testException.getMessage()));
// make sure we've cleaned up in correct order (including HA)
assertGlobalCleanupTriggered(jobId);
}
// don't fail this time
queue.offer(Optional.empty());
// submit job again
dispatcherGateway.submitJob(jobGraph, Time.minutes(1L)).get();
blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
// Ensure job is running
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
}
@Test
public void testArchivingFinishedJobToHistoryServer() throws Exception {
final CompletableFuture<Acknowledge> archiveFuture = new CompletableFuture<>();
final TestingDispatcher.Builder testingDispatcherBuilder =
createTestingDispatcherBuilder()
.setHistoryServerArchivist(executionGraphInfo -> archiveFuture);
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(testingDispatcherBuilder, 0);
finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
// Before the archiving is finished, the cleanup is not finished and the job is not
// terminated
assertThatNoCleanupWasTriggered();
final CompletableFuture<Void> jobTerminationFuture =
dispatcher.getJobTerminationFuture(jobId, Time.hours(1));
assertFalse(jobTerminationFuture.isDone());
archiveFuture.complete(Acknowledge.get());
// Once the archive is finished, the cleanup is finished and the job is terminated.
assertGlobalCleanupTriggered(jobId);
jobTerminationFuture.join();
}
@Test
public void testNotArchivingSuspendedJobToHistoryServer() throws Exception {
final AtomicBoolean isArchived = new AtomicBoolean(false);
final TestingDispatcher.Builder testingDispatcherBuilder =
createTestingDispatcherBuilder()
.setHistoryServerArchivist(
executionGraphInfo -> {
isArchived.set(true);
return CompletableFuture.completedFuture(Acknowledge.get());
});
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
startDispatcherAndSubmitJob(testingDispatcherBuilder, 0);
suspendJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
assertLocalCleanupTriggered(jobId);
dispatcher.getJobTerminationFuture(jobId, Time.hours(1)).join();
assertFalse(isArchived.get());
}
private static final class BlockingJobManagerRunnerFactory
extends TestingJobMasterServiceLeadershipRunnerFactory {
private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
private TestingJobManagerRunner testingRunner;
BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
}
@Override
public TestingJobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)
throws Exception {
jobManagerRunnerCreationLatch.run();
this.testingRunner =
super.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
initializationTimestamp);
TestingJobMasterGateway testingJobMasterGateway =
new TestingJobMasterGatewayBuilder()
.setRequestJobSupplier(
() ->
CompletableFuture.completedFuture(
new ExecutionGraphInfo(
ArchivedExecutionGraph
.createSparseArchivedExecutionGraph(
jobGraph.getJobID(),
jobGraph.getName(),
JobStatus.RUNNING,
null,
null,
1337))))
.build();
testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
return testingRunner;
}
public void setJobStatus(JobStatus newStatus) {
Preconditions.checkState(
testingRunner != null,
"JobManagerRunner must be created before this method is available");
this.testingRunner.setJobStatus(newStatus);
}
}
private static final class QueueJobManagerRunnerFactory implements JobManagerRunnerFactory {
private final Queue<? extends JobManagerRunner> jobManagerRunners;
private QueueJobManagerRunnerFactory(Queue<? extends JobManagerRunner> jobManagerRunners) {
this.jobManagerRunners = jobManagerRunners;
}
@Override
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) {
return Optional.ofNullable(jobManagerRunners.poll())
.orElseThrow(
() ->
new IllegalStateException(
"Cannot create more JobManagerRunners."));
}
}
private class FailingJobManagerRunnerFactory implements JobManagerRunnerFactory {
private final Exception testException;
public FailingJobManagerRunnerFactory(FlinkException testException) {
this.testException = testException;
}
@Override
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)
throws Exception {
throw testException;
}
}
}