// blob: 9c5dad5f549485cf3ef99c0a503bb6bcd757fb44 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.processor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import kafka.admin.AdminUtils;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for {@link LocalApplicationRunner}.
*
* Brings up embedded ZooKeeper, Kafka broker and launches multiple {@link StreamApplication} through
* {@link LocalApplicationRunner} to verify the guarantees made in stand alone execution environment.
*/
public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarness {
private static final Logger LOGGER = LoggerFactory.getLogger(TestZkLocalApplicationRunner.class);
// Number of events published to the input topic per test scenario.
private static final int NUM_KAFKA_EVENTS = 300;
// Connection timeout for the test's own ZkClient (see setUp()).
private static final int ZK_CONNECTION_TIMEOUT_MS = 100;
// Samza system name under which the embedded Kafka cluster is registered.
private static final String TEST_SYSTEM = "TestSystemName";
// Fully-qualified factory classes wired into the standalone job config.
private static final String TEST_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
// Config values (strings, as they go straight into the config map).
private static final String TASK_SHUTDOWN_MS = "3000";
private static final String JOB_DEBOUNCE_TIME_MS = "1000";
// Fixed processor ids reused across all tests.
private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
// Per-test, UUID-suffixed Kafka topic names; assigned in setUp().
private String inputKafkaTopic;
private String outputKafkaTopic;
private String inputSinglePartitionKafkaTopic;
private String outputSinglePartitionKafkaTopic;
// Independent ZK handle the tests use to read job models and watch processor changes.
private ZkUtils zkUtils;
// One config and one runner per processor id in PROCESSOR_IDS.
private ApplicationConfig applicationConfig1;
private ApplicationConfig applicationConfig2;
private ApplicationConfig applicationConfig3;
private LocalApplicationRunner applicationRunner1;
private LocalApplicationRunner applicationRunner2;
private LocalApplicationRunner applicationRunner3;
private String testStreamAppName;
private String testStreamAppId;
// Guard against hung tests: each test must finish within 120 seconds.
@Rule
public Timeout testTimeOutInMillis = new Timeout(120000);
@Rule
public final ExpectedException expectedException = ExpectedException.none();
// @Override
// NOTE(review): @Override (and the @Test annotations below) are commented out,
// presumably to disable these integration tests — confirm before re-enabling.
public void setUp() {
  super.setUp();
  // Unique suffix keeps app name/id and Kafka topic names from colliding across runs.
  String uniqueTestId = UUID.randomUUID().toString();
  testStreamAppName = String.format("test-app-name-%s", uniqueTestId);
  testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
  inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
  outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
  inputSinglePartitionKafkaTopic = String.format("test-input-single-partition-topic-%s", uniqueTestId);
  outputSinglePartitionKafkaTopic = String.format("test-output-single-partition-topic-%s", uniqueTestId);
  // Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system
  // TODO: processorId should typically come up from a processorID generator as processor.id will be deprecated in 0.14.0+
  Map<String, String> configMap =
      buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
  // Same base config with three distinct processor ids; a fresh MapConfig is built
  // per id so each ApplicationConfig sees its own processor.id.
  configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
  applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
  configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
  applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
  configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
  applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
  // Separate ZkUtils handle for the tests' own assertions (job model reads,
  // processor-change subscriptions).
  ZkClient zkClient = new ZkClient(zkConnect());
  ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(ZkJobCoordinatorFactory.getJobCoordinationZkPath(applicationConfig1));
  zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
  zkUtils.connect();
  // Create local application runners.
  applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
  applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
  applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
  // Multi-partition topics: 5 partitions, replication factor 1.
  for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
    LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
    TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new Properties());
  }
  // Single-partition topics, for tests that need exactly one task.
  for (String kafkaTopic : ImmutableList.of(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)) {
    LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
    TestUtils.createTopic(zkUtils(), kafkaTopic, 1, 1, servers(), new Properties());
  }
}
// @Override
public void tearDown() {
  // Topic deletion talks to ZooKeeper, so only attempt it while the embedded ZK
  // server is still up (some tests shut it down deliberately).
  if (zookeeper().zookeeper().isRunning()) {
    for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic,
        inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)) {
      LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
      AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
    }
  }
  // Fix: close the test ZkUtils handle and run the harness teardown unconditionally.
  // Previously both were skipped whenever ZooKeeper was already down (e.g. after
  // shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection), leaking the
  // ZkClient and the embedded Kafka/ZK resources.
  zkUtils.close();
  super.tearDown();
}
/**
 * Publishes {@code numEvents} {@link TestKafkaEvent}s to the given Kafka topic, each
 * serialized as the bytes of "streamProcessorId|eventIndex".
 *
 * @param topic kafka topic to publish to
 * @param numEvents number of events to publish; event data is the 0-based index
 * @param streamProcessorId id recorded as the event id of every published event
 * @throws SamzaException if a send fails
 */
