blob: f32040a6bf6acf726acbc4a51e5eed4c9aa64a50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.checkpoint.kafka;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import kafka.common.KafkaException;
import kafka.common.TopicAlreadyMarkedForDeletionException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.CheckpointSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.kafka.KafkaStreamSpec;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.stubbing.OngoingStubbing;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.*;
public class TestKafkaCheckpointManagerJava {
  private static final TaskName TASK1 = new TaskName("task1");
  private static final String CHECKPOINT_TOPIC = "topic-1";
  private static final String CHECKPOINT_SYSTEM = "system-1";
  private static final Partition CHECKPOINT_PARTITION = new Partition(0);
  private static final SystemStreamPartition CHECKPOINT_SSP =
      new SystemStreamPartition(CHECKPOINT_SYSTEM, CHECKPOINT_TOPIC, CHECKPOINT_PARTITION);
  private static final String GROUPER_FACTORY_CLASS = GroupByPartitionFactory.class.getCanonicalName();

  /**
   * Verifies that topic-creation failures from the SystemAdmin during resource creation
   * are propagated to the caller.
   */
  @Test(expected = TopicAlreadyMarkedForDeletionException.class)
  public void testStartFailsOnTopicCreationErrors() {
    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
        CHECKPOINT_SYSTEM, 1);
    // create an admin that throws an exception during createStream
    SystemAdmin mockAdmin = newAdmin("0", "10");
    doThrow(new TopicAlreadyMarkedForDeletionException("invalid stream")).when(mockAdmin).createStream(checkpointSpec);
    SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
        true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
    // expect an exception during startup
    checkpointManager.createResources();
    checkpointManager.start();
  }

  /**
   * Verifies that topic-validation failures from the SystemAdmin during resource creation
   * are propagated to the caller.
   */
  @Test(expected = StreamValidationException.class)
  public void testStartFailsOnTopicValidationErrors() {
    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
        CHECKPOINT_SYSTEM, 1);
    // create an admin that throws an exception during validateStream
    SystemAdmin mockAdmin = newAdmin("0", "10");
    doThrow(new StreamValidationException("invalid stream")).when(mockAdmin).validateStream(checkpointSpec);
    SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
        true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());
    // expect an exception during startup
    checkpointManager.createResources();
    checkpointManager.start();
  }

  /**
   * Verifies that deserialization errors surface as a {@link SamzaException} when reading
   * the last checkpoint (i.e. serde failures are not silently swallowed when validation is on).
   */
  @Test(expected = SamzaException.class)
  public void testReadFailsOnSerdeExceptions() throws Exception {
    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
        CHECKPOINT_SYSTEM, 1);
    Config mockConfig = mock(Config.class);
    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY)).thenReturn(GROUPER_FACTORY_CLASS);
    // mock out a consumer that returns a single checkpoint IME
    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
    List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
        ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
    SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);
    SystemAdmin mockAdmin = newAdmin("0", "1");
    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
    // wire up an exception throwing serde with the checkpointmanager
    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
        true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(),
        new KafkaCheckpointLogKeySerde());
    checkpointManager.register(TASK1);
    checkpointManager.start();
    // expect an exception from ExceptionThrowingSerde
    checkpointManager.readLastCheckpoint(TASK1);
  }

  /**
   * Verifies that key-serde failures are tolerated (the offending entry is skipped) when
   * checkpoint validation is disabled, so the read still completes.
   */
  @Test
  public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
        CHECKPOINT_SYSTEM, 1);
    Config mockConfig = mock(Config.class);
    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY)).thenReturn(GROUPER_FACTORY_CLASS);
    // mock out a consumer that returns a single checkpoint IME
    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
    List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
        ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
    SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);
    SystemAdmin mockAdmin = newAdmin("0", "1");
    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
    // wire up an exception throwing serde with the checkpointmanager; validation is disabled
    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
        false, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(),
        new ExceptionThrowingCheckpointKeySerde());
    checkpointManager.register(TASK1);
    checkpointManager.start();
    // expect the read to succeed in spite of the exception from ExceptionThrowingSerde
    checkpointManager.readLastCheckpoint(TASK1);
  }

  /**
   * Verifies that the checkpoint consumer is registered exactly once (during start) at the
   * oldest offset of the checkpoint topic, and that subsequent reads do not re-register.
   */
  @Test
  public void testCheckpointsAreReadFromOldestOffset() throws Exception {
    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
        CHECKPOINT_SYSTEM, 1);
    Config mockConfig = mock(Config.class);
    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY)).thenReturn(GROUPER_FACTORY_CLASS);
    // mock out a consumer that returns a single checkpoint IME
    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
    SystemConsumer mockConsumer = newConsumer(ImmutableList.of(
        ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0"))));
    String oldestOffset = "0";
    SystemAdmin mockAdmin = newAdmin(oldestOffset, "1");
    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
    checkpointManager.register(TASK1);
    // 1. verify that consumer.register is called only during checkpointManager.start.
    // 2. verify that consumer.register is called with the oldest offset.
    // 3. verify that no other operation on the CheckpointManager re-invokes register since start offsets are set during
    // register
    verify(mockConsumer, times(0)).register(CHECKPOINT_SSP, oldestOffset);
    checkpointManager.start();
    verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
    checkpointManager.readLastCheckpoint(TASK1);
    verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
  }

  /**
   * Verifies that the manager drains the checkpoint log up to the newest offset even when the
   * consumer hands back one message per poll, and that the last message wins.
   */
  @Test
  public void testAllMessagesInTheLogAreRead() throws Exception {
    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
        CHECKPOINT_SYSTEM, 1);
    Config mockConfig = mock(Config.class);
    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY)).thenReturn(GROUPER_FACTORY_CLASS);
    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
    int oldestOffset = 0;
    int newestOffset = 10;
    // mock out a consumer that returns eleven checkpoint IMEs (offsets 0..10) for the same ssp
    List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
    for (int offset = oldestOffset; offset <= newestOffset; offset++) {
      pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
    }
    // return one message at a time from each poll simulating a KafkaConsumer with max.poll.records = 1
    SystemConsumer mockConsumer = newConsumer(pollOutputs);
    SystemAdmin mockAdmin = newAdmin(Integer.toString(oldestOffset), Integer.toString(newestOffset));
    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
    checkpointManager.register(TASK1);
    checkpointManager.start();
    // check that all messages are read, and the checkpoint is the newest message
    Checkpoint checkpoint = checkpointManager.readLastCheckpoint(TASK1);
    // JUnit convention: expected value first, actual value second
    Assert.assertEquals(ImmutableMap.of(ssp, Integer.toString(newestOffset)), checkpoint.getOffsets());
  }

  /**
   * Create a new {@link SystemConsumer} that returns a list of messages sequentially at each subsequent poll.
   *
   * @param pollOutputs a list of poll outputs to be returned at subsequent polls.
   *                    The i'th call to consumer.poll() will return the list at pollOutputs[i];
   *                    every poll after the last configured one returns an empty map.
   * @return the consumer
   */
  private SystemConsumer newConsumer(List<List<IncomingMessageEnvelope>> pollOutputs) throws Exception {
    SystemConsumer mockConsumer = mock(SystemConsumer.class);
    // parameterize the stubbing instead of using the raw OngoingStubbing<Map>
    OngoingStubbing<Map<SystemStreamPartition, List<IncomingMessageEnvelope>>> when =
        when(mockConsumer.poll(anySet(), anyLong()));
    for (List<IncomingMessageEnvelope> pollOutput : pollOutputs) {
      when = when.thenReturn(ImmutableMap.of(CHECKPOINT_SSP, pollOutput));
    }
    when.thenReturn(ImmutableMap.of());
    return mockConsumer;
  }

  /**
   * Create a new {@link SystemAdmin} that returns the provided oldest and newest offsets for its topics.
   * The upcoming offset is reported as {@code newestOffset + 1}.
   */
  private SystemAdmin newAdmin(String oldestOffset, String newestOffset) {
    SystemStreamMetadata checkpointTopicMetadata = new SystemStreamMetadata(CHECKPOINT_TOPIC,
        ImmutableMap.of(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset,
            newestOffset, Integer.toString(Integer.parseInt(newestOffset) + 1))));
    SystemAdmin mockAdmin = mock(SystemAdmin.class);
    when(mockAdmin.getSystemStreamMetadata(Collections.singleton(CHECKPOINT_TOPIC))).thenReturn(
        ImmutableMap.of(CHECKPOINT_TOPIC, checkpointTopicMetadata));
    return mockAdmin;
  }

  /**
   * Create a new {@link SystemFactory} that hands out the provided producer, consumer and admin mocks.
   */
  private SystemFactory newFactory(SystemProducer producer, SystemConsumer consumer, SystemAdmin admin) {
    SystemFactory factory = mock(SystemFactory.class);
    when(factory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class), anyString())).thenReturn(producer);
    when(factory.getConsumer(anyString(), any(Config.class), any(MetricsRegistry.class), anyString())).thenReturn(consumer);
    when(factory.getAdmin(anyString(), any(Config.class), anyString())).thenReturn(admin);
    return factory;
  }

  /**
   * Creates a new checkpoint envelope for the provided task, ssp and offset. The envelope's key and
   * message are serialized with the real serdes so that production deserialization paths are exercised.
   */
  private IncomingMessageEnvelope newCheckpointEnvelope(TaskName taskName, SystemStreamPartition ssp, String offset) {
    KafkaCheckpointLogKey checkpointKey =
        new KafkaCheckpointLogKey("checkpoint", taskName, GROUPER_FACTORY_CLASS);
    KafkaCheckpointLogKeySerde checkpointKeySerde = new KafkaCheckpointLogKeySerde();
    Checkpoint checkpointMsg = new Checkpoint(ImmutableMap.of(ssp, offset));
    CheckpointSerde checkpointMsgSerde = new CheckpointSerde();
    return new IncomingMessageEnvelope(CHECKPOINT_SSP, offset, checkpointKeySerde.toBytes(checkpointKey),
        checkpointMsgSerde.toBytes(checkpointMsg));
  }

  /** A checkpoint-message serde whose {@code fromBytes} always fails, to simulate corrupt messages. */
  private static class ExceptionThrowingCheckpointSerde extends CheckpointSerde {
    @Override
    public Checkpoint fromBytes(byte[] bytes) {
      throw new KafkaException("exception");
    }
  }

  /** A checkpoint-key serde whose {@code fromBytes} always fails, to simulate corrupt keys. */
  private static class ExceptionThrowingCheckpointKeySerde extends KafkaCheckpointLogKeySerde {
    @Override
    public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
      throw new KafkaException("exception");
    }
  }
}