blob: 377d8c9e5883c007dd8a40e75b2317a6baecc8db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recovery;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.test.util.TestProcessBuilder.TestProcess;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
/**
* Abstract base for tests verifying the behavior of the recovery in the case when a TaskManager
* fails (process is killed) in the middle of a job execution.
*
* <p>The test works with multiple TaskManager processes by spawning JVMs. Initially, it starts a
* JobManager in-process and two TaskManager JVMs with 2 task slots each. It submits a program with
* parallelism 4 and waits until all tasks are brought up. Coordination between the test and the
* tasks happens via checking for the existence of temporary files. It then starts another
* TaskManager, which is guaranteed to remain empty (all tasks are already deployed), and kills one
* of the original TaskManagers. The recovery should restart the tasks on the new TaskManager.
*/
public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends TestLogger {
/**
 * File name prefix of the "ready" marker files. The test waits until the files {@code prefix + i}
 * for every {@code i} in {@code [0, PARALLELISM)} exist in the coordination directory.
 */
protected static final String READY_MARKER_FILE_PREFIX = "ready_";
/**
 * Name of the marker file that the test creates in the coordination directory after one
 * TaskManager process has been killed, signalling the program that it may complete.
 */
protected static final String PROCEED_MARKER_FILE = "proceed";
/** Number of "ready" marker files the test waits for (one per parallel task of the program). */
protected static final int PARALLELISM = 4;
// provides the HA storage directory and the coordination directory of the test
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
// NOTE(review): not referenced directly in this class; presumably used by subclasses - confirm
@Rule public final BlobServerResource blobServerResource = new BlobServerResource();
// supplies the ZooKeeper quorum connect string for the HA configuration
@Rule public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
/**
 * Starts an in-process cluster entrypoint plus two TaskManager JVMs, runs the program supplied by
 * {@link #testTaskManagerFailure(Configuration, File)} in a separate thread, waits until all
 * tasks have reported readiness through marker files, starts a third TaskManager, kills the
 * first one, and finally verifies that the program still completes without an error.
 *
 * <p>The test returns early (without failing) when no java executable can be found.
 *
 * @throws Exception if starting the cluster, spawning a process or waiting for it fails
 */
@Test
public void testTaskManagerProcessFailure() throws Exception {
    // upper bound for all parallel tasks to report readiness via their marker files
    final long taskStartupTimeoutMillis = 120000L;

    TestProcess taskManagerProcess1 = null;
    TestProcess taskManagerProcess2 = null;
    TestProcess taskManagerProcess3 = null;

    Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setString(RestOptions.BIND_PORT, "0");
    config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
    config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
    config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    config.setString(
            HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
    config.setString(
            HighAvailabilityOptions.HA_STORAGE_PATH,
            temporaryFolder.newFolder().getAbsolutePath());
    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
    config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
    config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
    config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
    config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
    config.set(TaskManagerOptions.CPU_CORES, 1.0);
    config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
    config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofSeconds(30L));

    try (final StandaloneSessionClusterEntrypoint clusterEntrypoint =
            new StandaloneSessionClusterEntrypoint(config)) {

        // check that we run this test only if the java command
        // is available on this machine
        String javaCommand = getJavaCommandPath();
        if (javaCommand == null) {
            System.out.println(
                    "---- Skipping Process Failure test : Could not find java executable ----");
            return;
        }

        clusterEntrypoint.startCluster();

        // coordination between the processes goes through a directory
        final File coordinateTempDir = temporaryFolder.newFolder();

        TestProcessBuilder taskManagerProcessBuilder =
                new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
        taskManagerProcessBuilder.addConfigAsMainClassArgs(config);

        // start the first two TaskManager processes
        taskManagerProcess1 = taskManagerProcessBuilder.start();
        taskManagerProcess2 = taskManagerProcessBuilder.start();

        // The program sets a marker file in each of its parallel tasks once they are ready,
        // so that this coordinating code is aware of it. The program then consumes elements
        // very slowly until the "proceed" marker file (created further below by this test
        // driver) is present.
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();

        // we trigger program execution in a separate thread
        Thread programTrigger =
                new Thread("Program Trigger") {
                    @Override
                    public void run() {
                        try {
                            testTaskManagerFailure(config, coordinateTempDir);
                        } catch (Throwable t) {
                            t.printStackTrace();
                            errorRef.set(t);
                        }
                    }
                };

        // start the test program
        programTrigger.start();

        // wait until all marker files are in place, indicating that all tasks have started
        // (at most taskStartupTimeoutMillis, i.e. 2 minutes)
        if (!waitForMarkerFiles(
                coordinateTempDir,
                READY_MARKER_FILE_PREFIX,
                PARALLELISM,
                taskStartupTimeoutMillis)) {
            // check if the program failed for some reason
            failOnProgramError(errorRef);
            // no error occurred, simply a timeout
            fail("The tasks were not started within time (" + taskStartupTimeoutMillis + "msecs)");
        }

        // start the third TaskManager
        taskManagerProcess3 = taskManagerProcessBuilder.start();

        // kill one of the previous TaskManagers, triggering a failure and recovery
        taskManagerProcess1.destroy();
        waitForShutdown("TaskManager 1", taskManagerProcess1);

        // we create the marker file which signals the program's tasks that they can complete
        touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));

        // wait for at most 5 minutes for the program to complete
        programTrigger.join(300000);

        // check that the program really finished
        assertFalse("The program did not finish in time", programTrigger.isAlive());

        // check whether the program encountered an error
        failOnProgramError(errorRef);

        // all seems well :-)
    } catch (Exception e) {
        e.printStackTrace();
        printProcessLog("TaskManager 1", taskManagerProcess1);
        printProcessLog("TaskManager 2", taskManagerProcess2);
        printProcessLog("TaskManager 3", taskManagerProcess3);
        // fail the test, but keep the original exception as the cause for diagnosis
        throw new AssertionError(e.getMessage(), e);
    } catch (Error e) {
        // also covers assertion failures raised inside the try block
        e.printStackTrace();
        printProcessLog("TaskManager 1", taskManagerProcess1);
        printProcessLog("TaskManager 2", taskManagerProcess2);
        printProcessLog("TaskManager 3", taskManagerProcess3);
        throw e;
    } finally {
        // request termination of all spawned processes first, then wait for each of them
        if (taskManagerProcess1 != null) {
            taskManagerProcess1.destroy();
        }
        if (taskManagerProcess2 != null) {
            taskManagerProcess2.destroy();
        }
        if (taskManagerProcess3 != null) {
            taskManagerProcess3.destroy();
        }

        waitForShutdown("TaskManager 1", taskManagerProcess1);
        waitForShutdown("TaskManager 2", taskManagerProcess2);
        waitForShutdown("TaskManager 3", taskManagerProcess3);
    }
}

