/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Integration tests for {@link KafkaSink} writing to a Kafka cluster. */
public class KafkaSinkITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkITCase.class);
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final int ZK_TIMEOUT_MILLIS = 30000;
private static final short TOPIC_REPLICATION_FACTOR = 1;
private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
private static final RecordSerializer serializer = new RecordSerializer();
private static AdminClient admin;
private String topic;
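    // Counters and flags shared between the embedded Flink job and the JUnit thread via the
    // SharedObjects rule, so that functions running in task threads can report back to the test.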
private SharedReference<AtomicLong> emittedRecordsCount;
private SharedReference<AtomicLong> emittedRecordsWithCheckpoint;
private SharedReference<AtomicBoolean> failed;
private SharedReference<AtomicLong> lastCheckpointedRecord;
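    // Single-broker Kafka with transactions enabled; the transaction timeout is raised to two
    // hours so that transactions of failed jobs linger until a later run explicitly aborts them.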
@ClassRule
public static final KafkaContainer KAFKA_CONTAINER =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
.withEmbeddedZookeeper()
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
.withEnv(
"KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
String.valueOf(Duration.ofHours(2).toMillis()))
.withNetwork(NETWORK)
.withLogConsumer(LOG_CONSUMER)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
@Rule public final TemporaryFolder temp = new TemporaryFolder();
@BeforeClass
public static void setupAdmin() {
Map<String, Object> properties = new HashMap<>();
properties.put(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
admin = AdminClient.create(properties);
}
@AfterClass
public static void teardownAdmin() {
admin.close();
}
@Before
public void setUp() throws ExecutionException, InterruptedException, TimeoutException {
emittedRecordsCount = sharedObjects.add(new AtomicLong());
emittedRecordsWithCheckpoint = sharedObjects.add(new AtomicLong());
failed = sharedObjects.add(new AtomicBoolean(false));
lastCheckpointedRecord = sharedObjects.add(new AtomicLong(0));
topic = UUID.randomUUID().toString();
createTestTopic(topic, 1, TOPIC_REPLICATION_FACTOR);
}
@After
public void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
deleteTestTopic(topic);
}
@Test
public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception {
writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount);
}
@Test
public void testWriteRecordsToKafkaWithNoneGuarantee() throws Exception {
writeRecordsToKafka(DeliveryGuarantee.NONE, emittedRecordsCount);
}
@Test
public void testWriteRecordsToKafkaWithExactlyOnceGuarantee() throws Exception {
writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint);
}
@Test
public void testRecoveryWithAtLeastOnceGuarantee() throws Exception {
testRecoveryWithAssertion(
DeliveryGuarantee.AT_LEAST_ONCE,
1,
(records) ->
assertThat(records, hasItems(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)));
}
@Test
public void testRecoveryWithExactlyOnceGuarantee() throws Exception {
testRecoveryWithAssertion(
DeliveryGuarantee.EXACTLY_ONCE,
1,
(records) ->
assertThat(
records,
contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray())));
}
@Test
public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throws Exception {
testRecoveryWithAssertion(
DeliveryGuarantee.EXACTLY_ONCE,
2,
(records) ->
assertThat(
records,
contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray())));
}
@Test
public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Exception {
        // Run a first job that fails during the async phase of a checkpoint to leave some
        // lingering transactions
final Configuration config = new Configuration();
config.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
final File checkpointDir = temp.newFolder();
config.setString(
CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.set(
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2);
try {
executeWithMapper(new FailAsyncCheckpointMapper(1), config, "firstPrefix");
} catch (Exception e) {
assertThat(
e.getCause().getCause().getMessage(),
containsString("Exceeded checkpoint tolerable failure"));
}
final File completedCheckpoint = TestUtils.getMostRecentCompletedCheckpoint(checkpointDir);
config.set(SavepointConfigOptions.SAVEPOINT_PATH, completedCheckpoint.toURI().toString());
        // Run a second job which aborts all lingering transactions; a new consumer should then
        // immediately see the newly written records
failed.get().set(true);
executeWithMapper(
new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
assertThat(
deserializeValues(collectedRecords),
contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray()));
}
@Test
public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exception {
        // Run a first job that opens 5 transactions (one per subtask) and fails in the async
        // checkpoint phase
final Configuration config = new Configuration();
config.set(CoreOptions.DEFAULT_PARALLELISM, 5);
try {
executeWithMapper(new FailAsyncCheckpointMapper(0), config, null);
} catch (Exception e) {
assertThat(
e.getCause().getCause().getMessage(),
containsString("Exceeded checkpoint tolerable failure"));
}
assertTrue(deserializeValues(drainAllRecordsFromTopic(topic)).isEmpty());
        // The second job runs with parallelism 1 and must abort all transactions left over from
        // the previous run with higher parallelism
config.set(CoreOptions.DEFAULT_PARALLELISM, 1);
failed.get().set(true);
executeWithMapper(
new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null);
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
assertThat(
deserializeValues(collectedRecords),
contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray()));
}
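    /**
     * Runs a bounded job (sequence 1..10) that pipes records through the given mapper into an
     * EXACTLY_ONCE KafkaSink, using the given transactional id prefix (or "kafka-sink" if null).
     */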
private void executeWithMapper(
MapFunction<Long, Long> mapper,
Configuration config,
@Nullable String transactionalIdPrefix)
throws Exception {
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());
final DataStreamSource<Long> source = env.fromSequence(1, 10);
final DataStream<Long> stream = source.map(mapper);
final KafkaSinkBuilder<Long> builder =
new KafkaSinkBuilder<Long>()
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new RecordSerializer())
.build());
if (transactionalIdPrefix == null) {
transactionalIdPrefix = "kafka-sink";
}
builder.setTransactionalIdPrefix(transactionalIdPrefix);
stream.sinkTo(builder.build());
env.execute();
checkProducerLeak();
}
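    /**
     * Runs a bounded job whose mapper fails once after the first completed checkpoint and then
     * applies the given assertion to the records found in the topic after recovery.
     */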
private void testRecoveryWithAssertion(
DeliveryGuarantee guarantee,
int maxConcurrentCheckpoints,
java.util.function.Consumer<List<Long>> recordsAssertion)
throws Exception {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
env.enableCheckpointing(300L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);
DataStreamSource<Long> source = env.fromSequence(1, 10);
DataStream<Long> stream =
source.map(new FailingCheckpointMapper(failed, lastCheckpointedRecord));
stream.sinkTo(
new KafkaSinkBuilder<Long>()
.setDeliverGuarantee(guarantee)
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new RecordSerializer())
.build())
.setTransactionalIdPrefix("kafka-sink")
.build());
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
recordsAssertion.accept(deserializeValues(collectedRecords));
checkProducerLeak();
}
private void writeRecordsToKafka(
DeliveryGuarantee deliveryGuarantee, SharedReference<AtomicLong> expectedRecords)
throws Exception {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
env.enableCheckpointing(100L);
final DataStream<Long> source =
env.addSource(
new InfiniteIntegerSource(
emittedRecordsCount, emittedRecordsWithCheckpoint));
source.sinkTo(
new KafkaSinkBuilder<Long>()
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setDeliverGuarantee(deliveryGuarantee)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new RecordSerializer())
.build())
.setTransactionalIdPrefix("kafka-sink")
.build());
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
final long recordsCount = expectedRecords.get().get();
        assertEquals(recordsCount, collectedRecords.size());
assertThat(
deserializeValues(collectedRecords),
contains(LongStream.range(1, recordsCount + 1).boxed().toArray()));
checkProducerLeak();
}
private static List<Long> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records) {
        return records.stream()
                .map(record -> ByteBuffer.wrap(record.value()).getLong())
                .collect(Collectors.toList());
}
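    /**
     * Standard consumer settings for the tests; the random group.id combined with
     * auto.offset.reset=earliest makes every consumer read the topic from the beginning.
     */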
private static Properties getKafkaClientConfiguration() {
final Properties standardProps = new Properties();
standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
standardProps.put("group.id", UUID.randomUUID().toString());
standardProps.put("enable.auto.commit", false);
standardProps.put("auto.offset.reset", "earliest");
standardProps.put("max.partition.fetch.bytes", 256);
standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
return standardProps;
}
private static Consumer<byte[], byte[]> createTestConsumer(
String topic, Properties properties) {
final Properties consumerConfig = new Properties();
consumerConfig.putAll(properties);
consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
kafkaConsumer.subscribe(Collections.singletonList(topic));
return kafkaConsumer;
}
private void createTestTopic(String topic, int numPartitions, short replicationFactor)
throws ExecutionException, InterruptedException, TimeoutException {
final CreateTopicsResult result =
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)));
result.all().get(1, TimeUnit.MINUTES);
}
private void deleteTestTopic(String topic)
throws ExecutionException, InterruptedException, TimeoutException {
final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
result.all().get(1, TimeUnit.MINUTES);
}
private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic) {
Properties properties = getKafkaClientConfiguration();
return drainAllRecordsFromTopic(topic, properties);
}
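    /**
     * Polls the topic until an empty poll is observed, which the tests treat as the topic being
     * fully drained.
     */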
static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
String topic, Properties properties) {
final List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
try (Consumer<byte[], byte[]> consumer = createTestConsumer(topic, properties)) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
            // Drain the Kafka topic until a poll returns no more records
while (!records.isEmpty()) {
records.records(topic).forEach(collectedRecords::add);
records = consumer.poll(CONSUMER_POLL_DURATION);
}
}
return collectedRecords;
}
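    /** Serializes a Long into its 8-byte big-endian representation. */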
private static class RecordSerializer implements SerializationSchema<Long> {
@Override
public byte[] serialize(Long element) {
final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(element);
return buffer.array();
}
}
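    /**
     * Mapper that fails the job in the asynchronous part of a checkpoint by writing a poisoned
     * value into its operator state, which makes {@link SlowSerializer} throw. The constructor
     * argument controls how many checkpoints succeed before the failure is triggered.
     */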
private static class FailAsyncCheckpointMapper
implements MapFunction<Long, Long>, CheckpointedFunction {
private static final ListStateDescriptor<Integer> stateDescriptor =
new ListStateDescriptor<>("test-state", new SlowSerializer());
private int failAfterCheckpoint;
private ListState<Integer> state;
public FailAsyncCheckpointMapper(int failAfterCheckpoint) {
this.failAfterCheckpoint = failAfterCheckpoint;
}
@Override
public Long map(Long value) throws Exception {
Thread.sleep(100);
return value;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
if (failAfterCheckpoint <= 0) {
// Trigger a failure in the serializer
state.add(-1);
} else {
state.add(1);
}
failAfterCheckpoint--;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(stateDescriptor);
}
}
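    /**
     * Operator state serializer that throws on the poisoned value {@code -1}, simulating a
     * failure in the async checkpoint phase.
     */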
private static class SlowSerializer extends TypeSerializerSingleton<Integer> {
@Override
public boolean isImmutableType() {
return false;
}
@Override
public Integer createInstance() {
return 1;
}
@Override
public Integer copy(Integer from) {
return from;
}
@Override
public Integer copy(Integer from, Integer reuse) {
return from;
}
@Override
public int getLength() {
return 0;
}
@Override
public void serialize(Integer record, DataOutputView target) throws IOException {
            if (record == -1) {
                throw new RuntimeException("Expected failure during async checkpoint phase");
            }
}
@Override
public Integer deserialize(DataInputView source) throws IOException {
return 1;
}
@Override
public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
return 1;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {}
@Override
public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
return new SlowSerializerSnapshot();
}
}
/** Snapshot used in {@link FailAsyncCheckpointMapper}. */
public static class SlowSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
public SlowSerializerSnapshot() {
super(SlowSerializer::new);
}
}
    /** Fails after a checkpoint has completed and the next record has been emitted. */
private static class FailingCheckpointMapper
implements MapFunction<Long, Long>, CheckpointListener, CheckpointedFunction {
private final SharedReference<AtomicBoolean> failed;
private final SharedReference<AtomicLong> lastCheckpointedRecord;
private volatile long lastSeenRecord;
private volatile long checkpointedRecord;
private volatile long lastCheckpointId = 0;
private final AtomicInteger emittedBetweenCheckpoint = new AtomicInteger(0);
FailingCheckpointMapper(
SharedReference<AtomicBoolean> failed,
SharedReference<AtomicLong> lastCheckpointedRecord) {
this.failed = failed;
this.lastCheckpointedRecord = lastCheckpointedRecord;
}
@Override
public Long map(Long value) throws Exception {
lastSeenRecord = value;
if (lastCheckpointId >= 1
&& emittedBetweenCheckpoint.get() > 0
&& !failed.get().get()) {
failed.get().set(true);
throw new RuntimeException("Planned exception.");
}
            // Delay execution to ensure that at least one checkpoint is triggered before finishing
Thread.sleep(50);
emittedBetweenCheckpoint.incrementAndGet();
return value;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.info("notifyCheckpointComplete {} @ {}", checkpointedRecord, checkpointId);
lastCheckpointId = checkpointId;
emittedBetweenCheckpoint.set(0);
lastCheckpointedRecord.get().set(checkpointedRecord);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
LOG.info("snapshotState {} @ {}", lastSeenRecord, context.getCheckpointId());
checkpointedRecord = lastSeenRecord;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {}
}
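    /**
     * Fails the test if Kafka producer network threads are still alive after the job has
     * finished, which indicates a leaked producer; retries for up to ten seconds to give the
     * threads time to shut down.
     */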
private void checkProducerLeak() throws InterruptedException {
List<Map.Entry<Thread, StackTraceElement[]>> leaks = null;
for (int tries = 0; tries < 10; tries++) {
leaks =
Thread.getAllStackTraces().entrySet().stream()
.filter(this::findAliveKafkaThread)
.collect(Collectors.toList());
if (leaks.isEmpty()) {
return;
}
Thread.sleep(1000);
}
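        // Forcibly stop the leaked producer threads so that they cannot interfere with
        // subsequent tests.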
for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
leak.getKey().stop();
}
fail(
"Detected producer leaks:\n"
+ leaks.stream().map(this::format).collect(Collectors.joining("\n\n")));
}
private String format(Map.Entry<Thread, StackTraceElement[]> leak) {
return leak.getKey().getName() + ":\n" + Joiner.on("\n").join(leak.getValue());
}
private boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
&& threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
}
    /**
     * Exposes how many records have been emitted overall and finishes after the first
     * checkpoint has completed.
     */
private static final class InfiniteIntegerSource
implements SourceFunction<Long>, CheckpointListener, CheckpointedFunction {
private final SharedReference<AtomicLong> emittedRecordsCount;
private final SharedReference<AtomicLong> emittedRecordsWithCheckpoint;
private volatile boolean running = true;
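        // Record count captured at the last snapshot; published once that checkpoint completes.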
        private volatile long recordsAtLastCheckpoint;
private Object lock;
InfiniteIntegerSource(
SharedReference<AtomicLong> emittedRecordsCount,
SharedReference<AtomicLong> emittedRecordsWithCheckpoint) {
this.emittedRecordsCount = emittedRecordsCount;
this.emittedRecordsWithCheckpoint = emittedRecordsWithCheckpoint;
}
@Override
public void run(SourceContext<Long> ctx) throws Exception {
lock = ctx.getCheckpointLock();
while (running) {
synchronized (lock) {
ctx.collect(emittedRecordsCount.get().addAndGet(1));
Thread.sleep(1);
}
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
            emittedRecordsWithCheckpoint.get().set(recordsAtLastCheckpoint);
running = false;
LOG.info("notifyCheckpointCompleted {}", checkpointId);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
            recordsAtLastCheckpoint = emittedRecordsCount.get().get();
LOG.info(
"snapshotState, {}, {}",
context.getCheckpointId(),
emittedRecordsCount.get().get());
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {}
}
}