blob: 5c2855315af0c9c8590868bed6d448b239ae735b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.processor;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
/**
* Happy path tests.
* Start 1, 2, 5 processors and make sure they all consume all the events.
*/
public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
@Override
protected String prefix() {
return "test_ZK_";
}
@Test
public void testSingleStreamProcessor() {
testStreamProcessor(new String[]{"1"});
}
@Test
public void testTwoStreamProcessors() {
testStreamProcessor(new String[]{"2", "3"});
}
@Test
public void testFiveStreamProcessors() {
testStreamProcessor(new String[]{"4", "5", "6", "7", "8"});
}
// main test method for happy path with fixed number of processors
private void testStreamProcessor(String[] processorIds) {
// create a latch of the size equals to the number of messages
TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount);
// initialize the processors
StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
// we need to know when the processor has started
CountDownLatch[] startWait = new CountDownLatch[processorIds.length];
for (int i = 0; i < processorIds.length; i++) {
startWait[i] = new CountDownLatch(1);
streamProcessors[i] = createStreamProcessor(processorIds[i], map, startWait[i], null);
}
// run the processors in separate threads
Thread[] threads = new Thread[processorIds.length];
CountDownLatch[] stopLatches = new CountDownLatch[processorIds.length];
for (int i = 0; i < processorIds.length; i++) {
stopLatches[i] = new CountDownLatch(1);
threads[i] = runInThread(streamProcessors[i], stopLatches[i]);
threads[i].start();
}
// wait until the processor reports that it has started
for (int i = 0; i < processorIds.length; i++) {
waitForProcessorToStartStop(startWait[i]);
}
// produce messageCount messages, starting with key 0
produceMessages(0, inputTopic, messageCount);
// wait until all the events are consumed
waitUntilMessagesLeftN(0);
// collect all the threads
try {
for (int i = 0; i < threads.length; i++) {
stopProcessor(stopLatches[i]);
threads[i].join(1000);
}
} catch (InterruptedException e) {
Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
}
verifyNumMessages(outputTopic, messageCount, messageCount);
}
@Test
/**
* Similar to the previous tests, but add another processor in the middle
*/ public void testStreamProcessorWithAdd() {
// set number of events we expect to read by both processes in total:
// p1 - reads 'messageCount' at first
// p1 and p2 read all messageCount together, since they start from the beginning.
// so we expect total 3 x messageCounts
int totalEventsToGenerate = 3 * messageCount;
TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
// create first processor
CountDownLatch startWait1 = new CountDownLatch(1);
CountDownLatch stopWait1 = new CountDownLatch(1);
StreamProcessor sp1 = createStreamProcessor("20", map, startWait1, stopWait1);
// produce first batch of messages starting with 0
produceMessages(0, inputTopic, messageCount);
// start the first processor
CountDownLatch stopLatch1 = new CountDownLatch(1);
Thread t1 = runInThread(sp1, stopLatch1);
t1.start();
// wait until the processor reports that it has started
waitForProcessorToStartStop(startWait1);
// make sure it consumes all the messages from the first batch
waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
CountDownLatch containerStopped1 = sp1.containerShutdownLatch;
// start the second processor
CountDownLatch startWait2 = new CountDownLatch(1);
StreamProcessor sp2 = createStreamProcessor("21", map, startWait2, null);
CountDownLatch stopLatch2 = new CountDownLatch(1);
Thread t2 = runInThread(sp2, stopLatch2);
t2.start();
// wait until 2nd processor reports that it has started
waitForProcessorToStartStop(startWait2);
// wait until the 1st processor reports that it has stopped its container
LOG.info("containerStopped latch = " + containerStopped1);
waitForProcessorToStartStop(containerStopped1);
// read again the first batch
waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
// produce the second batch of the messages, starting with 'messageCount'
produceMessages(messageCount, inputTopic, messageCount);
// wait until all the events are consumed
waitUntilMessagesLeftN(0);
// shutdown both
try {
stopProcessor(stopLatch1);
stopProcessor(stopLatch2);
t1.join(1000);
t2.join(1000);
} catch (InterruptedException e) {
Assert.fail("Failed to join finished threads:" + e.getLocalizedMessage());
}
// p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together,
// but the expected values are the same 0-79, they will appear in the output more then once, but we should mark then only one time.
// total number of events we gonna get is 80+40=120
verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
}
@Test
/**
* same as other happy path messages, but with one processor removed in the middle
*/ public void testStreamProcessorWithRemove() {
// set number of events we expect to read by both processes in total:
// p1 and p2 - both read messageCount at first and p1 is shutdown, new batch of events is generated
// and p2 will read all of them from the beginning (+ 2 x messageCounts, total 3 x)
int totalEventsToGenerate = 3 * messageCount;
TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
// create first processor
CountDownLatch waitStart1 = new CountDownLatch(1);
CountDownLatch waitStop1 = new CountDownLatch(1);
StreamProcessor sp1 = createStreamProcessor("30", map, waitStart1, waitStop1);
// start the first processor
CountDownLatch stopLatch1 = new CountDownLatch(1);
Thread t1 = runInThread(sp1, stopLatch1);
t1.start();
// start the second processor
CountDownLatch waitStart2 = new CountDownLatch(1);
CountDownLatch waitStop2 = new CountDownLatch(1);
StreamProcessor sp2 = createStreamProcessor("31", map, waitStart2, waitStop2);
CountDownLatch stopLatch2 = new CountDownLatch(1);
Thread t2 = runInThread(sp2, stopLatch2);
t2.start();
// wait until the processor reports that it has started
waitForProcessorToStartStop(waitStart1);
// wait until the processor reports that it has started
waitForProcessorToStartStop(waitStart2);
// produce first batch of messages starting with 0
produceMessages(0, inputTopic, messageCount);
// make sure they consume all the messages from the first batch
waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
CountDownLatch containerStopped2 = sp2.containerShutdownLatch;
// stop the first processor
stopProcessor(stopLatch1);
// wait until it's really down
waitForProcessorToStartStop(waitStop1);
// processor2 will stop it container and start again.
// We wait for the container's stop to make sure we can count EXACTLY how many messages it reads.
LOG.info("containerStopped latch = " + containerStopped2);
waitForProcessorToStartStop(containerStopped2);
// read again the first batch
waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
// produce the second batch of the messages, starting with 'messageCount'
produceMessages(messageCount, inputTopic, messageCount);
// wait until p2 consumes all the message by itself;
waitUntilMessagesLeftN(0);
// shutdown p2
try {
stopProcessor(stopLatch2);
t2.join(1000);
} catch (InterruptedException e) {
Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
}
// processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself,
// but the expected values are the same 0-79 - we should get each value one time.
// Meanwhile the number of events we gonna get is 40 + 80
verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
}
}