private void publishKafkaEvents(String topic, int numEvents, String streamProcessorId) {
  KafkaProducer producer = getKafkaProducer();
  for (int eventIndex = 0; eventIndex < numEvents; eventIndex++) {
    try {
      LOGGER.info("Publish kafka event with index : {} for stream processor: {}.", eventIndex, streamProcessorId);
      producer.send(new ProducerRecord(topic, new TestKafkaEvent(streamProcessorId, String.valueOf(eventIndex)).toString().getBytes()));
    } catch (Exception e) {
      // Fix: the original passed a single Object[] to a two-placeholder SLF4J message,
      // so the exception was never logged as a throwable. Pass topic and the
      // exception as separate arguments (last-arg throwable convention).
      LOGGER.error("Publishing to kafka topic: {} resulted in exception.", topic, e);
      throw new SamzaException(e);
    }
  }
}
/**
 * Builds the mutable base configuration for a standalone (ZK-coordinated) stream
 * application against the embedded Kafka/ZK cluster: input stream, grouper and
 * coordinator factories, app/job identity, and the Kafka system/serde settings.
 *
 * @param systemName Samza system name for the embedded Kafka cluster
 * @param inputTopic input Kafka topic the application consumes
 * @param appName application (and job) name
 * @param appId application (and job) id
 * @return a mutable map; callers add processor.id and per-test overrides on top
 */
private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
    String appName, String appId) {
  Map<String, String> config = Maps.newHashMap();
  config.put(TaskConfig.INPUT_STREAMS(), inputTopic);
  config.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
  config.put(TaskConfig.IGNORED_EXCEPTIONS(), "*");
  config.put(ZkConfig.ZK_CONNECT, zkConnect());
  config.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY);
  config.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY);
  config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
  config.put(ApplicationConfig.APP_NAME, appName);
  config.put(ApplicationConfig.APP_ID, appId);
  config.put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY);
  config.put(JobConfig.JOB_NAME(), appName);
  config.put(JobConfig.JOB_ID(), appId);
  config.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
  config.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
  config.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
  return config;
}
/**
 * sspGrouper is set to GroupBySystemStreamPartitionFactory.
 * Run a stream application(streamApp1) consuming messages from input topic(effectively one container).
 *
 * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2).
 *
 * Assertions:
 * A) JobModel generated before and after the addition of streamApp2 should be equal.
 * B) Second stream application(streamApp2) should not join the group and process any message.
 */
//@Test
public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException {
  // Set up kafka topics.
  publishKafkaEvents(inputSinglePartitionKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]);
  // Configuration, verification variables
  MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(),
      "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
  // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block).
  final JobModel[] previousJobModel = new JobModel[1];
  final String[] previousJobModelVersion = new String[1];
  AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false);
  final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
  zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
    // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1.
    if (currentChilds.contains(PROCESSOR_IDS[1])) {
      secondProcessorRegistered.countDown();
    }
  });
  // Set up stream app 2.
  CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
  LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
  StreamApplication streamApp2 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
      processedMessagesLatch, null, null);
  // Callback handler for streamApp1: on the first message, snapshot the current job
  // model, then launch streamApp2 and wait for it to register with ZK.
  StreamApplicationCallback streamApplicationCallback = message -> {
    if (hasSecondProcessorJoined.compareAndSet(false, true)) {
      previousJobModelVersion[0] = zkUtils.getJobModelVersion();
      previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
      localApplicationRunner2.run(streamApp2);
      try {
        // Wait for streamApp2 to register with zookeeper.
        secondProcessorRegistered.await();
      } catch (InterruptedException e) {
        // Fix: restore the interrupt status instead of silently swallowing it.
        Thread.currentThread().interrupt();
      }
    }
  };
  CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
  // Set up stream app 1.
  LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
  StreamApplication streamApp1 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
      null, streamApplicationCallback, kafkaEventsConsumedLatch);
  localApplicationRunner1.run(streamApp1);
  kafkaEventsConsumedLatch.await();
  String currentJobModelVersion = zkUtils.getJobModelVersion();
  JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
  // JobModelVersion check to verify that leader publishes new jobModel.
  assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion));
  // Job model before and after the addition of second stream processor should be the same.
  assertEquals(previousJobModel[0], updatedJobModel);
  // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
  // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
  assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
}
/**
 * sspGrouper is set to AllSspToSingleTaskGrouperFactory (All ssps from input kafka topic are mapped to a single task per container).
 * AllSspToSingleTaskGrouperFactory should be used only with high-level consumers which do the partition management
 * by themselves. Using the factory with the consumers that do not do the partition management will result in
 * each processor/task consuming all the messages from all the partitions.
 * Run a stream application(streamApp1) consuming messages from input topic(effectively one container).
 *
 * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2).
 *
 * Assertions:
 * A) JobModel generated before and after the addition of streamApp2 should not be equal.
 * B) Second stream application(streamApp2) should join the group and process all the messages.
 */
