blob: fb3acc7fadd74c99f2a7b20687121b197ae7d397 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recovery;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testutils.DispatcherProcess;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
/**
* Verify behaviour in case of JobManager process failure during job execution.
*
* <p>The test works with multiple JobManager processes by spawning JVMs.
*
* <p>Initially, it starts two TaskManagers (2 slots each) and one JobManager JVM; a second
* JobManager JVM is started once the first one has been killed.
*
* <p>It submits a program with parallelism 4 and waits until all tasks are brought up. Coordination
* between the test and the tasks happens via checking for the existence of temporary files. It then
* kills the leading JobManager process. The recovery should restart the tasks on the new
* JobManager.
*
* <p>This follows the same structure as {@link AbstractTaskManagerProcessFailureRecoveryTest}.
*/
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
// ZooKeeper test environment shared by all tests of this class; created in setup() and shut
// down in tearDown().
private static ZooKeeperTestEnvironment zooKeeper;
// Overall time budget of one test run; the deadline used for the waits in
// testDispatcherProcessFailure() is derived from it.
private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5);
// Per-test temporary folder; supplies the HA storage directory and the coordination directory.
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
 * Creates the {@link ZooKeeperTestEnvironment} stored in the static {@code zooKeeper} field,
 * once before the tests of this class run.
 */
@BeforeClass
public static void setup() {
// NOTE(review): the argument is presumably the number of ZooKeeper peers to start — confirm
zooKeeper = new ZooKeeperTestEnvironment(1);
}
/**
 * Calls {@code deleteAll()} on the shared ZooKeeper environment before every test, presumably
 * so that no ZooKeeper state written by an earlier (parameterized) run is still visible.
 *
 * @throws Exception if {@code deleteAll()} throws
 */
@Before
public void cleanUp() throws Exception {
zooKeeper.deleteAll();
}
/**
 * Shuts down the shared ZooKeeper test environment after all tests of this class have run.
 *
 * @throws Exception if shutting down the environment throws
 */
@AfterClass
public static void tearDown() throws Exception {
    if (zooKeeper == null) {
        // the environment was never created, so there is nothing to release
        return;
    }
    zooKeeper.shutdown();
}
// Name prefix of the marker file each mapper subtask creates when it processes its first
// record; the subtask index is appended.
protected static final String READY_MARKER_FILE_PREFIX = "ready_";
// Name prefix of the marker file created after the job has verified its result; the subtask
// index is appended.
protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
// Name of the marker file the test creates to tell the mappers to stop throttling.
protected static final String PROCEED_MARKER_FILE = "proceed";
// Parallelism of the test job; asserted to equal the total number of task slots.
protected static final int PARALLELISM = 4;
// --------------------------------------------------------------------------------------------
// Parametrization (run pipelined and batch)
// --------------------------------------------------------------------------------------------
// Execution mode applied to the test job's execution config for this parameterized run.
private final ExecutionMode executionMode;
/**
 * Creates the test instance for one parameterized run.
 *
 * @param executionMode execution mode the test job is configured with
 */
public JobManagerHAProcessFailureRecoveryITCase(ExecutionMode executionMode) {
this.executionMode = executionMode;
}
/**
 * Supplies the constructor arguments of the parameterized runs: one run with {@link
 * ExecutionMode#PIPELINED} and one with {@link ExecutionMode#BATCH}.
 *
 * @return one single-element argument array per run
 */
@Parameterized.Parameters(name = "ExecutionMode {0}")
public static Collection<Object[]> executionMode() {
    final Object[] pipelinedRun = {ExecutionMode.PIPELINED};
    final Object[] batchRun = {ExecutionMode.BATCH};
    return Arrays.asList(pipelinedRun, batchRun);
}
/**
 * Builds the test job and executes it through a remote environment configured for ZooKeeper
 * high availability.
 *
 * <p>The job sums the sequence {@code 1..100000}. Every mapper subtask creates a "ready" marker
 * file in the coordination directory when it sees its first record and then sleeps 100 ms per
 * record until the "proceed" marker file exists. After the reduce, the sum is compared against
 * the closed-form value inside the job and a "finish" marker file is created.
 *
 * @param zkQuorum ZooKeeper quorum to connect to
 * @param coordinateDir directory in which the marker files are created and looked up
 * @param zookeeperStoragePath directory configured as the HA storage path
 * @throws Exception if building or executing the job throws
 */
