blob: 7a931082ac96cccda2306c88184a6f65c93a07b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.SerializableSupplier;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
import static org.apache.flink.configuration.JobManagerOptions.SchedulerType.Adaptive;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assume.assumeFalse;
/**
* Test that failure on recovery leads to job restart if configured, so that transient recovery
* failures are mitigated.
*/
public class CheckpointStoreITCase extends TestLogger {

    /** Cluster configuration whose HA mode is set to the class name of {@link TestingHAFactory}. */
    private static final Configuration CONFIGURATION =
            new Configuration()
                    .set(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

    /** Mini cluster shared by all tests of this class, built from {@link #CONFIGURATION}. */
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(CONFIGURATION)
                            .build());

    /** Resets the static failure-injection flags so that every test starts from a clean state. */
    @Before
    public void init() {
        FailingStore.reset();
        FailingMapper.reset();
    }

    /**
     * Runs a job whose mapper fails once and whose checkpoint store fails once on recovery, and
     * checks that the job still finishes after both the store has recovered and the mapper has
     * processed an element.
     */
    @Test
    public void testRestartOnRecoveryFailure() throws Exception {
        assumeFalse(
                // TODO: remove after FLINK-22483
                "Adaptive scheduler doesn't retry after failures on recovery",
                ClusterOptions.getSchedulerType(CONFIGURATION) == Adaptive);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10);
        env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + on recovery */, 0));

        env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed))
                .map(new FailingMapper())
                .addSink(new DiscardingSink<>());
        env.execute();

        checkState(
                FailingStore.recovered && FailingMapper.failedAndProcessed,
                "Job finished before the store recovered and the mapper processed an element"
                        + " after its failure");
    }

    /**
     * Mapper that throws on the first element it sees and forwards every later element unchanged.
     * The state is kept in static flags so that it is visible to the test method.
     */
    private static class FailingMapper implements MapFunction<Integer, Integer> {

        /** Set once the single injected failure has been thrown. */
        private static volatile boolean failed = false;

        /** Set once an element has been processed after the injected failure. */
        private static volatile boolean failedAndProcessed = false;

        /** Clears both flags. */
        public static void reset() {
            failed = false;
            failedAndProcessed = false;
        }

        /**
         * Throws on the first invocation; afterwards records the successful processing and
         * returns the element as is.
         *
         * @param element the incoming element
         * @return the unchanged element
         * @throws Exception the injected {@link RuntimeException} on the first invocation
         */
        @Override
        public Integer map(Integer element) throws Exception {
            if (!failed) {
                failed = true;
                throw new RuntimeException("Injected failure in FailingMapper");
            }
            failedAndProcessed = true;
            return element;
        }
    }

    /**
     * {@link HighAvailabilityServicesFactory} that creates embedded HA services whose checkpoint
     * recovery factory hands out a {@link FailingStore}. It is referenced by class name from the
     * cluster configuration, so it has to stay public.
     */
    public static class TestingHAFactory implements HighAvailabilityServicesFactory {

        @Override
        public HighAvailabilityServices createHAServices(
                Configuration configuration, Executor executor) {
            // The passed-in executor is not used; the embedded services run on a direct executor.
            return new EmbeddedHaServices(Executors.directExecutor()) {
                @Override
                public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                    return new TestingCheckpointRecoveryFactory(
                            new FailingStore(),
                            new TestingCheckpointIDCounter(new CompletableFuture<>()));
                }
            };
        }
    }

    /**
     * {@link CompletedCheckpointStore} that never retains checkpoints and whose {@link
     * #recover()} succeeds on the first call, throws on the second call, and succeeds again from
     * the third call on. The state is kept in static flags so that it is visible to the test
     * method.
     */
    private static class FailingStore implements CompletedCheckpointStore {

        /** Set by the first call to {@link #recover()}. */
        private static volatile boolean started = false;

        /** Set by the second call to {@link #recover()}, which throws. */
        private static volatile boolean failed = false;

        /** Set by the first call to {@link #recover()} that follows the injected failure. */
        private static volatile boolean recovered = false;

        /** Clears all three flags. */
        public static void reset() {
            started = false;
            failed = false;
            recovered = false;
        }

        /**
         * Advances the started / failed / recovered sequence by one step per call.
         *
         * @throws Exception the injected {@link RuntimeException} on the second call
         */
        @Override
        public void recover() throws Exception {
            if (!started) {
                started = true;
            } else if (!failed) {
                failed = true;
                throw new RuntimeException("Injected failure in FailingStore#recover");
            } else if (!recovered) {
                recovered = true;
            }
        }

        /** Does nothing; checkpoints are not retained. */
        @Override
        public void addCheckpoint(
                CompletedCheckpoint checkpoint,
                CheckpointsCleaner checkpointsCleaner,
                Runnable postCleanup) {}

        /** Does nothing. */
        @Override
        public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner)
                throws Exception {}

        @Override
        public List<CompletedCheckpoint> getAllCheckpoints() {
            return Collections.emptyList();
        }

        @Override
        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }

        @Override
        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        @Override
        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }

    /**
     * Creates a source that emits {@code 0} roughly every 100 ms until it is cancelled or the
     * given condition holds.
     *
     * <p>The method is static so that the returned anonymous function does not reference the
     * enclosing test instance.
     *
     * @param until condition that ends the source once it evaluates to {@code true}
     * @return the source function
     */
    private static SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> until) {
        return new SourceFunction<Integer>() {

            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) {
                while (running && !until.get()) {
                    // Hold the checkpoint lock only while emitting; sleeping inside the lock
                    // would block everything else that needs it for the whole pause.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(0);
                    }
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Restore the interrupt status before propagating the exception.
                        Thread.currentThread().interrupt();
                        ExceptionUtils.rethrow(e);
                    }
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        };
    }
}