//@Test
public void shouldUpdateJobModelWhenNewProcessorJoiningGroupUsingAllSspToSingleTaskGrouperFactory() throws InterruptedException {
  // Set up kafka topics.
  publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]);
  // Configuration, verification variables
  MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
  // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block).
  final JobModel[] previousJobModel = new JobModel[1];
  final String[] previousJobModelVersion = new String[1];
  AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false);
  final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
  zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
    // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1.
    if (currentChilds.contains(PROCESSOR_IDS[1])) {
      secondProcessorRegistered.countDown();
    }
  });
  // Set up streamApp2.
  CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
  LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
  StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null);
  // Callback handler for streamApp1: on the first message, snapshot the current job
  // model, then launch streamApp2 and wait for it to register with ZK.
  StreamApplicationCallback streamApplicationCallback = message -> {
    if (hasSecondProcessorJoined.compareAndSet(false, true)) {
      previousJobModelVersion[0] = zkUtils.getJobModelVersion();
      previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
      localApplicationRunner2.run(streamApp2);
      try {
        // Wait for streamApp2 to register with zookeeper.
        secondProcessorRegistered.await();
      } catch (InterruptedException e) {
        // Fix: restore the interrupt status instead of silently swallowing it.
        Thread.currentThread().interrupt();
      }
    }
  };
  // This is the latch for the messages received by streamApp1. Since streamApp1 is run first, it gets one event
  // redelivered due to re-balancing done by Zk after the streamApp2 joins (See the callback above).
  CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1);
  // Set up stream app 1.
  LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
  StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null,
      streamApplicationCallback, kafkaEventsConsumedLatch);
  localApplicationRunner1.run(streamApp1);
  kafkaEventsConsumedLatch.await();
  String currentJobModelVersion = zkUtils.getJobModelVersion();
  JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
  // JobModelVersion check to verify that leader publishes new jobModel.
  assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion));
  // Job model before and after the addition of second stream processor should not be the same.
  assertTrue(!previousJobModel[0].equals(updatedJobModel));
  // Task names in the job model should be different but the set of partitions should be the same and each task name
  // should be assigned to a different container.
  // Fix: assertEquals arguments flipped to the JUnit (expected, actual) order so
  // failure messages read correctly.
  assertEquals(1, previousJobModel[0].getContainers().get(PROCESSOR_IDS[0]).getTasks().size());
  assertEquals(1, updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks().size());
  assertEquals(1, updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks().size());
  Map<TaskName, TaskModel> updatedTaskModelMap1 = updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks();
  Map<TaskName, TaskModel> updatedTaskModelMap2 = updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks();
  assertEquals(1, updatedTaskModelMap1.size());
  assertEquals(1, updatedTaskModelMap2.size());
  TaskModel taskModel1 = updatedTaskModelMap1.values().stream().findFirst().get();
  TaskModel taskModel2 = updatedTaskModelMap2.values().stream().findFirst().get();
  assertEquals(taskModel1.getSystemStreamPartitions(), taskModel2.getSystemStreamPartitions());
  assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
  // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
  processedMessagesLatch.await();
}
/**
 * Brings up three stream processors, waits until each has processed at least one
 * message, kills the first processor (the leader, since it joined first) and verifies
 * via ZooKeeper that a new job model containing only the two survivors is published.
 */