private void testJobManagerFailure(
        String zkQuorum, final File coordinateDir, final File zookeeperStoragePath)
        throws Exception {
    // HA settings used by the submitting client
    final Configuration clientConfig = new Configuration();
    clientConfig.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    clientConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
    clientConfig.setString(
            HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath());

    final ExecutionEnvironment env =
            ExecutionEnvironment.createRemoteEnvironment("leader", 1, clientConfig);
    env.setParallelism(PARALLELISM);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
    env.getConfig().setExecutionMode(executionMode);

    final long numElements = 100000L;

    // rebalance to make sure every mapper is involved (no one is skipped because of lazy
    // split assignment)
    final DataSet<Long> distributedInput = env.generateSequence(1, numElements).rebalance();

    // the majority of the behavior is in the MapFunction
    final DataSet<Long> throttledInput =
            distributedInput.map(
                    new RichMapFunction<Long, Long>() {

                        private final File proceedFile =
                                new File(coordinateDir, PROCEED_MARKER_FILE);

                        private boolean readyMarkerWritten = false;

                        private boolean awaitingProceedSignal = true;

                        @Override
                        public Long map(Long value) throws Exception {
                            // announce this subtask once, on its first record
                            if (!readyMarkerWritten) {
                                final int subtaskIndex =
                                        getRuntimeContext().getIndexOfThisSubtask();
                                final File readyMarker =
                                        new File(
                                                coordinateDir,
                                                READY_MARKER_FILE_PREFIX + subtaskIndex);
                                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
                                        readyMarker);
                                readyMarkerWritten = true;
                            }

                            // until the proceed file shows up, make only slow progress
                            if (awaitingProceedSignal) {
                                if (proceedFile.exists()) {
                                    awaitingProceedSignal = false;
                                } else {
                                    Thread.sleep(100);
                                }
                            }

                            return value;
                        }
                    });

    final DataSet<Long> sum =
            throttledInput.reduce(
                    new ReduceFunction<Long>() {
                        @Override
                        public Long reduce(Long left, Long right) {
                            return left + right;
                        }
                    });

    // The check is done inside the job, because the client can currently not handle
    // job manager losses/reconnects.
    final DataSet<Long> result =
            sum.flatMap(
                    new RichFlatMapFunction<Long, Long>() {
                        @Override
                        public void flatMap(Long value, Collector<Long> out) throws Exception {
                            final long expectedSum = numElements * (numElements + 1L) / 2L;
                            assertEquals(expectedSum, value.longValue());

                            final int subtaskIndex =
                                    getRuntimeContext().getIndexOfThisSubtask();
                            final File finishMarker =
                                    new File(
                                            coordinateDir,
                                            FINISH_MARKER_FILE_PREFIX + subtaskIndex);
                            AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
                                    finishMarker);
                        }
                    });

    result.output(new DiscardingOutputFormat<Long>());

    env.execute();
}
/**
 * Runs the test job against an HA cluster and destroys the leading dispatcher process while
 * the job is running.
 *
 * <p>Sequence: start dispatcher process 0 and the task managers, wait for a dispatcher leader
 * and for the task managers to be reported as connected, submit the job from a separate thread,
 * wait for the "ready" markers of all subtasks, destroy dispatcher process 0 and start
 * dispatcher process 1, create the "proceed" marker, and finally wait for the submitting thread
 * and the "finish" marker written by the job.
 *
 * @throws Exception if any step throws or does not complete within the test deadline
 */
