package org.apache.metron.profiler.storm;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
import org.apache.metron.profiler.MessageDistributor;
import org.apache.metron.profiler.MessageRoute;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.storm.integration.MessageBuilder;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import org.json.simple.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Tests the ProfileBuilderBolt.
 */
public class ProfileBuilderBoltTest extends BaseBoltTest {
// Telemetry messages built in setup(); message1 is the one placed into tuples by the tests.
private JSONObject message1;
// Built in setup() but not referenced by any test visible in this file.
private JSONObject message2;
// Profile definitions built in setup(); profile1 is the one placed into tuples by the tests.
private ProfileConfig profile1;
// Built in setup() but not referenced by any test visible in this file.
private ProfileConfig profile2;
// Assigned by createBolt(); getProfileMeasurements() reads its stream id to verify emissions.
private ProfileMeasurementEmitter emitter;
// Created in setup().
private ManualFlushSignal flushSignal;
// The measurement built in setup() that the flush tests expect to see emitted.
private ProfileMeasurement measurement;
public void setup() throws Exception {
message1 = new MessageBuilder()
.withField("ip_src_addr", "")
.withField("value", "22")
message2 = new MessageBuilder()
.withField("ip_src_addr", "")
.withField("value", "22")
profile1 = new ProfileConfig()
.withInit("x", "0")
.withUpdate("x", "x + 1")
profile2 = new ProfileConfig()
.withInit(Collections.singletonMap("x", "0"))
.withUpdate(Collections.singletonMap("x", "x + 1"))
measurement = new ProfileMeasurement()
.withPeriod(1000, 500, TimeUnit.MILLISECONDS)
flushSignal = new ManualFlushSignal();
* The bolt should extract a message and timestamp from a tuple and
* pass that to a {@code MessageDistributor}.
public void testExtractMessage() throws Exception {
ProfileBuilderBolt bolt = createBolt();
// create a mock
MessageDistributor distributor = mock(MessageDistributor.class);
// create a tuple
final long timestamp1 = 100000000L;
Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1);
// execute the bolt
TupleWindow tupleWindow = createWindow(tuple1);
// the message should have been extracted from the tuple and passed to the MessageDistributor
verify(distributor).distribute(any(MessageRoute.class), any());
* If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
* and emit the {@code ProfileMeasurement} values from all active profiles.
public void testFlushActiveProfiles() throws Exception {
ProfileBuilderBolt bolt = createBolt();
// create a mock that returns the profile measurement above
MessageDistributor distributor = mock(MessageDistributor.class);
// signal the bolt to flush
// execute the bolt
Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
TupleWindow tupleWindow = createWindow(tuple1);
// a profile measurement should be emitted by the bolt
List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
assertEquals(1, measurements.size());
assertEquals(measurement, measurements.get(0));
* If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
public void testDoNotFlushActiveProfiles() throws Exception {
ProfileBuilderBolt bolt = createBolt();
// create a mock where flush() returns the profile measurement above
MessageDistributor distributor = mock(MessageDistributor.class);
// there is no flush signal
// execute the bolt
Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
TupleWindow tupleWindow = createWindow(tuple1);
// nothing should have been emitted
getProfileMeasurements(outputCollector, 0);
* Expired profiles should be flushed regularly, even if no input telemetry
* has been received.
public void testFlushExpiredProfiles() throws Exception {
ProfileBuilderBolt bolt = createBolt();
// create a mock where flushExpired() returns the profile measurement above
MessageDistributor distributor = mock(MessageDistributor.class);
// execute test by flushing expired profiles. this is normally triggered by a timer task.
// a profile measurement should be emitted by the bolt
List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
assertEquals(1, measurements.size());
assertEquals(measurement, measurements.get(0));
* A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each
* destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
public void testEmitters() throws Exception {
// defines the zk configurations accessible from the bolt
ProfilerConfigurations configurations = new ProfilerConfigurations();
// create the bolt with 3 destinations
ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
.withProfileTimeToLive(30, TimeUnit.MINUTES)
.withPeriodDuration(10, TimeUnit.MINUTES)
.withEmitter(new TestEmitter("destination1"))
.withEmitter(new TestEmitter("destination2"))
.withEmitter(new TestEmitter("destination3"))
.withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES));
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
// signal the bolt to flush
// execute the bolt
Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis());
TupleWindow window = createWindow(tuple1);
// validate measurements emitted to each
verify(outputCollector, times(1)).emit(eq("destination1"), any());
verify(outputCollector, times(1)).emit(eq("destination2"), any());
verify(outputCollector, times(1)).emit(eq("destination3"), any());
public void testExceptionWhenFlushingExpiredProfiles() throws Exception {
// create an emitter that will throw an exception when emit() called
ProfileMeasurementEmitter badEmitter = mock(ProfileMeasurementEmitter.class);
doThrow(new RuntimeException("flushExpired() should catch this exception"))
.emit(any(), any());
// create a distributor that will return a profile measurement
MessageDistributor distributor = mock(MessageDistributor.class);
// the bolt will use the bad emitter when flushExpired() is called
ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
// the exception thrown by the emitter should not bubble up
* Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
* @param collector The Storm output collector.
* @param expected The number of measurements expected.
* @return A list of ProfileMeasurement(s).
private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) {
// the 'streamId' is defined by the DestinationHandler being used by the bolt
final String streamId = emitter.getStreamId();
// capture the emitted tuple(s)
ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class);
verify(collector, times(expected))
.emit(eq(streamId), argCaptor.capture());
// return the profile measurements that were emitted
return argCaptor.getAllValues()
.map(val -> (ProfileMeasurement) val.get(0))
* Create a tuple that will contain the message, the entity name, and profile definition.
* @param entity The entity name
* @param message The telemetry message.
* @param profile The profile definition.
private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) {
Tuple tuple = mock(Tuple.class);
return tuple;
* Create a ProfileBuilderBolt to test.
* @return A {@link ProfileBuilderBolt} to test.
private ProfileBuilderBolt createBolt() throws IOException {
// defines the zk configurations accessible from the bolt
ProfilerConfigurations configurations = new ProfilerConfigurations();
emitter = new HBaseEmitter();
ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
.withProfileTimeToLive(30, TimeUnit.MINUTES)
.withPeriodDuration(1, TimeUnit.MINUTES)
.withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS));
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
// set the flush signal AFTER calling 'prepare'
return bolt;
* Creates a mock TupleWindow containing multiple tuples.
* @param tuples The tuples to add to the window.
private TupleWindow createWindow(Tuple... tuples) {
TupleWindow window = mock(TupleWindow.class);
return window;
* An implementation for testing purposes only.
private class TestEmitter implements ProfileMeasurementEmitter {
private String streamId;
public TestEmitter(String streamId) {
this.streamId = streamId;
public String getStreamId() {
return streamId;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(getStreamId(), new Fields("measurement"));
public void emit(ProfileMeasurement measurement, OutputCollector collector) {
collector.emit(getStreamId(), new Values(measurement));