//@Test
public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
  // Set up kafka topics.
  publishKafkaEvents(inputKafkaTopic, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
  // Create stream applications.
  CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(2 * NUM_KAFKA_EVENTS);
  // Count of 1: we only need evidence that each processor has started processing.
  CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
  CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
  CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
  StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
  StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
  StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch);
  // Run stream applications.
  applicationRunner1.run(streamApp1);
  applicationRunner2.run(streamApp2);
  applicationRunner3.run(streamApp3);
  // Wait until all processors have processed a message.
  processedMessagesLatch1.await();
  processedMessagesLatch2.await();
  processedMessagesLatch3.await();
  // Verifications before killing the leader.
  String jobModelVersion = zkUtils.getJobModelVersion();
  JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
  assertEquals(3, jobModel.getContainers().size());
  assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet());
  assertEquals("1", jobModelVersion);
  List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
  assertEquals(3, processorIdsFromZK.size());
  assertEquals(PROCESSOR_IDS[0], processorIdsFromZK.get(0));
  // Kill the leader. Since streamApp1 is the first to join the cluster, it's the leader.
  applicationRunner1.kill(streamApp1);
  applicationRunner1.waitForFinish();
  kafkaEventsConsumedLatch.await();
  // Verifications after killing the leader.
  assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp1));
  processorIdsFromZK = zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]));
  assertEquals(2, processorIdsFromZK.size());
  assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
  // Job model version bumps and the new model contains only the two survivors.
  jobModelVersion = zkUtils.getJobModelVersion();
  assertEquals("2", jobModelVersion);
  jobModel = zkUtils.getJobModel(jobModelVersion);
  assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
  assertEquals(2, jobModel.getContainers().size());
}
/**
 * Two processors are running; a third runner is then started with a configuration that
 * reuses applicationConfig2 (i.e. the processor id of the second processor). Running it
 * must fail with a {@link SamzaException}.
 */
//@Test
public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() throws InterruptedException {
  // Set up kafka topics.
  publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
  // Create StreamApplications.
  CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
  CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
  CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
  StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
  StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
  // Run stream applications.
  applicationRunner1.run(streamApp1);
  applicationRunner2.run(streamApp2);
  // Wait for message processing to start in both the processors.
  processedMessagesLatch1.await();
  processedMessagesLatch2.await();
  // Fix: renamed from 'applicationRunner3', which shadowed the instance field of the
  // same name while actually reusing applicationConfig2's processor id.
  LocalApplicationRunner duplicateProcessorIdRunner = new LocalApplicationRunner(new MapConfig(applicationConfig2));
  // Create a stream app with same processor id as SP2 and run it. It should fail.
  publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
  kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
  StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch);
  // Fail when the duplicate processor joins.
  expectedException.expect(SamzaException.class);
  duplicateProcessorIdRunner.run(streamApp3);
}
/**
 * Simulates a rolling upgrade: two processors run, the first is killed and then
 * restarted with the exact same config (same processor id). Asserts that the job model
 * version is bumped by exactly one, the container assignment is unchanged, and message
 * processing resumes at (or after) the last processed event id.
 */
//@Test
public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() throws Exception {
  // Set up kafka topics.
  publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
  /*
   * Custom listeners can't be plugged in for transition events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion etc) from zkJobCoordinator. Only possible listeners
   * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
   */
  // Fix: removed the unused local 'debounceTimeConfig'; the debounce override is
  // applied directly to configMap below.
  Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
  configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
  configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
  // NOTE(review): the locals below intentionally shadow same-named instance fields —
  // this test needs configs/runners carrying the longer debounce time.
  Config applicationConfig1 = new MapConfig(configMap);
  configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
  Config applicationConfig2 = new MapConfig(configMap);
  LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
  LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
  // Record every event streamApp1 processes, so we can compare event ids across the restart.
  List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
  StreamApplicationCallback streamApplicationCallback = messagesProcessed::add;
  // Create StreamApplication from configuration.
  CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
  CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
  CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
  StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch);
  StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
  // Run stream application.
  applicationRunner1.run(streamApp1);
  applicationRunner2.run(streamApp2);
  processedMessagesLatch1.await();
  processedMessagesLatch2.await();
  // Read job model before rolling upgrade.
  String jobModelVersion = zkUtils.getJobModelVersion();
  JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
  applicationRunner1.kill(streamApp1);
  applicationRunner1.waitForFinish();
  // Highest event id seen before the "upgrade".
  int lastProcessedMessageId = -1;
  for (TestKafkaEvent message : messagesProcessed) {
    lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventData()));
  }
  messagesProcessed.clear();
  // Restart the first processor with the exact same config (same processor id).
  LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
  processedMessagesLatch1 = new CountDownLatch(1);
  publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
  streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch);
  applicationRunner4.run(streamApp1);
  processedMessagesLatch1.await();
  // Read new job model after rolling upgrade.
  String newJobModelVersion = zkUtils.getJobModelVersion();
  JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
  // This should be continuation of last processed message.
  int nextSeenMessageId = Integer.parseInt(messagesProcessed.get(0).getEventData());
  assertTrue(lastProcessedMessageId <= nextSeenMessageId);
  assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
  assertEquals(jobModel.getContainers(), newJobModel.getContainers());
}
/**
 * Runs two processors, then shuts down all Kafka brokers and the embedded ZooKeeper;
 * both runners are expected to terminate with UnsuccessfulFinish.
 */