@Test
public void testDispatcherProcessFailure() throws Exception {
    // Timeout for terminating the RPC service during cleanup
    final Time timeout = Time.seconds(30L);
    final File zookeeperStoragePath = temporaryFolder.newFolder();

    // Config
    final int numberOfJobManagers = 2;
    final int numberOfTaskManagers = 2;
    final int numberOfSlotsPerTaskManager = 2;

    assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);

    // Job managers
    final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];

    // Task managers
    TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];

    HighAvailabilityServices highAvailabilityServices = null;

    LeaderRetrievalService leaderRetrievalService = null;

    // Coordination between the processes goes through a directory
    File coordinateTempDir = null;

    // Cluster config
    Configuration config =
            ZooKeeperTestUtils.createZooKeeperHAConfig(
                    zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
    // Task manager configuration
    config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
    config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
    config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
    // Keep the configured slot count tied to the value checked against PARALLELISM above
    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfSlotsPerTaskManager);
    config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
    config.set(TaskManagerOptions.CPU_CORES, 1.0);
    TaskExecutorResourceUtils.adjustForLocalExecution(config);

    final RpcService rpcService =
            RpcSystem.load().remoteServiceBuilder(config, "localhost", "0").createAndStart();

    try {
        final Deadline deadline = Deadline.fromNow(TEST_TIMEOUT);

        // Coordination directory
        coordinateTempDir = temporaryFolder.newFolder();

        // Start first process
        dispatcherProcesses[0] = new DispatcherProcess(0, config);
        dispatcherProcesses[0].startProcess();

        highAvailabilityServices =
                HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                        config, TestingUtils.defaultExecutor());

        final PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(config);
        // Start the task managers
        for (int i = 0; i < numberOfTaskManagers; i++) {
            taskManagerRunners[i] =
                    new TaskManagerRunner(
                            config,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService);
            taskManagerRunners[i].start();
        }

        // Leader listener
        TestingListener leaderListener = new TestingListener();
        leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
        leaderRetrievalService.start(leaderListener);

        // Initial submission
        leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());

        String leaderAddress = leaderListener.getAddress();
        UUID leaderId = leaderListener.getLeaderSessionID();

        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
                rpcService.connect(
                        leaderAddress,
                        DispatcherId.fromUuid(leaderId),
                        DispatcherGateway.class);
        // Bound the connect by the test deadline, like every other wait in this test
        final DispatcherGateway dispatcherGateway =
                dispatcherGatewayFuture.get(
                        deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        // Wait for all task managers to connect to the leading job manager
        waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());

        final File coordinateDirClosure = coordinateTempDir;
        // Single-element holder so that the trigger thread can hand its failure to the test
        final Throwable[] errorRef = new Throwable[1];

        // we trigger program execution in a separate thread
        Thread programTrigger =
                new Thread("Program Trigger") {
                    @Override
                    public void run() {
                        try {
                            testJobManagerFailure(
                                    zooKeeper.getConnectString(),
                                    coordinateDirClosure,
                                    zookeeperStoragePath);
                        } catch (Throwable t) {
                            t.printStackTrace();
                            errorRef[0] = t;
                        }
                    }
                };

        // start the test program
        programTrigger.start();

        // wait until all marker files are in place, indicating that all tasks have started
        AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(
                coordinateTempDir,
                READY_MARKER_FILE_PREFIX,
                PARALLELISM,
                deadline.timeLeft().toMillis());

        // Kill one of the job managers and trigger recovery
        dispatcherProcesses[0].destroy();

        dispatcherProcesses[1] = new DispatcherProcess(1, config);
        dispatcherProcesses[1].startProcess();

        // we create the marker file which signals the program's tasks that they can
        // complete
        AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
                new File(coordinateTempDir, PROCEED_MARKER_FILE));

        // give the submitting thread the remaining time to return from execute()
        programTrigger.join(deadline.timeLeft().toMillis());

        // Additionally wait for the finish marker file, which the job itself writes after
        // it has verified its result.
        AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(
                coordinateTempDir,
                FINISH_MARKER_FILE_PREFIX,
                1,
                deadline.timeLeft().toMillis());

        // check that the program really finished
        assertFalse("The program did not finish in time", programTrigger.isAlive());

        // check whether the program encountered an error
        if (errorRef[0] != null) {
            Throwable error = errorRef[0];
            error.printStackTrace();
            fail(
                    "The program encountered a "
                            + error.getClass().getSimpleName()
                            + " : "
                            + error.getMessage());
        }
    } catch (Throwable t) {
        // Print early (in some situations the process logs get too big
        // for Travis and the root problem is not shown)
        t.printStackTrace();

        for (DispatcherProcess p : dispatcherProcesses) {
            if (p != null) {
                p.printProcessLog();
            }
        }

        throw t;
    } finally {
        for (int i = 0; i < numberOfTaskManagers; i++) {
            if (taskManagerRunners[i] != null) {
                taskManagerRunners[i].close();
            }
        }

        if (leaderRetrievalService != null) {
            leaderRetrievalService.stop();
        }

        for (DispatcherProcess dispatcherProcess : dispatcherProcesses) {
            if (dispatcherProcess != null) {
                dispatcherProcess.destroy();
            }
        }

        if (highAvailabilityServices != null) {
            highAvailabilityServices.closeAndCleanupAllData();
        }

        RpcUtils.terminateRpcService(rpcService, timeout);

        // Delete coordination directory (best effort; a failure here must not fail the test)
        if (coordinateTempDir != null) {
            try {
                FileUtils.deleteDirectory(coordinateTempDir);
            } catch (Throwable ignored) {
            }
        }
    }
}
/**
 * Repeatedly requests the cluster overview from the dispatcher (with a 50 ms delay between
 * attempts) until it reports at least {@code numberOfTaskManagers} connected task managers.
 * Retrying is bounded by a deadline of {@code timeLeft} from now.
 *
 * @param numberOfTaskManagers minimum number of connected task managers to wait for
 * @param dispatcherGateway gateway of the dispatcher whose cluster overview is polled
 * @param timeLeft time budget for the whole wait; also used as the timeout of each request
 * @throws ExecutionException if the retry future completes exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void waitForTaskManagers(
        int numberOfTaskManagers, DispatcherGateway dispatcherGateway, Duration timeLeft)
        throws ExecutionException, InterruptedException {
    // Dedicated scheduler for the retry loop. It has to be shut down again, otherwise its
    // non-daemon worker thread is leaked on every call.
    final ScheduledExecutorService retryExecutor =
            Executors.newSingleThreadScheduledExecutor();
    try {
        FutureUtils.retrySuccessfulWithDelay(
                        () ->
                                dispatcherGateway.requestClusterOverview(
                                        Time.milliseconds(timeLeft.toMillis())),
                        Time.milliseconds(50L),
                        Deadline.fromNow(timeLeft),
                        clusterOverview ->
                                clusterOverview.getNumTaskManagersConnected()
                                        >= numberOfTaskManagers,
                        new ScheduledExecutorServiceAdapter(retryExecutor))
                .get();
    } finally {
        retryExecutor.shutdownNow();
    }
}
}