/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.test.processor;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.task.AsyncStreamTaskAdapter;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.test.harness.IntegrationTestHarness;
import org.junit.Assert;
import org.junit.Test;
import scala.Option$;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;


public class TestStreamProcessor extends IntegrationTestHarness {

  public static final String PROCESSOR_ID = "1";

  /**
   * Tests a basic identity stream task: reads data from one topic and writes it to another topic
   * without any modifications.
   *
   * <p>
   * The standalone setup in this test uses KafkaSystemFactory together with a SingleContainerGrouperFactory, so
   * no matter how many tasks are present, they all run in a single processor instance. This simplifies testing.
   */
  @Test
  public void testStreamProcessor() {
    final String testSystem = "test-system";
    final String inputTopic = "numbers";
    final String outputTopic = "output";
    final int messageCount = 20;

    final Config configs = new MapConfig(createConfigs(PROCESSOR_ID, testSystem, inputTopic, outputTopic, messageCount));
    // Note: topics must be created before the StreamProcessor is constructed. The StreamProcessor auto-creates
    // missing topics, so calling createTopics afterwards would fail with a TopicExistsException.
    createTopics(inputTopic, outputTopic);
    final TestStubs stubs = new TestStubs(configs, IdentityStreamTask::new, bootstrapServers());

    produceMessages(stubs.producer, inputTopic, messageCount);
    run(stubs.processor, stubs.shutdownLatch);
    verifyNumMessages(stubs.consumer, outputTopic, messageCount);
  }

  /**
   * Should be able to create task instances from the provided {@link StreamTaskFactory}.
   */
  @Test
  public void testStreamProcessorWithStreamTaskFactory() {
    final String testSystem = "test-system";
    final String inputTopic = "numbers2";
    final String outputTopic = "output2";
    final int messageCount = 20;

    final Config configs = new MapConfig(createConfigs(PROCESSOR_ID, testSystem, inputTopic, outputTopic, messageCount));
    createTopics(inputTopic, outputTopic);
    final TestStubs stubs = new TestStubs(configs, IdentityStreamTask::new, bootstrapServers());

    produceMessages(stubs.producer, inputTopic, messageCount);
    run(stubs.processor, stubs.shutdownLatch);
    verifyNumMessages(stubs.consumer, outputTopic, messageCount);
  }

  /**
   * Should be able to create task instances from the provided {@link AsyncStreamTaskFactory}.
   */
  @Test
  public void testStreamProcessorWithAsyncStreamTaskFactory() {
    final String testSystem = "test-system";
    final String inputTopic = "numbers3";
    final String outputTopic = "output3";
    final int messageCount = 20;

    final Config configs = new MapConfig(createConfigs(PROCESSOR_ID, testSystem, inputTopic, outputTopic, messageCount));
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    createTopics(inputTopic, outputTopic);
    final AsyncStreamTaskFactory stf = () -> new AsyncStreamTaskAdapter(new IdentityStreamTask(), executorService);
    final TestStubs stubs = new TestStubs(configs, stf, bootstrapServers());

    produceMessages(stubs.producer, inputTopic, messageCount);
    run(stubs.processor, stubs.shutdownLatch);
    verifyNumMessages(stubs.consumer, outputTopic, messageCount);
    executorService.shutdownNow();
  }

  /**
   * Should fail to create a SamzaContainer when neither a task factory nor task.class is provided.
   */
  @Test(expected = SamzaException.class)
  public void testStreamProcessorWithNoTask() {
    final String testSystem = "test-system";
    final String inputTopic = "numbers4";
    final String outputTopic = "output4";
    final int messageCount = 20;

    final Map<String, String> configMap = createConfigs(PROCESSOR_ID, testSystem, inputTopic, outputTopic, messageCount);
    configMap.remove("task.class");
    final Config configs = new MapConfig(configMap);
    final TestStubs stubs = new TestStubs(configs, (StreamTaskFactory) null, bootstrapServers());

    run(stubs.processor, stubs.shutdownLatch);
  }

  private void createTopics(String inputTopic, String outputTopic) {
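    // Both topics are created with a single partition and a replication factor of 1 on the test brokers.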
    TestUtils.createTopic(kafkaZkClient(), inputTopic, 1, 1, servers(), new Properties());
    TestUtils.createTopic(kafkaZkClient(), outputTopic, 1, 1, servers(), new Properties());
  }

  private Map<String, String> createConfigs(String processorId, String testSystem, String inputTopic, String outputTopic, int messageCount) {
    Map<String, String> configs = new HashMap<>();
    configs.putAll(
        StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask"));
    configs.putAll(StandaloneTestUtils.getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null,
        StandaloneTestUtils.SerdeAlias.STRING, true));
    configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
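    // The app.* properties are consumed by IdentityStreamTask: they tell it where to emit output and how many
    // messages to process before it shuts the processor down.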
    configs.put("app.messageCount", String.valueOf(messageCount));
    configs.put("app.outputTopic", outputTopic);
    configs.put("app.outputSystem", testSystem);
    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
    configs.put("processor.id", processorId);
    return configs;
  }

  /**
   * Produces the provided number of messages to the topic.
   */
  @SuppressWarnings("unchecked")
  private void produceMessages(KafkaProducer producer, String topic, int numMessages) {
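    // Each value is the stringified message index; calling get() on the send future makes every send
    // synchronous, so all messages are acknowledged (in order) before the processor is started.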
    for (int i = 0; i < numMessages; i++) {
      try {
        producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get();
      } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout,
   * and then stopping it.
   */
  private void run(StreamProcessor processor, CountDownLatch latch) {
    boolean latchResult = false;
    processor.start();
    try {
      // The latch is counted down by the lifecycle listener's afterStop callback, which fires once the task
      // has processed all of its input and the processor has shut down.
      latchResult = latch.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      // Stop the processor even on timeout so a failed test does not leak a running processor.
      processor.stop();
    }
    if (!latchResult) {
      Assert.fail("StreamTask either failed to process all input or timed out!");
    }
  }

  /**
   * Consumes data from the topic until there are no new messages for a while
   * and asserts that the number of consumed messages is as expected.
   */
  @SuppressWarnings("unchecked")
  private void verifyNumMessages(KafkaConsumer consumer, String topic, int expectedNumMessages) {
    consumer.subscribe(Collections.singletonList(topic));

    int count = 0;
    int emptyPollCount = 0;

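    // Keep polling until the expected number of messages has been consumed, giving up after five empty polls
    // of five seconds each.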
    while (count < expectedNumMessages && emptyPollCount < 5) {
      ConsumerRecords records = consumer.poll(5000);
      if (!records.isEmpty()) {
        for (ConsumerRecord record : (Iterable<ConsumerRecord>) records) {
          Assert.assertEquals(String.valueOf(count), new String((byte[]) record.value()));
          count++;
        }
      } else {
        emptyPollCount++;
      }
    }

    Assert.assertEquals(expectedNumMessages, count);
  }

  /**
   * A wrapper class that consolidates the components that need to be mocked or stubbed before
   * testing the stream processor.
   */
  private static class TestStubs {
    CountDownLatch shutdownLatch;
    KafkaConsumer consumer;
    KafkaProducer producer;
    StreamProcessor processor;
    ProcessorLifecycleListener listener;

    private TestStubs(String bootstrapServer) {
      shutdownLatch = new CountDownLatch(1);
      initProcessorListener();
      initConsumer(bootstrapServer);
      initProducer(bootstrapServer);
    }

    TestStubs(Config config, StreamTaskFactory taskFactory, String bootstrapServer) {
      this(bootstrapServer);
      processor = new StreamProcessor(PROCESSOR_ID, config, new HashMap<>(), taskFactory, listener);
    }

    TestStubs(Config config, AsyncStreamTaskFactory taskFactory, String bootstrapServer) {
      this(bootstrapServer);
      processor = new StreamProcessor(PROCESSOR_ID, config, new HashMap<>(), taskFactory, listener);
    }

    private void initConsumer(String bootstrapServer) {
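      // Plaintext consumer built via Kafka's TestUtils; it reads from the earliest offset and uses
      // String keys / byte[] values to match what the producer below writes.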
      consumer = TestUtils.createConsumer(
          bootstrapServer,
          "group",
          "earliest",
          4096L,
          "org.apache.kafka.clients.consumer.RangeAssignor",
          30000,
          SecurityProtocol.PLAINTEXT,
          Option$.MODULE$.<File>empty(),
          Option$.MODULE$.<Properties>empty(),
          new StringDeserializer(),
          new ByteArrayDeserializer(),
          Option$.MODULE$.<Properties>empty());
    }

    private void initProcessorListener() {
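      // The lifecycle listener is a Mockito mock; the only behavior the tests rely on is counting down
      // the shutdown latch from afterStop.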
      listener = mock(ProcessorLifecycleListener.class);
      doNothing().when(listener).afterStart();
      doNothing().when(listener).afterFailure(any());
      doAnswer(invocation -> {
        // stopped successfully
        shutdownLatch.countDown();
        return null;
      }).when(listener).afterStop();
    }

    private void initProducer(String bootstrapServer) {
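      // Plaintext producer built via Kafka's TestUtils, using String keys and byte[] values to mirror the
      // consumer configuration above.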
      producer = TestUtils.createProducer(
          bootstrapServer,
          1,
          60 * 1000L,
          1024L * 1024L,
          0,
          0L,
          5 * 1000L,
          SecurityProtocol.PLAINTEXT,
          null,
          Option$.MODULE$.<Properties>apply(new Properties()),
          new StringSerializer(),
          new ByteArraySerializer(),
          Option$.MODULE$.<Properties>apply(new Properties()));
    }
  }
}