//@Test
public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
  // Set up kafka topics.
  publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
  // Set the producer metadata fetch timeout to 1s so sends fail quickly after the brokers die.
  MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000"));
  // NOTE(review): these locals shadow the instance fields on purpose — the runners
  // here need the extra producer config above.
  MapConfig applicationRunnerConfig1 = new MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig));
  MapConfig applicationRunnerConfig2 = new MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig));
  LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationRunnerConfig1);
  LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationRunnerConfig2);
  CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
  CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
  // Create StreamApplications.
  StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, null);
  StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, null);
  // Run stream applications.
  applicationRunner1.run(streamApp1);
  applicationRunner2.run(streamApp2);
  // Wait until both processors are processing before pulling the infrastructure away.
  processedMessagesLatch1.await();
  processedMessagesLatch2.await();
  // Non daemon thread in brokers reconnect repeatedly to zookeeper on failures. Manually shutting them down.
  List<KafkaServer> kafkaServers = JavaConverters.bufferAsJavaListConverter(this.servers()).asJava();
  kafkaServers.forEach(KafkaServer::shutdown);
  zookeeper().shutdown();
  applicationRunner1.waitForFinish();
  applicationRunner2.waitForFinish();
  assertEquals(ApplicationStatus.UnsuccessfulFinish, applicationRunner1.status(streamApp1));
  assertEquals(ApplicationStatus.UnsuccessfulFinish, applicationRunner2.status(streamApp2));
}
/**
 * Callback invoked by {@link TestStreamApplication} for every message it receives;
 * implemented as a lambda by the tests.
 */
@FunctionalInterface
public interface StreamApplicationCallback {
  void onMessageReceived(TestKafkaEvent message);
}
/**
 * Simple immutable test event with the wire format "eventId|eventData"
 * (see {@link #toString()} / {@link #fromString(String)}).
 */
private static class TestKafkaEvent implements Serializable {
  // Actual content of the event. Fields made final: instances are never mutated.
  private final String eventData;
  // Contains Integer value, which is greater than previous message id.
  private final String eventId;
  TestKafkaEvent(String eventId, String eventData) {
    this.eventData = eventData;
    this.eventId = eventId;
  }
  String getEventId() {
    return eventId;
  }
  String getEventData() {
    return eventData;
  }
  @Override
  public String toString() {
    return eventId + "|" + eventData;
  }
  /**
   * Inverse of {@link #toString()}.
   * Fix: split with limit 2 so event data containing '|' round-trips instead of
   * being truncated at the first separator.
   */
  static TestKafkaEvent fromString(String message) {
    String[] messageComponents = message.split("\\|", 2);
    return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
  }
}
/**
 * Test application that echoes every event from {@code inputTopic} to
 * {@code outputTopic} without transformation, firing the optional callback and
 * counting down the optional latches once per received event.
 **/
private static class TestStreamApplication implements StreamApplication {
  private final String inputTopic;
  private final String outputTopic;
  private final CountDownLatch processedMessagesLatch;
  private final StreamApplicationCallback streamApplicationCallback;
  private final CountDownLatch kafkaEventsConsumedLatch;

  TestStreamApplication(String inputTopic, String outputTopic,
      CountDownLatch processedMessagesLatch,
      StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) {
    this.inputTopic = inputTopic;
    this.outputTopic = outputTopic;
    this.processedMessagesLatch = processedMessagesLatch;
    this.streamApplicationCallback = streamApplicationCallback;
    this.kafkaEventsConsumedLatch = kafkaEventsConsumedLatch;
  }

  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<String> input = graph.getInputStream(inputTopic, new NoOpSerde<String>());
    OutputStream<String> output = graph.getOutputStream(outputTopic, new StringSerde());
    input.map(this::recordAndForward).sendTo(output);
  }

  /** Notifies the callback and latches for one received message, then echoes it back. */
  private String recordAndForward(String rawMessage) {
    TestKafkaEvent event = TestKafkaEvent.fromString(rawMessage);
    if (streamApplicationCallback != null) {
      streamApplicationCallback.onMessageReceived(event);
    }
    if (processedMessagesLatch != null) {
      processedMessagesLatch.countDown();
    }
    if (kafkaEventsConsumedLatch != null) {
      kafkaEventsConsumedLatch.countDown();
    }
    return event.toString();
  }
}
}