blob: b0575362415ec0464d1ea101c0f3998882f38799 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.streams;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.streams.operations.aggregators.LongSum;
import org.apache.storm.streams.processors.AggregateProcessor;
import org.apache.storm.streams.processors.FilterProcessor;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
* Unit tests for {@link ProcessorBolt}
*/
public class ProcessorBoltTest {
TopologyContext mockTopologyContext;
OutputCollector mockOutputCollector;
ProcessorBolt bolt;
Tuple mockTuple1;
Tuple mockTuple2;
Tuple mockTuple3;
Tuple punctuation;
Multimap<String, ProcessorNode> mockStreamToProcessors;
DirectedGraph<Node, Edge> graph;
@Before
public void setUp() throws Exception {
mockTopologyContext = Mockito.mock(TopologyContext.class);
mockOutputCollector = Mockito.mock(OutputCollector.class);
mockTuple1 = Mockito.mock(Tuple.class);
mockTuple2 = Mockito.mock(Tuple.class);
mockTuple3 = Mockito.mock(Tuple.class);
setUpMockTuples(mockTuple1, mockTuple2, mockTuple3);
punctuation = Mockito.mock(Tuple.class);
setUpPunctuation(punctuation);
mockStreamToProcessors = Mockito.mock(Multimap.class);
graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
}
@Test
public void testEmitAndAck() throws Exception {
setUpProcessorBolt(new FilterProcessor<Integer>(x -> true));
bolt.execute(mockTuple1);
ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
Mockito.verify(mockOutputCollector).emit(os.capture(), anchor.capture(), values.capture());
assertEquals("outputstream", os.getValue());
assertArrayEquals(new Object[]{ mockTuple1 }, anchor.getValue().toArray());
assertEquals(new Values(100), values.getValue());
Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple1);
}
@Test
public void testAggResultAndAck() throws Exception {
setUpProcessorBolt(new AggregateProcessor<>(new LongSum()), Collections.singleton("inputstream"), true, null);
bolt.execute(mockTuple2);
bolt.execute(mockTuple3);
bolt.execute(punctuation);
ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
Mockito.verify(mockOutputCollector, Mockito.times(2)).emit(os.capture(), anchor.capture(), values.capture());
assertArrayEquals(new Object[]{ mockTuple2, mockTuple3, punctuation }, anchor.getAllValues().get(0).toArray());
assertArrayEquals(new Object[]{ mockTuple2, mockTuple3, punctuation }, anchor.getAllValues().get(1).toArray());
assertArrayEquals(new Object[]{ new Values(200L), new Values("__punctuation") }, values.getAllValues().toArray());
assertArrayEquals(new Object[]{ "outputstream", "outputstream__punctuation" }, os.getAllValues().toArray());
Mockito.verify(mockOutputCollector).ack(mockTuple2);
Mockito.verify(mockOutputCollector).ack(mockTuple3);
Mockito.verify(mockOutputCollector).ack(punctuation);
}
@Test
public void testEmitTs() throws Exception {
Tuple tupleWithTs = Mockito.mock(Tuple.class);
setUpMockTuples(tupleWithTs);
Mockito.when(tupleWithTs.getLongByField("ts")).thenReturn(12345L);
setUpProcessorBolt(new FilterProcessor(x -> true), "ts");
bolt.execute(tupleWithTs);
ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<String> os = ArgumentCaptor.forClass(String.class);
Mockito.verify(mockOutputCollector).emit(os.capture(), anchor.capture(), values.capture());
assertEquals("outputstream", os.getValue());
assertArrayEquals(new Object[]{ tupleWithTs }, anchor.getValue().toArray());
assertEquals(new Values(100, 12345L), values.getValue());
Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(tupleWithTs);
}
private void setUpProcessorBolt(Processor<?> processor) {
setUpProcessorBolt(processor, Collections.emptySet(), false, null);
}
private void setUpProcessorBolt(Processor<?> processor, String tsFieldName) {
setUpProcessorBolt(processor, Collections.emptySet(), false, tsFieldName);
}
private void setUpProcessorBolt(Processor<?> processor,
Set<String> windowedParentStreams,
boolean isWindowed,
String tsFieldName) {
ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value"));
node.setWindowedParentStreams(windowedParentStreams);
node.setWindowed(isWindowed);
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn("inputstream");
Mockito.when(mockGlobalStreamId.get_componentId()).thenReturn("bolt0");
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
graph.addVertex(node);
bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node));
if (tsFieldName != null && !tsFieldName.isEmpty()) {
bolt.setTimestampField(tsFieldName);
}
bolt.setStreamToInitialProcessors(mockStreamToProcessors);
bolt.prepare(new HashMap<>(), mockTopologyContext, mockOutputCollector);
}
private void setUpMockTuples(Tuple... tuples) {
for (Tuple tuple : tuples) {
Mockito.when(tuple.size()).thenReturn(1);
Mockito.when(tuple.getValue(0)).thenReturn(100);
Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
}
}
private void setUpPunctuation(Tuple punctuation) {
Mockito.when(punctuation.size()).thenReturn(1);
Mockito.when(punctuation.getValue(0)).thenReturn(WindowNode.PUNCTUATION);
Mockito.when(punctuation.getSourceComponent()).thenReturn("bolt0");
Mockito.when(punctuation.getSourceStreamId()).thenReturn("inputstream");
}
}