| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.jobmanager; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
| import org.apache.flink.configuration.ConfigConstants; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.HighAvailabilityOptions; |
| import org.apache.flink.runtime.akka.AkkaUtils; |
| import org.apache.flink.runtime.akka.ListeningBehaviour; |
| import org.apache.flink.runtime.clusterframework.types.ResourceID; |
| import org.apache.flink.runtime.highavailability.HighAvailabilityServices; |
| import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; |
| import org.apache.flink.runtime.instance.ActorGateway; |
| import org.apache.flink.runtime.instance.AkkaActorGateway; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.JobStatus; |
| import org.apache.flink.runtime.jobgraph.JobVertex; |
| import org.apache.flink.runtime.leaderelection.TestingListener; |
| import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; |
| import org.apache.flink.runtime.messages.JobManagerMessages; |
| import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; |
| import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; |
| import org.apache.flink.runtime.metrics.NoOpMetricRegistry; |
| import org.apache.flink.runtime.taskmanager.TaskManager; |
| import org.apache.flink.runtime.testingUtils.TestingCluster; |
| import org.apache.flink.runtime.testingUtils.TestingUtils; |
| import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; |
| import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; |
| import org.apache.flink.runtime.testutils.JobManagerProcess; |
| import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; |
| import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; |
| import org.apache.flink.util.TestLogger; |
| |
| import akka.actor.ActorRef; |
| import akka.actor.ActorSystem; |
| import akka.actor.Props; |
| import akka.actor.UntypedActor; |
| import akka.testkit.TestActorRef; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.filefilter.TrueFileFilter; |
| import org.apache.zookeeper.data.Stat; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
| import java.io.File; |
| import java.util.Collection; |
| import java.util.Queue; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import scala.Option; |
| import scala.Some; |
| import scala.Tuple2; |
| import scala.concurrent.Await; |
| import scala.concurrent.Future; |
| import scala.concurrent.duration.Deadline; |
| import scala.concurrent.duration.FiniteDuration; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests recovery of {@link SubmittedJobGraph} instances. |
| */ |
| public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { |
| |
| private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); |
| |
| private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES); |
| |
| @Rule |
| public TemporaryFolder tempFolder = new TemporaryFolder(); |
| |
| @AfterClass |
| public static void tearDown() throws Exception { |
| ZooKeeper.shutdown(); |
| } |
| |
| @Before |
| public void cleanUp() throws Exception { |
| ZooKeeper.deleteAll(); |
| } |
| |
| // --------------------------------------------------------------------------------------------- |
| |
| /** |
| * Tests that the HA job is not cleaned up when the jobmanager is stopped. |
| */ |
| @Test |
| public void testJobPersistencyWhenJobManagerShutdown() throws Exception { |
| Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( |
| ZooKeeper.getConnectString(), tempFolder.getRoot().getPath()); |
| |
| // Configure the cluster |
| config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1); |
| config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); |
| |
| TestingCluster flink = new TestingCluster(config, false, false); |
| |
| try { |
| final Deadline deadline = TestTimeOut.fromNow(); |
| |
| // Start the JobManager and TaskManager |
| flink.start(true); |
| |
| JobGraph jobGraph = createBlockingJobGraph(); |
| |
| // Set restart strategy to guard against shut down races. |
| // If the TM fails before the JM, it might happen that the |
| // Job is failed, leading to state removal. |
| ExecutionConfig ec = new ExecutionConfig(); |
| ec.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100)); |
| jobGraph.setExecutionConfig(ec); |
| |
| ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft()); |
| |
| // Submit the job |
| jobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); |
| |
| // Wait for the job to start |
| JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, |
| jobManager, deadline.timeLeft()); |
| } |
| finally { |
| flink.stop(); |
| } |
| |
| // verify that the persisted job data has not been removed from ZooKeeper when the JM has |
| // been shutdown |
| verifyRecoveryState(config); |
| } |
| |
| /** |
| * Tests that clients receive updates after recovery by a new leader. |
| */ |
| @Test |
| public void testClientNonDetachedListeningBehaviour() throws Exception { |
| Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( |
| ZooKeeper.getConnectString(), tempFolder.getRoot().getPath()); |
| |
| // Test actor system |
| ActorSystem testSystem = null; |
| |
| // JobManager setup. Start the job managers as separate processes in order to not run the |
| // actors postStop, which cleans up all running jobs. |
| JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; |
| |
| LeaderRetrievalService leaderRetrievalService = null; |
| |
| ActorSystem taskManagerSystem = null; |
| |
| final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( |
| config, |
| TestingUtils.defaultExecutor(), |
| HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); |
| |
| try { |
| final Deadline deadline = TestTimeOut.fromNow(); |
| |
| // Test actor system |
| testSystem = AkkaUtils.createActorSystem(new Configuration(), |
| new Some<>(new Tuple2<String, Object>("localhost", 0))); |
| |
| // The job managers |
| jobManagerProcess[0] = new JobManagerProcess(0, config); |
| jobManagerProcess[1] = new JobManagerProcess(1, config); |
| |
| jobManagerProcess[0].startProcess(); |
| jobManagerProcess[1].startProcess(); |
| |
| // Leader listener |
| TestingListener leaderListener = new TestingListener(); |
| leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); |
| leaderRetrievalService.start(leaderListener); |
| |
| // The task manager |
| taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); |
| TaskManager.startTaskManagerComponentsAndActor( |
| config, |
| ResourceID.generate(), |
| taskManagerSystem, |
| highAvailabilityServices, |
| NoOpMetricRegistry.INSTANCE, |
| "localhost", |
| Option.<String>empty(), |
| false, |
| TaskManager.class); |
| |
| // Client test actor |
| TestActorRef<RecordingTestClient> clientRef = TestActorRef.create( |
| testSystem, Props.create(RecordingTestClient.class)); |
| |
| JobGraph jobGraph = createBlockingJobGraph(); |
| |
| { |
| // Initial submission |
| leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); |
| |
| String leaderAddress = leaderListener.getAddress(); |
| UUID leaderId = leaderListener.getLeaderSessionID(); |
| |
| // The client |
| AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId); |
| |
| // Get the leader ref |
| ActorRef leaderRef = AkkaUtils.getActorRef( |
| leaderAddress, testSystem, deadline.timeLeft()); |
| ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); |
| |
| int numSlots = 0; |
| while (numSlots == 0) { |
| Future<?> slotsFuture = leader.ask(JobManagerMessages |
| .getRequestTotalNumberOfSlots(), deadline.timeLeft()); |
| |
| numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft()); |
| } |
| |
| // Submit the job in non-detached mode |
| leader.tell(new SubmitJob(jobGraph, |
| ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client); |
| |
| JobManagerActorTestUtils.waitForJobStatus( |
| jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft()); |
| } |
| |
| // Who's the boss? |
| JobManagerProcess leadingJobManagerProcess; |
| if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) { |
| leadingJobManagerProcess = jobManagerProcess[0]; |
| } |
| else { |
| leadingJobManagerProcess = jobManagerProcess[1]; |
| } |
| |
| // Kill the leading job manager process |
| leadingJobManagerProcess.destroy(); |
| |
| { |
| // Recovery by the standby JobManager |
| leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); |
| |
| String leaderAddress = leaderListener.getAddress(); |
| UUID leaderId = leaderListener.getLeaderSessionID(); |
| |
| ActorRef leaderRef = AkkaUtils.getActorRef( |
| leaderAddress, testSystem, deadline.timeLeft()); |
| ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); |
| |
| JobManagerActorTestUtils.waitForJobStatus( |
| jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft()); |
| |
| // Cancel the job |
| leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); |
| } |
| |
| // Wait for the execution result |
| clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis()); |
| |
| int jobSubmitSuccessMessages = 0; |
| for (Object msg : clientRef.underlyingActor().getMessages()) { |
| if (msg instanceof JobManagerMessages.JobSubmitSuccess) { |
| jobSubmitSuccessMessages++; |
| } |
| } |
| |
| // At least two submissions should be ack-ed (initial and recovery). This is quite |
| // conservative, but it is still possible that these messages are overtaken by the |
| // final message. |
| assertEquals(2, jobSubmitSuccessMessages); |
| } |
| catch (Throwable t) { |
| // Print early (in some situations the process logs get too big |
| // for Travis and the root problem is not shown) |
| t.printStackTrace(); |
| |
| // In case of an error, print the job manager process logs. |
| if (jobManagerProcess[0] != null) { |
| jobManagerProcess[0].printProcessLog(); |
| } |
| |
| if (jobManagerProcess[1] != null) { |
| jobManagerProcess[1].printProcessLog(); |
| } |
| |
| throw t; |
| } |
| finally { |
| if (jobManagerProcess[0] != null) { |
| jobManagerProcess[0].destroy(); |
| } |
| |
| if (jobManagerProcess[1] != null) { |
| jobManagerProcess[1].destroy(); |
| } |
| |
| if (leaderRetrievalService != null) { |
| leaderRetrievalService.stop(); |
| } |
| |
| if (taskManagerSystem != null) { |
| taskManagerSystem.shutdown(); |
| } |
| |
| if (testSystem != null) { |
| testSystem.shutdown(); |
| } |
| |
| highAvailabilityServices.closeAndCleanupAllData(); |
| } |
| } |
| |
| /** |
| * Simple recording client. |
| */ |
| private static class RecordingTestClient extends UntypedActor { |
| |
| private final Queue<Object> messages = new ConcurrentLinkedQueue<>(); |
| |
| private CountDownLatch jobResultLatch = new CountDownLatch(1); |
| |
| @Override |
| public void onReceive(Object message) throws Exception { |
| if (message instanceof LeaderSessionMessage) { |
| message = ((LeaderSessionMessage) message).message(); |
| } |
| |
| messages.add(message); |
| |
| // Check for job result |
| if (message instanceof JobManagerMessages.JobResultFailure || |
| message instanceof JobManagerMessages.JobResultSuccess) { |
| |
| jobResultLatch.countDown(); |
| } |
| } |
| |
| public Queue<Object> getMessages() { |
| return messages; |
| } |
| |
| public void awaitJobResult(long timeout) throws InterruptedException { |
| jobResultLatch.await(timeout, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| // --------------------------------------------------------------------------------------------- |
| |
| /** |
| * Creates a simple blocking JobGraph. |
| */ |
| private static JobGraph createBlockingJobGraph() { |
| JobGraph jobGraph = new JobGraph("Blocking program"); |
| |
| JobVertex jobVertex = new JobVertex("Blocking Vertex"); |
| jobVertex.setInvokableClass(BlockingNoOpInvokable.class); |
| |
| jobGraph.addVertex(jobVertex); |
| |
| return jobGraph; |
| } |
| |
| /** |
| * Fails the test if the recovery state (file state backend and ZooKeeper) is not clean. |
| */ |
| private void verifyCleanRecoveryState(Configuration config) throws Exception { |
| // File state backend empty |
| Collection<File> stateHandles = FileUtils.listFiles( |
| tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE); |
| |
| if (!stateHandles.isEmpty()) { |
| fail("File state backend is not clean: " + stateHandles); |
| } |
| |
| // ZooKeeper |
| String currentJobsPath = config.getString( |
| HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH); |
| |
| Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); |
| |
| if (stat.getCversion() == 0) { |
| // Sanity check: verify that some changes have been performed |
| fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " + |
| "this test. What are you testing?"); |
| } |
| |
| if (stat.getNumChildren() != 0) { |
| // Is everything clean again? |
| fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " + |
| ZooKeeper.getClient().getChildren().forPath(currentJobsPath)); |
| } |
| } |
| |
| /** |
| * Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned. |
| */ |
| private void verifyRecoveryState(Configuration config) throws Exception { |
| // File state backend empty |
| Collection<File> stateHandles = FileUtils.listFiles( |
| tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE); |
| |
| if (stateHandles.isEmpty()) { |
| fail("File state backend has been cleaned: " + stateHandles); |
| } |
| |
| // ZooKeeper |
| String currentJobsPath = config.getString( |
| HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH); |
| |
| Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); |
| |
| if (stat.getCversion() == 0) { |
| // Sanity check: verify that some changes have been performed |
| fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during " + |
| "this test. What are you testing?"); |
| } |
| |
| if (stat.getNumChildren() == 0) { |
| // Children have been cleaned up? |
| fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " + |
| ZooKeeper.getClient().getChildren().forPath(currentJobsPath)); |
| } |
| } |
| |
| } |