/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.testcontainers.utility.DockerImageName;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;

/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
public class FlinkKinesisITCase extends TestLogger {
public static final String TEST_STREAM = "test_stream";
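// Shared Flink mini cluster on which the test job runs and from which stop-with-savepoint is triggered.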
@ClassRule
public static MiniClusterWithClientResource miniCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder().build());
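// Kinesalite container emulating the Kinesis API for the consumer under test.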
@ClassRule
public static KinesaliteContainer kinesalite =
new KinesaliteContainer(
DockerImageName.parse("instructure/kinesalite").withTag("latest"));
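// Target directory for the savepoint taken during stop-with-savepoint.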
@Rule public TemporaryFolder temp = new TemporaryFolder();
private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
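// Helper client used to create the test stream and to publish records into Kinesalite.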
private KinesisPubsubClient client;
@Before
public void setupClient() {
client = new KinesisPubsubClient(kinesalite.getContainerProperties());
}
/**
* Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
*
* <ol>
* <li>The test sets up a stream with a small number of records and creates a Flink job that
* reads them very slowly (occupying a large share of the mailbox thread's time).
* <li>After ensuring that consumption has started, the job is stopped from a parallel thread.
* <li>Without the fix of FLINK-17170, the job has a high chance to deadlock during
* cancellation.
* <li>With the fix, the job proceeds and the backpressure can be lifted.
* </ol>
*/
@Test
public void testStopWithSavepoint() throws Exception {
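// Create the test stream with a single shard so that records are read back in insertion order.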
client.createTopic(TEST_STREAM, 1, new Properties());
// add elements to the test stream
int numElements = 10;
List<String> elements =
IntStream.range(0, numElements)
.mapToObj(String::valueOf)
.collect(Collectors.toList());
for (String element : elements) {
client.sendMessage(TEST_STREAM, element);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
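// Read the stream from the earliest record so that all pre-inserted elements are consumed.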
Properties config = kinesalite.getContainerProperties();
config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
FlinkKinesisConsumer<String> consumer =
new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
// call stop with savepoint in another thread
ForkJoinTask<Object> stopTask =
ForkJoinPool.commonPool()
.submit(
() -> {
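// Wait until the mapper has seen the first record, then trigger stop with savepoint.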
WaitingMapper.firstElement.await();
stopWithSavepoint();
WaitingMapper.stopped = true;
return null;
});
try {
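// Collect at most 10000 records; the job is expected to terminate early through the savepoint.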
List<String> result =
env.addSource(consumer).map(new WaitingMapper()).executeAndCollect(10000);
// stop with savepoint will most likely return only a subset of the elements
// validate that this subset is the expected prefix
assertThat(result, hasSize(lessThan(numElements)));
assertThat(result, equalTo(elements.subList(0, result.size())));
} finally {
stopTask.cancel(true);
}
}
private String stopWithSavepoint() throws Exception {
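// Exactly one job is running on the mini cluster; stop it with a savepoint, draining it by advancing to the end of event time.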
JobStatusMessage job =
miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
return miniCluster
.getClusterClient()
.stopWithSavepoint(job.getJobId(), true, temp.getRoot().getAbsolutePath())
.get();
}
private static class WaitingMapper implements MapFunction<String, String> {
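// Released once the first record has been processed, signalling that consumption has started.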
static CountDownLatch firstElement = new CountDownLatch(1);
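// Set once stop with savepoint has been issued so that the mapper stops throttling.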
static volatile boolean stopped = false;
@Override
public String map(String value) throws Exception {
if (firstElement.getCount() > 0) {
firstElement.countDown();
}
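// Throttle processing to keep the source under backpressure until the job is being stopped.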
if (!stopped) {
Thread.sleep(100);
}
return value;
}
}
}