blob: fe9bfb192bb80ec79acc32ecdc9bd07124ae93bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.checkpoint.kafka;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.CheckpointV1Serde;
import org.apache.samza.serializers.CheckpointV2Serde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.kafka.KafkaStreamSpec;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.OngoingStubbing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link KafkaCheckpointManager}: topic creation/validation, start/stop lifecycle,
 * reading checkpoints (v1, v2, mixed, multi-poll) and writing checkpoints (including producer
 * failure/retry behavior). All system interactions go through Mockito mocks wired up via
 * {@link #setupSystemFactory(Config)}.
 */
public class TestKafkaCheckpointManager {
  private static final TaskName TASK0 = new TaskName("Partition 0");
  private static final TaskName TASK1 = new TaskName("Partition 1");
  private static final String CHECKPOINT_TOPIC = "checkpointTopic";
  private static final String CHECKPOINT_SYSTEM = "checkpointSystem";
  private static final SystemStreamPartition CHECKPOINT_SSP =
      new SystemStreamPartition(CHECKPOINT_SYSTEM, CHECKPOINT_TOPIC, new Partition(0));
  private static final SystemStreamPartition INPUT_SSP0 =
      new SystemStreamPartition("inputSystem", "inputTopic", new Partition(0));
  private static final SystemStreamPartition INPUT_SSP1 =
      new SystemStreamPartition("inputSystem", "inputTopic", new Partition(1));
  private static final String GROUPER_FACTORY_CLASS = GroupByPartitionFactory.class.getCanonicalName();
  private static final KafkaStreamSpec CHECKPOINT_SPEC =
      new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);
  // shared serde instances; also reused by the envelope helpers below to avoid per-call construction
  private static final CheckpointV1Serde CHECKPOINT_V1_SERDE = new CheckpointV1Serde();
  private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
  private static final KafkaCheckpointLogKeySerde KAFKA_CHECKPOINT_LOG_KEY_SERDE = new KafkaCheckpointLogKeySerde();

  @Mock
  private SystemProducer systemProducer;
  @Mock
  private SystemConsumer systemConsumer;
  @Mock
  private SystemAdmin systemAdmin;
  @Mock
  private SystemAdmin createResourcesSystemAdmin;
  @Mock
  private SystemFactory systemFactory;
  @Mock
  private MetricsRegistry metricsRegistry;

  @Before
  public void setup() {
    MockitoAnnotations.initMocks(this);
  }

  @Test(expected = TopicAlreadyMarkedForDeletionException.class)
  public void testCreateResourcesTopicCreationError() {
    setupSystemFactory(config());
    // throw an exception during createStream
    doThrow(new TopicAlreadyMarkedForDeletionException("invalid stream")).when(this.createResourcesSystemAdmin)
        .createStream(CHECKPOINT_SPEC);
    KafkaCheckpointManager checkpointManager = buildKafkaCheckpointManager(true, config());
    // expect an exception during startup
    checkpointManager.createResources();
  }

  @Test(expected = StreamValidationException.class)
  public void testCreateResourcesTopicValidationError() {
    setupSystemFactory(config());
    // throw an exception during validateStream
    doThrow(new StreamValidationException("invalid stream")).when(this.createResourcesSystemAdmin)
        .validateStream(CHECKPOINT_SPEC);
    KafkaCheckpointManager checkpointManager = buildKafkaCheckpointManager(true, config());
    // expect an exception during startup
    checkpointManager.createResources();
  }

  @Test(expected = SamzaException.class)
  public void testReadFailsOnSerdeExceptions() throws InterruptedException {
    setupSystemFactory(config());
    List<IncomingMessageEnvelope> checkpointEnvelopes =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, buildCheckpointV1(INPUT_SSP0, "0"), "0"));
    setupConsumer(checkpointEnvelopes);
    // wire up an exception throwing serde with the checkpointManager
    CheckpointV1Serde checkpointV1Serde = mock(CheckpointV1Serde.class);
    doThrow(new RuntimeException("serde failed")).when(checkpointV1Serde).fromBytes(any());
    // validateCheckpoint = true means serde failures must propagate
    KafkaCheckpointManager checkpointManager =
        new KafkaCheckpointManager(CHECKPOINT_SPEC, this.systemFactory, true, config(), this.metricsRegistry,
            checkpointV1Serde, CHECKPOINT_V2_SERDE, KAFKA_CHECKPOINT_LOG_KEY_SERDE);
    checkpointManager.register(TASK0);
    // expect an exception
    checkpointManager.readLastCheckpoint(TASK0);
  }

  @Test
  public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws InterruptedException {
    setupSystemFactory(config());
    List<IncomingMessageEnvelope> checkpointEnvelopes =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, buildCheckpointV1(INPUT_SSP0, "0"), "0"));
    setupConsumer(checkpointEnvelopes);
    // wire up an exception throwing serde with the checkpointManager
    CheckpointV1Serde checkpointV1Serde = mock(CheckpointV1Serde.class);
    doThrow(new RuntimeException("serde failed")).when(checkpointV1Serde).fromBytes(any());
    // validateCheckpoint = false means serde failures are tolerated (checkpoint is skipped)
    KafkaCheckpointManager checkpointManager =
        new KafkaCheckpointManager(CHECKPOINT_SPEC, this.systemFactory, false, config(), this.metricsRegistry,
            checkpointV1Serde, CHECKPOINT_V2_SERDE, KAFKA_CHECKPOINT_LOG_KEY_SERDE);
    checkpointManager.register(TASK0);
    // expect the read to succeed in spite of the exception from ExceptionThrowingSerde
    assertNull(checkpointManager.readLastCheckpoint(TASK0));
  }

  @Test
  public void testStart() {
    setupSystemFactory(config());
    String oldestOffset = "1";
    String newestOffset = "2";
    SystemStreamMetadata checkpointTopicMetadata = new SystemStreamMetadata(CHECKPOINT_TOPIC,
        ImmutableMap.of(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset, newestOffset,
            Integer.toString(Integer.parseInt(newestOffset) + 1))));
    when(this.systemAdmin.getSystemStreamMetadata(Collections.singleton(CHECKPOINT_TOPIC))).thenReturn(
        ImmutableMap.of(CHECKPOINT_TOPIC, checkpointTopicMetadata));
    KafkaCheckpointManager checkpointManager = buildKafkaCheckpointManager(true, config());
    checkpointManager.start();
    verify(this.systemProducer).start();
    verify(this.systemAdmin).start();
    // consumer must be registered at the oldest offset so the full checkpoint log is read
    verify(this.systemConsumer).register(CHECKPOINT_SSP, oldestOffset);
    verify(this.systemConsumer).start();
  }

  @Test
  public void testRegister() {
    setupSystemFactory(config());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    verify(this.systemProducer).register(TASK0.getTaskName());
  }

  @Test
  public void testStop() {
    setupSystemFactory(config());
    KafkaCheckpointManager checkpointManager = buildKafkaCheckpointManager(true, config());
    checkpointManager.stop();
    verify(this.systemProducer).stop();
    // default configuration for stopConsumerAfterFirstRead means that consumer is not stopped here
    verify(this.systemConsumer, never()).stop();
    verify(this.systemAdmin).stop();
  }

  @Test
  public void testWriteCheckpointShouldRecreateSystemProducerOnFailure() {
    setupSystemFactory(config());
    SystemProducer secondKafkaProducer = mock(SystemProducer.class);
    // override default mock behavior to return a second producer on the second call to create a producer
    when(this.systemFactory.getProducer(CHECKPOINT_SYSTEM, config(), this.metricsRegistry,
        KafkaCheckpointManager.class.getSimpleName())).thenReturn(this.systemProducer, secondKafkaProducer);
    // first producer throws an exception on flush
    doThrow(new RuntimeException("flush failed")).when(this.systemProducer).flush(TASK0.getTaskName());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    CheckpointV1 checkpointV1 = buildCheckpointV1(INPUT_SSP0, "0");
    kafkaCheckpointManager.writeCheckpoint(TASK0, checkpointV1);
    // first producer should be stopped
    verify(this.systemProducer).stop();
    // register and start the second producer
    verify(secondKafkaProducer).register(TASK0.getTaskName());
    verify(secondKafkaProducer).start();
    // check that the second producer was given the message to send out
    ArgumentCaptor<OutgoingMessageEnvelope> outgoingMessageEnvelopeArgumentCaptor =
        ArgumentCaptor.forClass(OutgoingMessageEnvelope.class);
    verify(secondKafkaProducer).send(eq(TASK0.getTaskName()), outgoingMessageEnvelopeArgumentCaptor.capture());
    assertEquals(CHECKPOINT_SSP, outgoingMessageEnvelopeArgumentCaptor.getValue().getSystemStream());
    assertEquals(new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, TASK0, GROUPER_FACTORY_CLASS),
        KAFKA_CHECKPOINT_LOG_KEY_SERDE.fromBytes((byte[]) outgoingMessageEnvelopeArgumentCaptor.getValue().getKey()));
    assertEquals(checkpointV1,
        CHECKPOINT_V1_SERDE.fromBytes((byte[]) outgoingMessageEnvelopeArgumentCaptor.getValue().getMessage()));
    verify(secondKafkaProducer).flush(TASK0.getTaskName());
  }

  @Test
  public void testCreateResources() {
    setupSystemFactory(config());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.createResources();
    verify(this.createResourcesSystemAdmin).start();
    verify(this.createResourcesSystemAdmin).createStream(CHECKPOINT_SPEC);
    verify(this.createResourcesSystemAdmin).validateStream(CHECKPOINT_SPEC);
    verify(this.createResourcesSystemAdmin).stop();
  }

  @Test
  public void testCreateResourcesSkipValidation() {
    setupSystemFactory(config());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(false, config());
    kafkaCheckpointManager.createResources();
    verify(this.createResourcesSystemAdmin).start();
    verify(this.createResourcesSystemAdmin).createStream(CHECKPOINT_SPEC);
    // validateCheckpoint = false skips topic validation entirely
    verify(this.createResourcesSystemAdmin, never()).validateStream(CHECKPOINT_SPEC);
    verify(this.createResourcesSystemAdmin).stop();
  }

  @Test
  public void testReadEmpty() throws InterruptedException {
    setupSystemFactory(config());
    setupConsumer(ImmutableList.of());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    assertNull(kafkaCheckpointManager.readLastCheckpoint(TASK0));
  }

  @Test
  public void testReadCheckpointV1() throws InterruptedException {
    setupSystemFactory(config());
    CheckpointV1 checkpointV1 = buildCheckpointV1(INPUT_SSP0, "0");
    List<IncomingMessageEnvelope> checkpointEnvelopes =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, checkpointV1, "0"));
    setupConsumer(checkpointEnvelopes);
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    Checkpoint actualCheckpoint = kafkaCheckpointManager.readLastCheckpoint(TASK0);
    assertEquals(checkpointV1, actualCheckpoint);
  }

  @Test
  public void testReadIgnoreCheckpointV2WhenV1Enabled() throws InterruptedException {
    setupSystemFactory(config());
    CheckpointV1 checkpointV1 = buildCheckpointV1(INPUT_SSP0, "0");
    List<IncomingMessageEnvelope> checkpointEnvelopes =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, checkpointV1, "0"),
            newCheckpointV2Envelope(TASK0, buildCheckpointV2(INPUT_SSP0, "1"), "1"));
    setupConsumer(checkpointEnvelopes);
    // default is to only read CheckpointV1
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    Checkpoint actualCheckpoint = kafkaCheckpointManager.readLastCheckpoint(TASK0);
    assertEquals(checkpointV1, actualCheckpoint);
  }

  @Test
  public void testReadCheckpointV2() throws InterruptedException {
    Config config = config(ImmutableMap.of(TaskConfig.CHECKPOINT_READ_VERSIONS, "1,2"));
    setupSystemFactory(config);
    CheckpointV2 checkpointV2 = buildCheckpointV2(INPUT_SSP0, "0");
    List<IncomingMessageEnvelope> checkpointEnvelopes =
        ImmutableList.of(newCheckpointV2Envelope(TASK0, checkpointV2, "0"));
    setupConsumer(checkpointEnvelopes);
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config);
    kafkaCheckpointManager.register(TASK0);
    Checkpoint actualCheckpoint = kafkaCheckpointManager.readLastCheckpoint(TASK0);
    assertEquals(checkpointV2, actualCheckpoint);
  }

  @Test
  public void testReadCheckpointPriority() throws InterruptedException {
    // "2,1" means v2 checkpoints take priority over v1 when both exist
    Config config = config(ImmutableMap.of(TaskConfig.CHECKPOINT_READ_VERSIONS, "2,1"));
    setupSystemFactory(config);
    CheckpointV2 checkpointV2 = buildCheckpointV2(INPUT_SSP0, "1");
    List<IncomingMessageEnvelope> checkpointEnvelopes =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, buildCheckpointV1(INPUT_SSP0, "0"), "0"),
            newCheckpointV2Envelope(TASK0, checkpointV2, "1"));
    setupConsumer(checkpointEnvelopes);
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config);
    kafkaCheckpointManager.register(TASK0);
    Checkpoint actualCheckpoint = kafkaCheckpointManager.readLastCheckpoint(TASK0);
    assertEquals(checkpointV2, actualCheckpoint);
  }

  @Test
  public void testReadMultipleCheckpointsMultipleSSP() throws InterruptedException {
    setupSystemFactory(config());
    KafkaCheckpointManager checkpointManager = buildKafkaCheckpointManager(true, config());
    checkpointManager.register(TASK0);
    checkpointManager.register(TASK1);
    // mock out a consumer that returns 5 checkpoint IMEs for each SSP
    int newestOffset = 5;
    int checkpointOffsetCounter = 0;
    List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
    for (int offset = 1; offset <= newestOffset; offset++) {
      pollOutputs.add(ImmutableList.of(
          // use regular offset value for INPUT_SSP0
          newCheckpointV1Envelope(TASK0, buildCheckpointV1(INPUT_SSP0, Integer.toString(offset)),
              Integer.toString(checkpointOffsetCounter++)),
          // use (offset * 2) value for INPUT_SSP1 so offsets are different from INPUT_SSP0
          newCheckpointV1Envelope(TASK1, buildCheckpointV1(INPUT_SSP1, Integer.toString(offset * 2)),
              Integer.toString(checkpointOffsetCounter++))));
    }
    setupConsumerMultiplePoll(pollOutputs);
    assertEquals(buildCheckpointV1(INPUT_SSP0, Integer.toString(newestOffset)),
        checkpointManager.readLastCheckpoint(TASK0));
    assertEquals(buildCheckpointV1(INPUT_SSP1, Integer.toString(newestOffset * 2)),
        checkpointManager.readLastCheckpoint(TASK1));
    // check expected number of polls (+1 is for the final empty poll), and the checkpoint is the newest message
    verify(this.systemConsumer, times(newestOffset + 1)).poll(ImmutableSet.of(CHECKPOINT_SSP),
        SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
  }

  @Test
  public void testReadMultipleCheckpointsUpgradeCheckpointVersion() throws InterruptedException {
    Config config = config(ImmutableMap.of(TaskConfig.CHECKPOINT_READ_VERSIONS, "2,1"));
    setupSystemFactory(config);
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config);
    kafkaCheckpointManager.register(TASK0);
    kafkaCheckpointManager.register(TASK1);
    // older v1 checkpoints followed by newer v2 checkpoints; v2 should win for both tasks
    List<IncomingMessageEnvelope> checkpointEnvelopesV1 =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, buildCheckpointV1(INPUT_SSP0, "0"), "0"),
            newCheckpointV1Envelope(TASK1, buildCheckpointV1(INPUT_SSP1, "0"), "1"));
    CheckpointV2 ssp0CheckpointV2 = buildCheckpointV2(INPUT_SSP0, "10");
    CheckpointV2 ssp1CheckpointV2 = buildCheckpointV2(INPUT_SSP1, "11");
    List<IncomingMessageEnvelope> checkpointEnvelopesV2 =
        ImmutableList.of(newCheckpointV2Envelope(TASK0, ssp0CheckpointV2, "2"),
            newCheckpointV2Envelope(TASK1, ssp1CheckpointV2, "3"));
    setupConsumerMultiplePoll(ImmutableList.of(checkpointEnvelopesV1, checkpointEnvelopesV2));
    assertEquals(ssp0CheckpointV2, kafkaCheckpointManager.readLastCheckpoint(TASK0));
    assertEquals(ssp1CheckpointV2, kafkaCheckpointManager.readLastCheckpoint(TASK1));
    // 2 polls for actual checkpoints, 1 final empty poll
    verify(this.systemConsumer, times(3)).poll(ImmutableSet.of(CHECKPOINT_SSP),
        SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
  }

  @Test
  public void testWriteCheckpointV1() {
    setupSystemFactory(config());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    CheckpointV1 checkpointV1 = buildCheckpointV1(INPUT_SSP0, "0");
    kafkaCheckpointManager.writeCheckpoint(TASK0, checkpointV1);
    // capture the outgoing envelope and verify key/message round-trip through the serdes
    ArgumentCaptor<OutgoingMessageEnvelope> outgoingMessageEnvelopeArgumentCaptor =
        ArgumentCaptor.forClass(OutgoingMessageEnvelope.class);
    verify(this.systemProducer).send(eq(TASK0.getTaskName()), outgoingMessageEnvelopeArgumentCaptor.capture());
    assertEquals(CHECKPOINT_SSP, outgoingMessageEnvelopeArgumentCaptor.getValue().getSystemStream());
    assertEquals(new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, TASK0, GROUPER_FACTORY_CLASS),
        KAFKA_CHECKPOINT_LOG_KEY_SERDE.fromBytes((byte[]) outgoingMessageEnvelopeArgumentCaptor.getValue().getKey()));
    assertEquals(checkpointV1,
        CHECKPOINT_V1_SERDE.fromBytes((byte[]) outgoingMessageEnvelopeArgumentCaptor.getValue().getMessage()));
    verify(this.systemProducer).flush(TASK0.getTaskName());
  }

  @Test
  public void testWriteCheckpointV2() {
    setupSystemFactory(config());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    CheckpointV2 checkpointV2 = buildCheckpointV2(INPUT_SSP0, "0");
    kafkaCheckpointManager.writeCheckpoint(TASK0, checkpointV2);
    // capture the outgoing envelope and verify key/message round-trip through the serdes
    ArgumentCaptor<OutgoingMessageEnvelope> outgoingMessageEnvelopeArgumentCaptor =
        ArgumentCaptor.forClass(OutgoingMessageEnvelope.class);
    verify(this.systemProducer).send(eq(TASK0.getTaskName()), outgoingMessageEnvelopeArgumentCaptor.capture());
    assertEquals(CHECKPOINT_SSP, outgoingMessageEnvelopeArgumentCaptor.getValue().getSystemStream());
    assertEquals(new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE, TASK0, GROUPER_FACTORY_CLASS),
        KAFKA_CHECKPOINT_LOG_KEY_SERDE.fromBytes((byte[]) outgoingMessageEnvelopeArgumentCaptor.getValue().getKey()));
    assertEquals(checkpointV2,
        CHECKPOINT_V2_SERDE.fromBytes((byte[]) outgoingMessageEnvelopeArgumentCaptor.getValue().getMessage()));
    verify(this.systemProducer).flush(TASK0.getTaskName());
  }

  @Test
  public void testWriteCheckpointShouldRetryFiniteTimesOnFailure() {
    setupSystemFactory(config());
    doThrow(new RuntimeException("send failed")).when(this.systemProducer).send(any(), any());
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    kafkaCheckpointManager.MaxRetryDurationInMillis_$eq(100); // setter for scala var MaxRetryDurationInMillis
    CheckpointV2 checkpointV2 = buildCheckpointV2(INPUT_SSP0, "0");
    try {
      kafkaCheckpointManager.writeCheckpoint(TASK0, checkpointV2);
      fail("Expected to throw SamzaException");
    } catch (SamzaException e) {
      // expected to get here
    }
    // one call to send which fails, then writeCheckpoint gives up
    verify(this.systemProducer).send(any(), any());
    verify(this.systemProducer, never()).flush(any());
  }

  @Test
  public void testConsumerStopsAfterInitialRead() throws Exception {
    setupSystemFactory(config());
    CheckpointV1 checkpointV1 = buildCheckpointV1(INPUT_SSP0, "0");
    setupConsumer(ImmutableList.of(newCheckpointV1Envelope(TASK0, checkpointV1, "0")));
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());
    kafkaCheckpointManager.register(TASK0);
    assertEquals(checkpointV1, kafkaCheckpointManager.readLastCheckpoint(TASK0));
    // 1 call to get actual checkpoints, 1 call for empty poll to signal done reading
    verify(this.systemConsumer, times(2)).poll(ImmutableSet.of(CHECKPOINT_SSP), SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
    verify(this.systemConsumer).stop();
    // reading checkpoint again should not read more messages from the consumer
    assertEquals(checkpointV1, kafkaCheckpointManager.readLastCheckpoint(TASK0));
    verifyNoMoreInteractions(this.systemConsumer);
  }

  @Test
  public void testConsumerStopsAfterInitialReadDisabled() throws Exception {
    Config config =
        config(ImmutableMap.of(TaskConfig.INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, "false"));
    setupSystemFactory(config);
    // 1) return checkpointV1 for INPUT_SSP0
    CheckpointV1 ssp0FirstCheckpointV1 = buildCheckpointV1(INPUT_SSP0, "0");
    List<IncomingMessageEnvelope> checkpointEnvelopes0 =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, ssp0FirstCheckpointV1, "0"));
    setupConsumer(checkpointEnvelopes0);
    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config);
    kafkaCheckpointManager.register(TASK0);
    assertEquals(ssp0FirstCheckpointV1, kafkaCheckpointManager.readLastCheckpoint(TASK0));
    // 2) return new checkpointV1 for just INPUT_SSP0
    CheckpointV1 ssp0SecondCheckpointV1 = buildCheckpointV1(INPUT_SSP0, "10");
    List<IncomingMessageEnvelope> checkpointEnvelopes1 =
        ImmutableList.of(newCheckpointV1Envelope(TASK0, ssp0SecondCheckpointV1, "1"));
    setupConsumer(checkpointEnvelopes1);
    assertEquals(ssp0SecondCheckpointV1, kafkaCheckpointManager.readLastCheckpoint(TASK0));
    verify(this.systemConsumer, never()).stop();
  }

  /**
   * Builds a {@link KafkaCheckpointManager} wired to the mock {@link SystemFactory} and the shared serdes.
   */
  private KafkaCheckpointManager buildKafkaCheckpointManager(boolean validateCheckpoint, Config config) {
    return new KafkaCheckpointManager(CHECKPOINT_SPEC, this.systemFactory, validateCheckpoint, config,
        this.metricsRegistry, CHECKPOINT_V1_SERDE, CHECKPOINT_V2_SERDE, KAFKA_CHECKPOINT_LOG_KEY_SERDE);
  }

  /**
   * Stubs the mock consumer to return {@code pollOutput} on the first poll and nothing afterwards.
   */
  private void setupConsumer(List<IncomingMessageEnvelope> pollOutput) throws InterruptedException {
    setupConsumerMultiplePoll(ImmutableList.of(pollOutput));
  }

  /**
   * Create a new {@link SystemConsumer} that returns a list of messages sequentially at each subsequent poll.
   *
   * @param pollOutputs a list of poll outputs to be returned at subsequent polls.
   *                    The i'th call to consumer.poll() will return the list at pollOutputs[i]
   */
  private void setupConsumerMultiplePoll(List<List<IncomingMessageEnvelope>> pollOutputs) throws InterruptedException {
    OngoingStubbing<Map<SystemStreamPartition, List<IncomingMessageEnvelope>>> when =
        when(this.systemConsumer.poll(ImmutableSet.of(CHECKPOINT_SSP), SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES));
    for (List<IncomingMessageEnvelope> pollOutput : pollOutputs) {
      when = when.thenReturn(ImmutableMap.of(CHECKPOINT_SSP, pollOutput));
    }
    // trailing empty poll signals that the checkpoint log has been fully read
    when.thenReturn(ImmutableMap.of());
  }

  /**
   * Stubs the mock {@link SystemFactory} to hand out the mock producer/consumer/admins for {@code config}.
   */
  private void setupSystemFactory(Config config) {
    when(this.systemFactory.getProducer(CHECKPOINT_SYSTEM, config, this.metricsRegistry,
        KafkaCheckpointManager.class.getSimpleName())).thenReturn(this.systemProducer);
    when(this.systemFactory.getConsumer(CHECKPOINT_SYSTEM, config, this.metricsRegistry,
        KafkaCheckpointManager.class.getSimpleName())).thenReturn(this.systemConsumer);
    when(this.systemFactory.getAdmin(CHECKPOINT_SYSTEM, config,
        KafkaCheckpointManager.class.getSimpleName())).thenReturn(this.systemAdmin);
    when(this.systemFactory.getAdmin(CHECKPOINT_SYSTEM, config,
        KafkaCheckpointManager.class.getSimpleName() + "createResource")).thenReturn(this.createResourcesSystemAdmin);
  }

  private static CheckpointV1 buildCheckpointV1(SystemStreamPartition ssp, String offset) {
    return new CheckpointV1(ImmutableMap.of(ssp, offset));
  }

  /**
   * Creates a new v1 checkpoint envelope for the provided task, checkpoint and offset.
   * Uses the shared serde instances and the {@link KafkaCheckpointLogKey#CHECKPOINT_V1_KEY_TYPE}
   * constant (previously a hardcoded "checkpoint" literal) for consistency with the v2 helper.
   */
  private static IncomingMessageEnvelope newCheckpointV1Envelope(TaskName taskName, CheckpointV1 checkpointV1,
      String checkpointMessageOffset) {
    KafkaCheckpointLogKey checkpointKey =
        new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, taskName, GROUPER_FACTORY_CLASS);
    return new IncomingMessageEnvelope(CHECKPOINT_SSP, checkpointMessageOffset,
        KAFKA_CHECKPOINT_LOG_KEY_SERDE.toBytes(checkpointKey), CHECKPOINT_V1_SERDE.toBytes(checkpointV1));
  }

  private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) {
    return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset),
        ImmutableMap.of("backend", ImmutableMap.of("store", "10")));
  }

  /**
   * Creates a new v2 checkpoint envelope for the provided task, checkpoint and offset.
   */
  private static IncomingMessageEnvelope newCheckpointV2Envelope(TaskName taskName, CheckpointV2 checkpointV2,
      String checkpointMessageOffset) {
    KafkaCheckpointLogKey checkpointKey =
        new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE, taskName, GROUPER_FACTORY_CLASS);
    return new IncomingMessageEnvelope(CHECKPOINT_SSP, checkpointMessageOffset,
        KAFKA_CHECKPOINT_LOG_KEY_SERDE.toBytes(checkpointKey), CHECKPOINT_V2_SERDE.toBytes(checkpointV2));
  }

  /**
   * Build base {@link Config} for tests.
   */
  private static Config config() {
    return new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY, GROUPER_FACTORY_CLASS));
  }

  /**
   * Build base {@link Config} extended with {@code additional} entries (overriding on key collision).
   */
  private static Config config(Map<String, String> additional) {
    Map<String, String> configMap = new HashMap<>(config());
    configMap.putAll(additional);
    return new MapConfig(configMap);
  }
}