blob: a14776880c17871a8204bd879b7937af1022cba2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.storm;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.profiler.DefaultMessageRouter;
import org.apache.metron.profiler.clock.FixedClockFactory;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
import static org.mockito.Mockito.*;
/**
* Tests the ProfileSplitterBolt.
*/
public class ProfileSplitterBoltTest extends BaseBoltTest {
/**
* {
* "ip_src_addr": "10.0.0.1",
* "ip_dst_addr": "10.0.0.20",
* "protocol": "HTTP",
* "timestamp.custom": 2222222222222,
* "timestamp.string": "3333333333333"
* }
*/
@Multiline
private String input;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "onlyif": "protocol == 'HTTP'",
* "init": {},
* "update": {},
* "result": "2"
* }
* ]
* }
*/
@Multiline
private String profileWithOnlyIfTrue;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "onlyif": "false",
* "init": {},
* "update": {},
* "result": "2"
* }
* ]
* }
*/
@Multiline
private String profileWithOnlyIfFalse;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {},
* "update": {},
* "result": "2"
* }
* ]
* }
*/
@Multiline
private String profileWithOnlyIfMissing;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "onlyif": "NOT-VALID",
* "init": {},
* "update": {},
* "result": "2"
* }
* ]
* }
*/
@Multiline
private String profileWithOnlyIfInvalid;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {},
* "update": {},
* "result": "2"
* }
* ],
* "timestampField": "timestamp.custom"
* }
*/
@Multiline
private String profileUsingCustomTimestampField;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {},
* "update": {},
* "result": "2"
* }
* ],
* "timestampField": "timestamp.missing"
* }
*/
@Multiline
private String profileUsingMissingTimestampField;
/**
* {
* "profiles": [
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {},
* "update": {},
* "result": "2"
* }
* ],
* "timestampField": "timestamp.string"
* }
*/
@Multiline
private String profileUsingStringTimestampField;
/**
* {
* "profiles": [
* ]
* }
*/
@Multiline
private String noProfilesDefined;
/**
* {
* "profiles": [
* {
* "profile": "profile1",
* "foreach": "'global'",
* "result": "1"
* },
* {
* "profile": "profile2",
* "foreach": "'global'",
* "result": "2"
* }
* ]
* }
*/
@Multiline
private String twoProfilesDefined;
private JSONObject message;
private long timestamp = 3333333;
@BeforeEach
public void setup() throws ParseException {
// parse the input message
JSONParser parser = new JSONParser();
message = (JSONObject) parser.parse(input);
// ensure the tuple returns the expected json message
when(tuple.getBinaryByField(VALUE.getFieldName())).thenReturn(input.getBytes(StandardCharsets.UTF_8));
}
/**
* Ensure that a tuple with the correct fields is emitted to downstream bolts
* when a profile is defined.
*/
@Test
public void testEmitTupleWithOneProfile() throws Exception {
// setup the bolt and execute a tuple
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// the expected tuple fields
String expectedEntity = "10.0.0.1";
ProfileConfig expectedConfig = config.getProfiles().get(0);
Values expected = new Values(message, timestamp, expectedEntity, expectedConfig);
// a tuple should be emitted for the downstream profile builder
verify(outputCollector, times(1))
.emit(eq(tuple), eq(expected));
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* If there are two profiles that need the same message, then two tuples should
* be emitted. One tuple for each profile.
*/
@Test
public void testEmitTupleWithTwoProfiles() throws Exception {
// setup the bolt and execute a tuple
ProfilerConfig config = toProfilerConfig(twoProfilesDefined);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// the expected tuple fields
final String expectedEntity = "global";
{
// a tuple should be emitted for the first profile
ProfileConfig profile1 = config.getProfiles().get(0);
Values expected = new Values(message, timestamp, expectedEntity, profile1);
verify(outputCollector, times(1))
.emit(eq(tuple), eq(expected));
}
{
// a tuple should be emitted for the second profile
ProfileConfig profile2 = config.getProfiles().get(1);
Values expected = new Values(message, timestamp, expectedEntity, profile2);
verify(outputCollector, times(1))
.emit(eq(tuple), eq(expected));
}
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* No tuples should be emitted, if no profiles are defined.
*/
@Test
public void testNoProfilesDefined() throws Exception {
// setup the bolt and execute a tuple
ProfilerConfig config = toProfilerConfig(noProfilesDefined);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// no tuple should be emitted
verify(outputCollector, times(0))
.emit(any(Tuple.class), any());
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* What happens when a profile's 'onlyif' expression is true? The message
* should be applied to the profile.
*/
@Test
public void testOnlyIfTrue() throws Exception {
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should be emitted for the downstream profile builder
verify(outputCollector, times(1))
.emit(eq(tuple), any(Values.class));
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* All messages are applied to a profile where 'onlyif' is missing. A profile with no
* 'onlyif' is treated the same as if 'onlyif=true'.
*/
@Test
public void testOnlyIfMissing() throws Exception {
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should be emitted for the downstream profile builder
verify(outputCollector, times(1))
.emit(eq(tuple), any(Values.class));
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* What happens when a profile's 'onlyif' expression is false? The message
* should NOT be applied to the profile.
*/
@Test
public void testOnlyIfFalse() throws Exception {
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should NOT be emitted for the downstream profile builder
verify(outputCollector, times(0))
.emit(any());
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* The entity associated with a profile is defined with a Stellar expression. That expression
* can refer to any field within the message.
*
* In this case the entity is defined as 'ip_src_addr' which is resolved to '10.0.0.1' based on
* the data contained within the message.
*/
@Test
public void testResolveEntityName() throws Exception {
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// expected values
String expectedEntity = "10.0.0.1";
ProfileConfig expectedConfig = config.getProfiles().get(0);
Values expected = new Values(message, timestamp, expectedEntity, expectedConfig);
// a tuple should be emitted for the downstream profile builder
verify(outputCollector, times(1))
.emit(eq(tuple), eq(expected));
// the original tuple should be ack'd
verify(outputCollector, times(1))
.ack(eq(tuple));
}
/**
* What happens when invalid Stella code is used for 'onlyif'? The invalid profile should be ignored.
*/
@Test
public void testOnlyIfInvalid() throws Exception {
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should NOT be emitted for the downstream profile builder
verify(outputCollector, times(0))
.emit(any(Values.class));
}
@Test
public void testWithNullMessage() throws Exception {
// ensure the tuple returns null to mimic a null message in kafka
when(tuple.getBinary(0)).thenReturn(null);
ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should NOT be emitted for the downstream profile builder
verify(outputCollector, times(0))
.emit(any(Values.class));
}
/**
* Creates a ProfilerConfig based on a string containing JSON.
*
* @param configAsJSON The config as JSON.
* @return The ProfilerConfig.
* @throws Exception
*/
private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception {
InputStream in = new ByteArrayInputStream(configAsJSON.getBytes(StandardCharsets.UTF_8));
return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
}
/**
* Create a ProfileSplitterBolt to test
*/
private ProfileSplitterBolt createBolt(ProfilerConfig config) {
ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
bolt.setCuratorFramework(client);
bolt.setZKCache(cache);
bolt.getConfigurations().updateProfilerConfig(config);
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
// set the clock factory AFTER calling prepare to use the fixed clock factory
DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext());
router.setClockFactory(new FixedClockFactory(timestamp));
bolt.setRouter(router);
return bolt;
}
}