/**
 * Fails the test if the program thread recorded an error; does nothing otherwise.
 *
 * @param errorRef holder of the {@link Throwable} caught by the program thread, if any
 * @throws AssertionError carrying the recorded error as its cause
 */
private static void failOnProgramError(AtomicReference<Throwable> errorRef) {
    final Throwable error = errorRef.get();
    if (error != null) {
        error.printStackTrace();
        throw new AssertionError(
                "The program encountered a "
                        + error.getClass().getSimpleName()
                        + " : "
                        + error.getMessage(),
                error);
    }
}
/**
 * Waits up to 30 seconds for the given process to exit. If it is still running afterwards, an
 * error is logged, the captured process log is printed and the process is destroyed forcibly.
 *
 * @param processName human-readable name used in log output
 * @param process the process to wait for; {@code null} is ignored
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void waitForShutdown(final String processName, @Nullable final TestProcess process)
        throws InterruptedException {
    if (process != null) {
        final boolean exitedInTime = process.getProcess().waitFor(30, TimeUnit.SECONDS);
        if (!exitedInTime) {
            log.error("{} did not shutdown in time.", processName);
            printProcessLog(processName, process);
            process.getProcess().destroyForcibly();
        }
    }
}
/**
 * Runs the test program whose recovery behavior is verified. {@link
 * #testTaskManagerProcessFailure()} invokes this method on a separate thread ("Program
 * Trigger"), which lets the test check whether the program has terminated.
 *
 * <p>The test waits until the files {@code READY_MARKER_FILE_PREFIX + i} for every {@code i}
 * below {@link #PARALLELISM} exist in {@code coordinateDir} before it kills a TaskManager, and
 * it creates the file {@link #PROCEED_MARKER_FILE} in that directory afterwards.
 *
 * @param configuration the config to use
 * @param coordinateDir directory used for coordination via marker files; the TaskManager
 *     failure is triggered only after the "ready" marker files have been created in it
 * @throws Exception on any failure of the program; whatever is thrown is recorded by the
 *     calling thread and makes the test fail
 */
