blob: 95aafa4464de404895de358d77ad5246d825ecce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.kafka;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* 1) Run app and consume messages
* 2) Commit only for first message
* 3) Shutdown application
* 4) Run app a second time to use the checkpoint
* 5) Verify that we had to re-process the message after the first message
*/
public class KafkaCheckpointManagerIntegrationTest extends StreamApplicationIntegrationTestHarness {
private static final String SYSTEM = "kafka";
private static final String INPUT_STREAM = "inputStream";
private static final Map<String, String> CONFIGS = ImmutableMap.of(
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory",
JobConfig.PROCESSOR_ID, "0",
TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1",
TaskConfig.COMMIT_MS, "-1"); // manual commit only
/**
* Keep track of which messages have been received by the application.
*/
private static final Map<String, AtomicInteger> PROCESSED = new HashMap<>();
/**
* If message has this prefix, then request a commit after processing it.
*/
private static final String COMMIT_PREFIX = "commit";
/**
* If message equals this string, then shut down the task if the task is configured to handle intermediate shutdown.
*/
private static final String INTERMEDIATE_SHUTDOWN = "intermediateShutdown";
/**
* If message equals this string, then shut down the task.
*/
private static final String END_OF_STREAM = "endOfStream";
@Before
public void setup() {
PROCESSED.clear();
}
@Test
public void testCheckpoint() {
createTopic(INPUT_STREAM, 2);
produceMessages(0);
produceMessages(1);
// run application once and verify processed messages before shutdown
runApplication(new CheckpointApplication(true), "CheckpointApplication", CONFIGS).getRunner().waitForFinish();
verifyProcessedMessagesFirstRun();
// run application a second time and verify that certain messages had to be re-processed
runApplication(new CheckpointApplication(false), "CheckpointApplication", CONFIGS).getRunner().waitForFinish();
verifyProcessedMessagesSecondRun();
}
private void produceMessages(int partitionId) {
String key = "key" + partitionId;
// commit first message
produceMessage(INPUT_STREAM, partitionId, key, commitMessage(partitionId, 0));
// don't commit second message
produceMessage(INPUT_STREAM, partitionId, key, noCommitMessage(partitionId, 1));
// do an initial shutdown so that the test can check that the second message gets re-processed
produceMessage(INPUT_STREAM, partitionId, key, INTERMEDIATE_SHUTDOWN);
// do a commit on the third message
produceMessage(INPUT_STREAM, partitionId, key, commitMessage(partitionId, 2));
// this will make the task shut down for the second run
produceMessage(INPUT_STREAM, partitionId, key, END_OF_STREAM);
}
/**
* Each partition should have seen two messages before shutting down.
*/
private static void verifyProcessedMessagesFirstRun() {
assertEquals(4, PROCESSED.size());
assertEquals(1, PROCESSED.get(commitMessage(0, 0)).get());
assertEquals(1, PROCESSED.get(noCommitMessage(0, 1)).get());
assertEquals(1, PROCESSED.get(commitMessage(1, 0)).get());
assertEquals(1, PROCESSED.get(noCommitMessage(1, 1)).get());
}
/**
* For each partition: re-process the second message (for 2 total of the second message), receive the third message.
*/
private static void verifyProcessedMessagesSecondRun() {
assertEquals(6, PROCESSED.size());
assertEquals(1, PROCESSED.get(commitMessage(0, 0)).get());
assertEquals(2, PROCESSED.get(noCommitMessage(0, 1)).get());
assertEquals(1, PROCESSED.get(commitMessage(0, 2)).get());
assertEquals(1, PROCESSED.get(commitMessage(1, 0)).get());
assertEquals(2, PROCESSED.get(noCommitMessage(1, 1)).get());
assertEquals(1, PROCESSED.get(commitMessage(1, 2)).get());
}
private static String commitMessage(int partitionId, int messageId) {
return String.join("_", COMMIT_PREFIX, "partition", Integer.toString(partitionId), Integer.toString(messageId));
}
private static String noCommitMessage(int partitionId, int messageId) {
return String.join("_", "partition", Integer.toString(partitionId), Integer.toString(messageId));
}
private static class CheckpointApplication implements TaskApplication {
private final boolean handleIntermediateShutdown;
private CheckpointApplication(boolean handleIntermediateShutdown) {
this.handleIntermediateShutdown = handleIntermediateShutdown;
}
@Override
public void describe(TaskApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor sd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<String> isd = sd.getInputDescriptor(INPUT_STREAM, new StringSerde());
appDescriptor.withInputStream(isd)
.withTaskFactory((StreamTaskFactory) () -> new CheckpointTask(this.handleIntermediateShutdown));
}
}
private static class CheckpointTask implements StreamTask {
/**
* Determine if task should respond to {@link #INTERMEDIATE_SHUTDOWN}.
* Helps with testing that any uncommitted messages get reprocessed if the job starts again.
*/
private final boolean handleIntermediateShutdown;
/**
* When requesting shutdown, there is no guarantee of an immediate shutdown, since there are multiple tasks in the
* container. Use this flag to make sure we don't process more messages past the shutdown request in order to have
* deterministic counting of the messages for the test.
*/
private boolean stopProcessing = false;
private CheckpointTask(boolean handleIntermediateShutdown) {
this.handleIntermediateShutdown = handleIntermediateShutdown;
}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
if (!this.stopProcessing) {
String value = (String) envelope.getMessage();
if (INTERMEDIATE_SHUTDOWN.equals(value)) {
if (this.handleIntermediateShutdown) {
setShutdown(coordinator);
}
} else if (END_OF_STREAM.equals(value)) {
setShutdown(coordinator);
} else {
synchronized (this) {
PROCESSED.putIfAbsent(value, new AtomicInteger(0));
PROCESSED.get(value).incrementAndGet();
}
if (value.startsWith(COMMIT_PREFIX)) {
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}
}
}
private void setShutdown(TaskCoordinator coordinator) {
this.stopProcessing = true;
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}
}