blob: bb8f31d69037a34eaf4a66806db4bc337c612043 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.OptionalFailure;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import static junit.framework.Assert.fail;
import static org.junit.Assert.assertNotEquals;
/**
 * Base class for savepoint-migration tests: starts a single-TaskManager mini cluster,
 * runs a job until expected accumulator values are reached, and either takes a savepoint
 * ({@code executeAndSavepoint}) or restores from an existing one ({@code restoreAndExecute}).
 */
@Category(AlsoRunWithSchedulerNG.class)
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

	private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);

	/**
	 * Time budget for a single execute/restore operation. A fresh {@code Deadline} is derived
	 * from this per method invocation; previously a single static {@code Deadline} was created
	 * at class-load time and shared by all tests, so later tests in a suite could spuriously
	 * time out.
	 */
	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);

	protected static final int DEFAULT_PARALLELISM = 4;

	@ClassRule
	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

	@Rule
	public final MiniClusterWithClientResource miniClusterResource;

	@BeforeClass
	public static void before() {
		// Migration tests restore savepoints taken with old Flink versions, so legacy
		// state must not be treated as an error.
		SavepointSerializers.setFailWhenLegacyStateDetected(false);
	}

	/**
	 * Resolves a test snapshot resource on the classpath to a file-system path.
	 *
	 * @param filename resource name relative to the classpath root
	 * @return absolute file path of the resource
	 * @throws NullPointerException if the resource is not on the classpath
	 */
	protected static String getResourceFilename(String filename) {
		ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
		URL resource = cl.getResource(filename);
		if (resource == null) {
			throw new NullPointerException("Missing snapshot resource.");
		}
		return resource.getFile();
	}

	protected SavepointMigrationTestBase() throws Exception {
		miniClusterResource = new MiniClusterWithClientResource(
			new MiniClusterResourceConfiguration.Builder()
				.setConfiguration(getConfiguration())
				.setNumberTaskManagers(1)
				.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
				.build());
	}

	/**
	 * Builds the mini-cluster configuration: memory state backend with unique per-run
	 * checkpoint/savepoint directories and a short heartbeat interval.
	 */
	private Configuration getConfiguration() throws Exception {
		// Flink configuration
		final Configuration config = new Configuration();

		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

		// Unique suffix so repeated runs within the same TemporaryFolder never collide.
		UUID id = UUID.randomUUID();
		final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
		final File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile();

		if (!checkpointDir.exists() || !savepointDir.exists()) {
			throw new Exception("Test setup failed: failed to create (temporary) directories.");
		}

		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
		LOG.info("Created savepoint directory: " + savepointDir + ".");

		config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
		// Force all state to go through files rather than being inlined into metadata.
		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
		config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);

		return config;
	}

	/**
	 * Checks whether every expected accumulator is present with exactly the expected value.
	 *
	 * <p>Unifies the two previously-duplicated polling loops; uses the null-safe variant
	 * (the old {@code restoreAndExecute} copy could NPE on a missing accumulator value).
	 *
	 * @param accumulators accumulator snapshot fetched from the cluster client
	 * @param expectedAccumulators (name, expected value) pairs that must all match
	 * @return {@code true} iff all expected accumulators match
	 */
	private static boolean allAccumulatorsMatch(
			Map<String, OptionalFailure<Object>> accumulators,
			Tuple2<String, Integer>[] expectedAccumulators) throws Exception {

		for (Tuple2<String, Integer> acc : expectedAccumulators) {
			OptionalFailure<Object> accumOpt = accumulators.get(acc.f0);
			if (accumOpt == null) {
				return false;
			}

			Integer numFinished = (Integer) accumOpt.get();
			if (numFinished == null || !numFinished.equals(acc.f1)) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Submits the job, waits until the expected accumulator values are reached, triggers a
	 * savepoint, and moves it to {@code savepointPath}.
	 *
	 * @param env environment whose job graph is submitted
	 * @param savepointPath target location the savepoint is moved to
	 * @param expectedAccumulators accumulator values that signal the job is ready to snapshot
	 * @throws Exception on submission, polling, or savepoint failures
	 */
	@SafeVarargs
	protected final void executeAndSavepoint(
			StreamExecutionEnvironment env,
			String savepointPath,
			Tuple2<String, Integer>... expectedAccumulators) throws Exception {

		// Fresh budget per invocation (see TEST_TIMEOUT).
		final Deadline deadline = TEST_TIMEOUT.fromNow();

		ClusterClient<?> client = miniClusterResource.getClusterClient();
		client.setDetached(true);

		// Submit the job
		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());

		LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());

		boolean done = false;
		while (deadline.hasTimeLeft()) {
			Thread.sleep(100);
			Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobSubmissionResult.getJobID());
			if (allAccumulatorsMatch(accumulators, expectedAccumulators)) {
				done = true;
				break;
			}
		}

		if (!done) {
			fail("Did not see the expected accumulator results within time limit.");
		}

		LOG.info("Triggering savepoint.");

		CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobSubmissionResult.getJobID(), null);
		String jobmanagerSavepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

		File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
		// savepoints were changed to be directories in Flink 1.3
		if (jobManagerSavepoint.isDirectory()) {
			FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
		} else {
			FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
		}
	}

	/**
	 * Submits the job configured to restore from {@code savepointPath} and waits until the
	 * expected accumulator values are reached, failing fast if the job transitions to FAILED.
	 *
	 * @param env environment whose job graph is submitted
	 * @param savepointPath savepoint to restore from
	 * @param expectedAccumulators accumulator values that signal a successful restore
	 * @throws Exception on submission or polling failures
	 */
	@SafeVarargs
	protected final void restoreAndExecute(
			StreamExecutionEnvironment env,
			String savepointPath,
			Tuple2<String, Integer>... expectedAccumulators) throws Exception {

		// Fresh budget per invocation (see TEST_TIMEOUT).
		final Deadline deadline = TEST_TIMEOUT.fromNow();

		ClusterClient<?> client = miniClusterResource.getClusterClient();
		client.setDetached(true);

		// Submit the job
		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));

		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());

		boolean done = false;
		while (deadline.hasTimeLeft()) {

			// try and get a job result, this will fail if the job already failed. Use this
			// to get out of this loop
			JobID jobId = jobSubmissionResult.getJobID();
			try {
				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());

				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);

				assertNotEquals(JobStatus.FAILED, jobStatus);
			} catch (Exception e) {
				fail("Could not connect to job: " + e);
			}

			Thread.sleep(100);
			Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobId);
			if (allAccumulatorsMatch(accumulators, expectedAccumulators)) {
				done = true;
				break;
			}
		}

		if (!done) {
			fail("Did not see the expected accumulator results within time limit.");
		}
	}
}