public abstract void testTaskManagerFailure(Configuration configuration, File coordinateDir)
throws Exception;
/**
 * Prints the captured error output of a spawned process to standard out, framed by separator
 * lines. If the process was never started, a corresponding notice is printed instead.
 *
 * @param processName human-readable name used in the banner
 * @param process the spawned process, or {@code null} if it was not started
 */
protected static void printProcessLog(String processName, TestProcess process) {
    final String separator = "-----------------------------------------";

    System.out.println(separator);
    if (process != null) {
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println(separator);
        System.out.println(process.getErrorOutput().toString());
        System.out.println(separator);
        System.out.println(" END SPAWNED PROCESS LOG");
    } else {
        System.out.println(" PROCESS " + processName + " WAS NOT STARTED.");
    }
    System.out.println(separator);
}
/**
 * Creates the file if it does not exist yet and sets its last-modified time to the current
 * time. The content of an already existing file is left untouched.
 *
 * @param file the file to touch
 * @throws IOException if the file cannot be created or its modification time cannot be set
 */
protected static void touchFile(File file) throws IOException {
    final boolean missing = !file.exists();
    if (missing) {
        // opening and immediately closing an output stream creates an empty file
        try (FileOutputStream ignored = new FileOutputStream(file)) {
            // nothing to write
        }
    }

    final boolean stamped = file.setLastModified(System.currentTimeMillis());
    if (!stamped) {
        throw new IOException("Could not touch the file.");
    }
}
/**
 * Polls the given directory until the marker files {@code prefix + 0} through {@code prefix +
 * (num - 1)} all exist, or until the timeout has elapsed.
 *
 * <p>The files are checked at least once, even for a zero or negative timeout. Elapsed time is
 * measured with the monotonic {@link System#nanoTime()} clock, so the wait is not affected by
 * wall-clock adjustments and very large timeouts do not overflow.
 *
 * @param basedir directory in which the marker files are expected
 * @param prefix file name prefix of the marker files
 * @param num number of marker files to wait for
 * @param timeout maximum time to wait, in milliseconds
 * @return {@code true} if all marker files were found, {@code false} on timeout
 * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
 *     interrupted while waiting; the thread's interrupt flag is restored beforehand
 */
protected static boolean waitForMarkerFiles(
        File basedir, String prefix, int num, long timeout) {
    // toNanos saturates at Long.MAX_VALUE instead of overflowing
    final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0L, timeout));
    final long start = System.nanoTime();

    while (true) {
        if (allMarkerFilesExist(basedir, prefix, num)) {
            return true;
        }
        if (System.nanoTime() - start >= timeoutNanos) {
            return false;
        }
        // not all found, wait for a bit
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // keep the interruption visible to callers further up the stack
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

/** Returns whether the files {@code prefix + i} exist in {@code basedir} for all i below num. */
private static boolean allMarkerFilesExist(File basedir, String prefix, int num) {
    for (int i = 0; i < num; i++) {
        if (!new File(basedir, prefix + i).exists()) {
            return false;
        }
    }
    return true;
}
// --------------------------------------------------------------------------------------------
/**
 * Entry point of the spawned TaskExecutor JVM. Builds the configuration from the program
 * arguments and runs a TaskManager with it; any failure is logged and ends the JVM with exit
 * code 1.
 */
public static class TaskExecutorProcessEntryPoint {

    private static final Logger LOG =
            LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);

    public static void main(String[] args) {
        try {
            final Configuration configuration = ParameterTool.fromArgs(args).getConfiguration();
            // the plugin manager is created before the configuration is adjusted
            final PluginManager plugins =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            TaskExecutorResourceUtils.adjustForLocalExecution(configuration);

            TaskManagerRunner.runTaskManager(configuration, plugins);
        } catch (Throwable failure) {
            LOG.error("Failed to run the TaskManager process", failure);
            System.exit(1);
        }
    }
}
}