blob: 9b2614f6c37e956a7e2ec183dbc92f4d8abb4da8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class GetKafkaIntegrationTests {
private static EmbeddedKafka kafkaLocal;
private static EmbeddedKafkaProducerHelper producerHelper;
@BeforeClass
public static void bforeClass(){
kafkaLocal = new EmbeddedKafka();
kafkaLocal.start();
producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
}
@AfterClass
public static void afterClass() throws Exception {
producerHelper.close();
kafkaLocal.stop();
}
/**
* Will set auto-offset to 'smallest' to ensure that all events (the once
* that were sent before and after consumer startup) are received.
*/
@Test
public void testGetAllMessages() throws Exception {
String topicName = "testGetAllMessages";
GetKafka getKafka = new GetKafka();
final TestRunner runner = TestRunners.newTestRunner(getKafka);
runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:" + kafkaLocal.getZookeeperPort());
runner.setProperty(GetKafka.TOPIC, topicName);
runner.setProperty(GetKafka.BATCH_SIZE, "5");
runner.setProperty(GetKafka.AUTO_OFFSET_RESET, GetKafka.SMALLEST);
runner.setProperty("consumer.timeout.ms", "300");
producerHelper.sendEvent(topicName, "Hello-1");
producerHelper.sendEvent(topicName, "Hello-2");
producerHelper.sendEvent(topicName, "Hello-3");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
try {
runner.run(20, false);
} finally {
latch.countDown();
}
}
}).start();
// Thread.sleep(1000);
producerHelper.sendEvent(topicName, "Hello-4");
producerHelper.sendEvent(topicName, "Hello-5");
producerHelper.sendEvent(topicName, "Hello-6");
latch.await();
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
// must be two since we sent 6 messages with batch of 5
assertEquals(2, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
String[] events = new String(flowFile.toByteArray()).split("\\s+");
assertEquals(5, events.length);
// spot check
assertEquals("Hello-1", events[0]);
assertEquals("Hello-4", events[3]);
flowFile = flowFiles.get(1);
events = new String(flowFile.toByteArray()).split("\\s+");
assertEquals(1, events.length);
getKafka.shutdownConsumer();
}
/**
* Based on auto-offset set to 'largest' events sent before consumer start
* should not be consumed.
*
*/
@Test
public void testGetOnlyMessagesAfterConsumerStartup() throws Exception {
String topicName = "testGetOnlyMessagesAfterConsumerStartup";
GetKafka getKafka = new GetKafka();
final TestRunner runner = TestRunners.newTestRunner(getKafka);
runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:" + kafkaLocal.getZookeeperPort());
runner.setProperty(GetKafka.TOPIC, topicName);
runner.setProperty(GetKafka.BATCH_SIZE, "5");
runner.setProperty("consumer.timeout.ms", "300");
producerHelper.sendEvent(topicName, "Hello-1");
producerHelper.sendEvent(topicName, "Hello-2");
producerHelper.sendEvent(topicName, "Hello-3");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
try {
runner.run(20, false);
} finally {
latch.countDown();
}
}
}).start();
latch.await();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
assertEquals(0, flowFiles.size());
producerHelper.sendEvent(topicName, "Hello-4");
producerHelper.sendEvent(topicName, "Hello-5");
producerHelper.sendEvent(topicName, "Hello-6");
latch.await();
runner.run(5, false);
flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
// must be single since we should only be receiving 4,5 and 6 in batch
// of 5
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
String[] events = new String(flowFile.toByteArray()).split("\\s+");
assertEquals(3, events.length);
assertEquals("Hello-4", events[0]);
assertEquals("Hello-5", events[1]);
assertEquals("Hello-6", events[2]);
getKafka.shutdownConsumer();
}
}