blob: eb64bb761dd20229ec84fc8a6ec6a4f2639254fb [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler;
import com.github.benmanes.caffeine.cache.Ticker;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class DefaultMessageDistributorTest {
/**
* {
* "ip_src_addr": "10.0.0.1",
* "value": "22"
* }
*/
@Multiline
private String inputOne;
private JSONObject messageOne;
/**
* {
* "ip_src_addr": "10.0.0.2",
* "value": "22"
* }
*/
@Multiline
private String inputTwo;
private JSONObject messageTwo;
/**
* {
* "profile": "profile-one",
* "foreach": "ip_src_addr",
* "init": { "x": "0" },
* "update": { "x": "x + 1" },
* "result": "x"
* }
*/
@Multiline
private String profileOne;
/**
* {
* "profile": "profile-two",
* "foreach": "ip_src_addr",
* "init": { "x": "0" },
* "update": { "x": "x + 1" },
* "result": "x"
* }
*/
@Multiline
private String profileTwo;
private DefaultMessageDistributor distributor;
private Context context;
private long periodDurationMillis = MINUTES.toMillis(15);
private long profileTimeToLiveMillis = MINUTES.toMillis(30);
private long maxNumberOfRoutes = Long.MAX_VALUE;
@BeforeEach
public void setup() throws Exception {
context = Context.EMPTY_CONTEXT();
JSONParser parser = new JSONParser();
messageOne = (JSONObject) parser.parse(inputOne);
messageTwo = (JSONObject) parser.parse(inputTwo);
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
Ticker.systemTicker());
}
/**
* Creates a profile definition based on a string of JSON.
* @param json The string of JSON.
*/
private ProfileConfig createDefinition(String json) throws IOException {
return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
}
/**
* Tests that one message can be distributed to one profile.
*/
@Test
public void testDistribute() throws Exception {
// setup
long timestamp = 100;
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
MessageRoute route = new MessageRoute(definition, entity, messageOne, timestamp);
// distribute one message and flush
distributor.distribute(route, context);
List<ProfileMeasurement> measurements = distributor.flush();
// expect one measurement coming from one profile
assertEquals(1, measurements.size());
ProfileMeasurement m = measurements.get(0);
assertEquals(definition.getProfile(), m.getProfileName());
assertEquals(entity, m.getEntity());
}
@Test
public void testDistributeWithTwoProfiles() throws Exception {
// setup
long timestamp = 100;
String entity = (String) messageOne.get("ip_src_addr");
// distribute one message to the first profile
MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity, messageOne, timestamp);
distributor.distribute(routeOne, context);
// distribute another message to the second profile, but same entity
MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity, messageOne, timestamp);
distributor.distribute(routeTwo, context);
// expect 2 measurements; 1 for each profile
List<ProfileMeasurement> measurements = distributor.flush();
assertEquals(2, measurements.size());
}
@Test
public void testDistributeWithTwoEntities() throws Exception {
// setup
long timestamp = 100;
// distribute one message
String entityOne = (String) messageOne.get("ip_src_addr");
MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne, messageOne, timestamp);
distributor.distribute(routeOne, context);
// distribute another message with a different entity
String entityTwo = (String) messageTwo.get("ip_src_addr");
MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo, messageTwo, timestamp);
distributor.distribute(routeTwo, context);
// expect 2 measurements; 1 for each entity
List<ProfileMeasurement> measurements = distributor.flush();
assertEquals(2, measurements.size());
}
/**
* A profile should expire after a fixed period of time. This test ensures that
* profiles are not expired before they are supposed to be.
*/
@Test
public void testNotYetTimeToExpireProfiles() throws Exception {
// the ticker drives time to allow us to test cache expiration
FixedTicker ticker = new FixedTicker();
// setup
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
MessageRoute route = new MessageRoute(definition, entity, messageOne, ticker.read());
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
ticker);
// distribute one message
distributor.distribute(route, context);
// advance time to just shy of the profile TTL
ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS);
// the profile should NOT have expired yet
assertEquals(0, distributor.flushExpired().size());
assertEquals(1, distributor.flush().size());
}
/**
* A profile should expire after a fixed period of time.
*/
@Test
public void testProfilesShouldExpire() throws Exception {
// the ticker drives time to allow us to test cache expiration
FixedTicker ticker = new FixedTicker();
// setup
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
MessageRoute route = new MessageRoute(definition, entity, messageOne, ticker.read());
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
ticker);
// distribute one message
distributor.distribute(route, context);
// advance time to just beyond the period duration
ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS);
// the profile should have expired by now
assertEquals(1, distributor.flushExpired().size());
assertEquals(0, distributor.flush().size());
}
/**
* An expired profile is only kept around for a fixed period of time. It should be removed, if it
* has been on the expired cache for too long.
*/
@Test
public void testExpiredProfilesShouldBeRemoved() throws Exception {
// the ticker drives time to allow us to test cache expiration
FixedTicker ticker = new FixedTicker();
// setup
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
MessageRoute route = new MessageRoute(definition, entity, messageOne, ticker.read());
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
ticker);
// distribute one message
distributor.distribute(route, context);
// advance time a couple of hours
ticker.advanceTime(2, HOURS);
// the profile should have been expired
assertEquals(0, distributor.flush().size());
// advance time a couple of hours
ticker.advanceTime(2, HOURS);
// the profile should have been removed from the expired cache
assertEquals(0, distributor.flushExpired().size());
}
/**
* An implementation of Ticker that can be used to drive time
* when testing the Guava caches.
*/
private class FixedTicker implements Ticker {
/**
* The time that will be reported.
*/
private long timestampNanos;
public FixedTicker() {
this.timestampNanos = Ticker.systemTicker().read();
}
public FixedTicker startAt(long timestampNanos) {
this.timestampNanos = timestampNanos;
return this;
}
public FixedTicker advanceTime(long time, TimeUnit units) {
this.timestampNanos += units.toNanos(time);
return this;
}
@Override
public long read() {
return this.timestampNanos;
}
}
}