| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.test.state.operator.restore; |
| |
| import org.apache.flink.api.common.JobStatus; |
| import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
| import org.apache.flink.api.common.time.Deadline; |
| import org.apache.flink.api.common.time.Time; |
| import org.apache.flink.client.program.ClusterClient; |
| import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; |
| import org.apache.flink.runtime.state.StateBackend; |
| import org.apache.flink.runtime.state.memory.MemoryStateBackend; |
| import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; |
| import org.apache.flink.runtime.testutils.TestingUtils; |
| import org.apache.flink.streaming.api.CheckpointingMode; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; |
| import org.apache.flink.test.util.MiniClusterWithClientResource; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.TestLogger; |
| import org.apache.flink.util.concurrent.FutureUtils; |
| |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
| import java.io.File; |
| import java.net.URL; |
| import java.time.Duration; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| |
| /** |
| * Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink |
| * versions and that the topology can be modified from that point on. |
| * |
| * <p>The verification is done in 2 Steps: Step 1: Migrate the job to the newer version by |
| * submitting the same job used for the old version savepoint, and create a new savepoint. Step 2: |
| * Modify the job topology, and restore from the savepoint created in step 1. |
| * |
| * <p>The savepoint _metadata file for the current branch is stored in the savepointPath in {@link |
| * AbstractOperatorRestoreTestBase#migrateJob}, please create the corresponding test resource |
| * directory and copy the _metadata file by hand. |
| */ |
| public abstract class AbstractOperatorRestoreTestBase extends TestLogger { |
| |
| private static final int NUM_TMS = 1; |
| private static final int NUM_SLOTS_PER_TM = 4; |
| private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L); |
| private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = |
| Pattern.compile( |
| Stream.of( |
| "was not running", |
| CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING |
| .message(), |
| CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY |
| .message(), |
| CheckpointFailureReason |
| .CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER |
| .message()) |
| .map(AbstractOperatorRestoreTestBase::escapeRegexCharacters) |
| .collect(Collectors.joining(")|(", "(", ")"))); |
| |
| @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); |
| |
| @Rule |
| public final MiniClusterWithClientResource cluster = |
| new MiniClusterWithClientResource( |
| new MiniClusterResourceConfiguration.Builder() |
| .setNumberTaskManagers(NUM_TMS) |
| .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) |
| .build()); |
| |
| private final boolean allowNonRestoredState; |
| |
| protected AbstractOperatorRestoreTestBase() { |
| this(true); |
| } |
| |
| protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) { |
| this.allowNonRestoredState = allowNonRestoredState; |
| } |
| |
| @Test |
| public void testMigrationAndRestore() throws Throwable { |
| ClusterClient<?> clusterClient = cluster.getClusterClient(); |
| final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); |
| |
| // submit job with old version savepoint and create a migrated savepoint in the new version |
| String savepointPath = migrateJob(clusterClient, deadline); |
| // restore from migrated new version savepoint |
| restoreJob(clusterClient, deadline, savepointPath); |
| } |
| |
| private String migrateJob(ClusterClient<?> clusterClient, Deadline deadline) throws Throwable { |
| |
| URL savepointResource = |
| AbstractOperatorRestoreTestBase.class |
| .getClassLoader() |
| .getResource("operatorstate/" + getMigrationSavepointName()); |
| if (savepointResource == null) { |
| throw new IllegalArgumentException("Savepoint file does not exist."); |
| } |
| JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE); |
| jobToMigrate.setSavepointRestoreSettings( |
| SavepointRestoreSettings.forPath(savepointResource.getFile())); |
| |
| assertNotNull(jobToMigrate.getJobID()); |
| |
| clusterClient.submitJob(jobToMigrate).get(); |
| |
| CompletableFuture<JobStatus> jobRunningFuture = |
| FutureUtils.retrySuccessfulWithDelay( |
| () -> clusterClient.getJobStatus(jobToMigrate.getJobID()), |
| Time.milliseconds(50), |
| deadline, |
| (jobStatus) -> jobStatus == JobStatus.RUNNING, |
| TestingUtils.defaultScheduledExecutor()); |
| assertEquals( |
| JobStatus.RUNNING, |
| jobRunningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); |
| |
| // Trigger savepoint |
| File targetDirectory = tmpFolder.newFolder(); |
| String savepointPath = null; |
| |
| // FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running |
| // TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see |
| // FLINK-4714) |
| while (deadline.hasTimeLeft() && savepointPath == null) { |
| try { |
| savepointPath = |
| clusterClient |
| .cancelWithSavepoint( |
| jobToMigrate.getJobID(), targetDirectory.getAbsolutePath()) |
| .get(); |
| } catch (Exception e) { |
| String exceptionString = ExceptionUtils.stringifyException(e); |
| if (!PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS |
| .matcher(exceptionString) |
| .find()) { |
| throw e; |
| } |
| } |
| } |
| |
| assertNotNull("Could not take savepoint.", savepointPath); |
| |
| CompletableFuture<JobStatus> jobCanceledFuture = |
| FutureUtils.retrySuccessfulWithDelay( |
| () -> clusterClient.getJobStatus(jobToMigrate.getJobID()), |
| Time.milliseconds(50), |
| deadline, |
| (jobStatus) -> jobStatus == JobStatus.CANCELED, |
| TestingUtils.defaultScheduledExecutor()); |
| assertEquals( |
| JobStatus.CANCELED, |
| jobCanceledFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); |
| |
| return savepointPath; |
| } |
| |
| private void restoreJob(ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) |
| throws Exception { |
| JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE); |
| jobToRestore.setSavepointRestoreSettings( |
| SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState)); |
| |
| assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID()); |
| |
| clusterClient.submitJob(jobToRestore).get(); |
| |
| CompletableFuture<JobStatus> jobStatusFuture = |
| FutureUtils.retrySuccessfulWithDelay( |
| () -> clusterClient.getJobStatus(jobToRestore.getJobID()), |
| Time.milliseconds(50), |
| deadline, |
| (jobStatus) -> jobStatus == JobStatus.FINISHED, |
| TestingUtils.defaultScheduledExecutor()); |
| assertEquals( |
| JobStatus.FINISHED, |
| jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); |
| } |
| |
| private JobGraph createJobGraph(ExecutionMode mode) { |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); |
| env.setRestartStrategy(RestartStrategies.noRestart()); |
| env.setStateBackend((StateBackend) new MemoryStateBackend()); |
| |
| switch (mode) { |
| case MIGRATE: |
| createMigrationJob(env); |
| break; |
| case RESTORE: |
| createRestoredJob(env); |
| break; |
| } |
| |
| return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); |
| } |
| |
| /** |
| * Recreates the job used to create the new version savepoint. |
| * |
| * @param env StreamExecutionEnvironment to use |
| */ |
| protected abstract void createMigrationJob(StreamExecutionEnvironment env); |
| |
| /** |
| * Creates a modified version of the job used to create the new version savepoint. |
| * |
| * @param env StreamExecutionEnvironment to use |
| */ |
| protected abstract void createRestoredJob(StreamExecutionEnvironment env); |
| |
| /** |
| * Returns the name of the savepoint directory to use, relative to "resources/operatorstate". |
| * |
| * @return savepoint directory to use |
| */ |
| protected abstract String getMigrationSavepointName(); |
| |
| private static String escapeRegexCharacters(String string) { |
| return string.replaceAll("\\(", "\\\\(").replaceAll("\\)", "\\\\)"); |
| } |
| } |