blob: 3d0b4cca799924c0314b3117e9d03bcee467084a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.metron.profiler;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.Before;
import org.junit.Test;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
* Tests the DefaultProfileBuilder class.
public class DefaultProfileBuilderTest {
* {
* "ip_src_addr": "",
* "ip_dst_addr": "",
* "value": 100,
* "timestamp": "2017-08-18 09:00:00"
* }
// Raw JSON text of the test message; setup() parses it into 'message'.
// NOTE(review): presumably populated from the JSON block above via @Multiline (imported by this file);
// the annotation itself is not visible in this view - confirm.
private String input;
// Parsed form of 'input'; handed to builder.apply(...) by the tests.
private JSONObject message;
// Builder under test; assigned afresh inside each test method.
private ProfileBuilder builder;
// Profile definition that each test loads from one of the JSON fixtures.
private ProfileConfig definition;
// Parses the JSON text held in 'input' into the 'message' fixture.
// NOTE(review): the lifecycle annotation (org.junit.Before is imported) and the closing brace
// are not visible in this view.
public void setup() throws Exception {
// JSONParser.parse returns Object; the cast assumes 'input' holds a JSON object
message = (JSONObject) new JSONParser().parse(input);
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {
* "x": "100",
* "y": "200"
* },
* "result": "x + y"
* }
private String testInitProfile;
* Ensure that the 'init' block is executed correctly.
// Applies a single message to a builder defined by 'testInitProfile' and expects the flushed
// profile value to be 300, i.e. the result "x + y" with the init values x=100 and y=200.
public void testInit() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate that x = 100, y = 200
// m.get() is unguarded: an empty Optional surfaces as NoSuchElementException, not an assertion failure
assertEquals(100 + 200, (int) convert(m.get().getProfileValue(), Integer.class));
* The 'init' block is executed only when the first message is received. If no message
* has been received, the 'init' block will not be executed.
// Flushes a builder defined by 'testInitProfile' without applying any message and expects
// the profile value to convert to 0.
public void testInitWithNoMessage() throws Exception {
// setup
// 'timestamp' is declared but never used in the visible body, since no message is applied
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
Optional<ProfileMeasurement> m = builder.flush();
// validate that x = 0 and y = 0 as no initialization occurred
assertEquals(0, (int) convert(m.get().getProfileValue(), Integer.class));
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {
* "x": "0",
* "y": "0"
* },
* "update": {
* "x": "x + 1",
* "y": "y + 2"
* },
* "result": "x + y"
* }
private String testUpdateProfile;
* Ensure that the 'update' expressions are executed for each message applied to the profile.
// Applies the same message 'count' times to a builder defined by 'testUpdateProfile' and expects
// the flushed value to reflect one 'update' per message: x grows by 1 and y by 2 each time.
public void testUpdate() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
int count = 10;
for(int i=0; i<count; i++) {
// apply the message
builder.apply(message, timestamp);
// advance time
// 10 steps of 5 ms advance the timestamp by only 50 ms in total
timestamp += 5;
// NOTE(review): the loop's closing brace is not visible in this view; the flush presumably follows the loop
Optional<ProfileMeasurement> m = builder.flush();
// validate that x=0, y=0 then x+=1, y+=2 for each message
// with count = 10 the expected value is 10 + 20 = 30
assertEquals(count*1 + count*2, (int) convert(m.get().getProfileValue(), Integer.class));
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": { "x": "100" },
* "result": "x"
* }
private String testResultProfile;
* Ensure that the result expression is executed on a flush.
// Applies one message to a builder defined by 'testResultProfile' and expects the flushed
// value to be 100, the init value of x that the "x" result expression returns.
public void testResult() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
assertEquals(100, (int) convert(m.get().getProfileValue(), Integer.class));
* Ensure that time advances properly on each flush.
// Flushes twice, ten minutes apart, and checks each time that the measurement's period equals
// the ProfilePeriod computed from the timestamp of the message that was just applied.
public void testProfilePeriodOnFlush() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// apply a message and flush
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate the profile period
// the expected period uses the same 10 minute duration the builder was configured with
ProfilePeriod expected = ProfilePeriod.fromTimestamp(timestamp, 10, TimeUnit.MINUTES);
assertEquals(expected, m.get().getPeriod());
// advance time by at least one period... about 10 minutes
timestamp += TimeUnit.MINUTES.toMillis(10);
// apply a message and flush again
// NOTE(review): 'm' and 'expected' are declared a second time below; presumably each half of the
// test sat in its own brace-delimited scope that is not visible in this view - confirm
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate the profile period
ProfilePeriod expected = ProfilePeriod.fromTimestamp(timestamp, 10, TimeUnit.MINUTES);
assertEquals(expected, m.get().getPeriod());
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": { "x": "100" },
* "groupBy": ["x * 1", "x * 2"],
* "result": "100.0"
* }
private String testGroupByProfile;
* Ensure that the 'groupBy' expression is executed correctly.
// Applies one message to a builder defined by 'testGroupByProfile' and checks that the flushed
// measurement carries one group per 'groupBy' expression, in the order they are declared.
public void testGroupBy() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
// with x = 100 the expressions "x * 1" and "x * 2" are expected to yield 100 and 200
assertEquals(2, m.get().getGroups().size());
assertEquals(100, m.get().getGroups().get(0));
assertEquals(200, m.get().getGroups().get(1));
* {
* "profile": "test-profile",
* "foreach": "ip_src_addr",
* "init": { "x": "100" },
* "groupBy": ["profile","entity","start","end","duration","result"],
* "result": "100"
* }
private String testStateAvailableToGroupBy;
* The 'groupBy' expression should be able to reference information about the profile including
* the profile name, entity name, start of period, end of period, duration, and result.
// Checks that the 'groupBy' expressions of 'testStateAvailableToGroupBy' can read the profile
// name, entity, period start, period end, period duration and result, in that order.
public void testStateAvailableToGroupBy() throws Exception {
// setup
long timestamp = 1503081070340L;
// the period the message's timestamp falls into; source of the expected start/end/duration values
ProfilePeriod period = ProfilePeriod.fromTimestamp(timestamp, 10, TimeUnit.MINUTES);
definition = JSONUtils.INSTANCE.load(testStateAvailableToGroupBy, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate all values that should be accessible by the groupBy expression(s)
// group indexes follow the order of the 'groupBy' list in the profile definition
assertEquals(6, m.get().getGroups().size());
assertEquals("invalid profile", "test-profile", m.get().getGroups().get(0));
// NOTE(review): the expected entity is the empty string here - confirm against the entity the builder is given
assertEquals("invalid entity", "", m.get().getGroups().get(1));
assertEquals("invalid start", period.getStartTimeMillis(), m.get().getGroups().get(2));
assertEquals("invalid end", period.getEndTimeMillis(), m.get().getGroups().get(3));
assertEquals("invalid duration", period.getDurationMillis(), m.get().getGroups().get(4));
assertEquals("invalid result", 100, m.get().getGroups().get(5));
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {
* "x": "if exists(x) then x else 0",
* "y": "if exists(y) then y else 0"
* },
* "update": {
* "x": "x + 1",
* "y": "y + 2"
* },
* "result": "x + y"
* }
private String testFlushProfile;
// Checks that state accumulated in one period is carried into the next when the 'init'
// expressions keep existing values ("if exists(x) then x else 0" in 'testFlushProfile').
public void testFlushDoesNotClearsState() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute - accumulate some state then flush it
// NOTE(review): the loop's closing brace and the flush mentioned above are not visible in this view
int count = 10;
for(int i=0; i<count; i++) {
// apply the message
builder.apply(message, timestamp);
// advance time
timestamp += 5;
// advance time beyond the current period
// 20 minutes is twice the configured 10 minute period
timestamp += TimeUnit.MINUTES.toMillis(20);
// apply another message to accumulate new state, then flush again to validate the original state was retained
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
// 11 messages in total: x = 11 and y = 22, so "x + y" is 33
assertEquals(33, m.get().getProfileValue());
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {
* "x": "0",
* "y": "0"
* },
* "update": {
* "x": "x + 1",
* "y": "y + 2"
* },
* "result": "x + y"
* }
private String testFlushProfileWithNaiveInit;
// Checks that a profile whose 'init' unconditionally sets x and y to 0
// ('testFlushProfileWithNaiveInit') starts from zero again in the next period.
public void testFlushDoesNotClearsStateButInitDoes() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute - accumulate some state then flush it
// NOTE(review): the loop's closing brace and the flush mentioned above are not visible in this view
int count = 10;
for(int i=0; i<count; i++) {
// apply a message
builder.apply(message, timestamp);
// advance time
timestamp += 5;
// advance time beyond the current period
// 20 minutes is twice the configured 10 minute period
timestamp += TimeUnit.MINUTES.toMillis(20);
// apply another message to accumulate new state, then flush again to validate the original state was reset by 'init'
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
// only the last message counts: x = 1 and y = 2, so "x + y" is 3
assertEquals(3, m.get().getProfileValue());
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "result": "100"
* }
private String testEntityProfile;
* Ensure that the entity is correctly set on the resulting profile measurements.
// Applies one message and checks that the flushed measurement reports the expected entity.
// Loads the dedicated 'testEntityProfile' fixture (a profile with only a constant result), so this
// test no longer depends on the init/update expressions of the unrelated flush fixture.
public void testEntity() throws Exception {
// setup
long timestamp = 100;
// NOTE(review): the expected entity is the empty string here - confirm against the entity the builder is given
final String entity = "";
definition = JSONUtils.INSTANCE.load(testEntityProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
// m.get() is unguarded: an empty Optional surfaces as NoSuchElementException, not an assertion failure
assertEquals(entity, m.get().getEntity());
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {
* "x": "100"
* },
* "result": {
* "profile": "x"
* }
* }
private String testResultWithProfileExpression;
* Ensure that a 'result' block containing only a 'profile' expression is executed on a flush.
// Applies one message to a builder defined by 'testResultWithProfileExpression', whose 'result'
// is an object with a 'profile' expression, and expects the flushed profile value to be 100.
public void testResultWithProfileExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
// "profile": "x" with x initialised to 100
assertEquals(100, m.get().getProfileValue());
* {
* "profile": "test",
* "foreach": "ip_src_addr",
* "init": {
* "x": "100"
* },
* "result": {
* "profile": "x",
* "triage": {
* "zero": "x - 100",
* "hundred": "x"
* }
* }
* }
private String testResultWithTriageExpression;
* Ensure that both the 'profile' and 'triage' expressions of the 'result' block are executed on a flush.
// Applies one message to a builder defined by 'testResultWithTriageExpression' and checks both
// the named triage values and the profile value on the flushed measurement.
public void testResultWithTriageExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
// validate
// with x = 100: "zero" is "x - 100", "hundred" is "x", and the profile value is "x"
assertEquals(0, m.get().getTriageValues().get("zero"));
assertEquals(100, m.get().getTriageValues().get("hundred"));
assertEquals(100, m.get().getProfileValue());
* {
* "profile": "bad-init",
* "foreach": "ip_src_addr",
* "init": { "x": "2 / 0" },
* "update": { "x": "x + 1" },
* "result": "x + y",
* "groupBy": ["cheese"]
* }
private String badInitProfile;
// Applies one message to a builder defined by 'badInitProfile', whose 'init' expression is "2 / 0".
// NOTE(review): no flush or assertion is visible after apply() in this view, so the
// "no result" expectation stated below is not checked by the visible code.
public void testBadInitExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badInitProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// due to the bad expression, there should be no result
builder.apply(message, timestamp);
* {
* "profile": "bad-simple-result",
* "foreach": "ip_src_addr",
* "init": { "x": "0" },
* "update": { "x": "x + 1" },
* "result": "2 / 0",
* "groupBy": ["cheese"]
* }
private String badSimpleResultProfile;
// Applies one message to a builder defined by 'badSimpleResultProfile', whose 'result' expression is "2 / 0".
// NOTE(review): no flush or assertion is visible after apply() in this view, so the
// "no result" expectation stated below is not checked by the visible code.
public void testBadResultExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badSimpleResultProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// due to the bad expression, there should be no result
builder.apply(message, timestamp);
* {
* "profile": "bad-groupBy",
* "foreach": "ip_src_addr",
* "init": { "x": "0" },
* "update": { "x": "x + 1" },
* "result": "x",
* "groupBy": ["nonexistant"]
* }
private String badGroupByProfile;
// Applies one message to a builder defined by 'badGroupByProfile', whose 'groupBy' references
// a variable that the profile never defines.
// NOTE(review): no flush or assertion is visible after apply() in this view, so the
// "no result" expectation stated below is not checked by the visible code.
public void testBadGroupByExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badGroupByProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// due to the bad expression, there should be no result
builder.apply(message, timestamp);
* {
* "profile": "bad-result-profile",
* "foreach": "ip_src_addr",
* "init": { "x": "100" },
* "result": {
* "profile": "2 / 0",
* "triage": {
* "zero": "x - 100",
* "hundred": "x"
* }
* }
* }
private String badResultProfile;
// Applies one message to a builder defined by 'badResultProfile', whose 'result.profile'
// expression is "2 / 0" while its triage expressions are well-formed.
// NOTE(review): no flush or assertion is visible after apply() in this view, so the
// "no result" expectation stated below is not checked by the visible code.
public void testBadResultProfileExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badResultProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// due to the bad expression, there should be no result
builder.apply(message, timestamp);
* {
* "profile": "bad-result-triage",
* "foreach": "ip_src_addr",
* "init": { "x": "100" },
* "result": {
* "profile": "x",
* "triage": {
* "zero": "x - 100",
* "hundred": "2 / 0"
* }
* }
* }
private String badResultTriage;
// Applies one message to a builder defined by 'badResultTriage', whose triage expression
// "hundred" is "2 / 0" while its 'profile' expression is well-formed.
// NOTE(review): no flush or assertion is visible after apply() in this view, so the
// "no result" expectation stated below is not checked by the visible code.
public void testBadResultTriageExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badResultTriage, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// due to the bad expression, there should be no result
builder.apply(message, timestamp);
* {
* "profile": "bad-update",
* "foreach": "ip_src_addr",
* "init": { "x": "0" },
* "update": { "x": "x + (2/0)" },
* "result": "x"
* }
private String badUpdateProfile;
* If the 'init' expression succeeds, but the 'update' fails, the profile should still flush. We cannot
* be sure if the 'update' is failing on every message or just one. Since that is the case, the profile
* flushes whatever data it has.
// Applies one message to a builder defined by 'badUpdateProfile', whose 'update' expression is
// "x + (2/0)", and expects a flush to still produce a measurement with value 0.
public void testBadUpdateExpression() throws Exception {
// setup
long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badUpdateProfile, ProfileConfig.class);
// NOTE(review): the builder chain looks truncated in this view; only the period duration call is visible
builder = new DefaultProfileBuilder.Builder()
.withPeriodDuration(10, TimeUnit.MINUTES)
// execute
builder.apply(message, timestamp);
// if the update expression fails, the profile should still flush.
Optional<ProfileMeasurement> m = builder.flush();
// 0 is the value 'init' assigns to x; the assertion expects the failed update to leave it unchanged
assertEquals(0, m.get().getProfileValue());