blob: 4b14e4f750741b780ab730bbebb279ba94da45b9 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.metron.profiler.storm.integration;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
import org.apache.metron.profiler.client.stellar.WindowLookback;
import org.apache.metron.statistics.OnlineStatisticsProvider;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.storm.Config;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.*;
import static org.apache.metron.profiler.storm.KafkaEmitter.*;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
* An integration test of the Profiler topology.
public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test";
private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml";
private static final long timeout = TimeUnit.SECONDS.toMillis(90);
public static final long startAt = 10;
public static final String entity = "";
private static final String tableName = "profiler";
private static final String columnFamily = "P";
private static final String inputTopic = Constants.INDEXING_TOPIC;
private static final String outputTopic = "profiles";
private static final int saltDivisor = 10;
private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20);
private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10);
private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10);
private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
private static final long maxRoutesPerBolt = 100000;
private static ZKServerComponent zkComponent;
private static FluxTopologyComponent fluxComponent;
private static KafkaComponent kafkaComponent;
private static ConfigUploadComponent configUploadComponent;
private static ComponentRunner runner;
private static MockHTable profilerTable;
private static String message1;
private static String message2;
private static String message3;
private StellarStatefulExecutor executor;
* [
* org.apache.metron.profiler.ProfileMeasurement,
* org.apache.metron.profiler.ProfilePeriod,
* org.apache.metron.common.configuration.profiler.ProfileResult,
* org.apache.metron.common.configuration.profiler.ProfileResultExpressions,
* org.apache.metron.common.configuration.profiler.ProfileTriageExpressions,
* org.apache.metron.common.configuration.profiler.ProfilerConfig,
* org.apache.metron.common.configuration.profiler.ProfileConfig,
* org.json.simple.JSONObject,
* org.json.simple.JSONArray,
* java.util.LinkedHashMap,
* org.apache.metron.statistics.OnlineStatisticsProvider
* ]
private static String kryoSerializers;
* {
* "profiles": [
* {
* "profile": "processing-time-test",
* "foreach": "ip_src_addr",
* "init": { "counter": "0" },
* "update": { "counter": "counter + 1" },
* "result": "counter"
* }
* ]
* }
private static String processingTimeProfile;
public void testProcessingTime() throws Exception {
// start the topology and write 3 test messages to kafka
kafkaComponent.writeMessages(inputTopic, message1);
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
// retrieve the profile measurement using PROFILE_GET
String profileGetExpression = "PROFILE_GET('processing-time-test', '', PROFILE_FIXED('15', 'MINUTES'))";
List<Integer> measurements = execute(profileGetExpression, List.class);
// need to keep checking for measurements until the profiler has flushed one out
int attempt = 0;
while(measurements.size() == 0 && attempt++ < 10) {
// wait for the profiler to flush
long sleep = windowDurationMillis;
LOG.debug("Waiting {} millis for profiler to flush", sleep);
// write another message to advance time. this ensures we are testing the 'normal' flush mechanism.
// if we do not send additional messages to advance time, then it is the profile TTL mechanism which
// will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
// try again to retrieve the profile measurement using PROFILE_GET
measurements = execute(profileGetExpression, List.class);
// expect to see only 1 measurement, but could be more (one for each period) depending on
// how long we waited for the flush to occur
assertTrue(measurements.size() > 0);
// the profile should have counted at least 3 messages; the 3 test messages that were sent.
// the count could be higher due to the test messages we sent to advance time.
assertTrue(measurements.get(0) >= 3);
public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
// start the topology and write 3 test messages to kafka
kafkaComponent.writeMessages(inputTopic, message1);
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
// wait a bit beyond the window lag before writing another message. this allows storm's window manager to close
// the event window, which then lets the profiler processes the previous messages.
long sleep = windowLagMillis + periodDurationMillis;
LOG.debug("Waiting {} millis before sending message to close window", sleep);
kafkaComponent.writeMessages(inputTopic, message3);
// the profile should have counted 3 messages; the 3 test messages that were sent
assertEventually(() -> {
List<Integer> results = execute("PROFILE_GET('processing-time-test', '', PROFILE_FIXED('15', 'MINUTES'))", List.class);
assertThat(results, hasItem(3));
}, timeout);
* {
* "timestampField": "timestamp",
* "profiles": [
* {
* "profile": "count-by-ip",
* "foreach": "ip_src_addr",
* "init": { "count": 0 },
* "update": { "count" : "count + 1" },
* "result": "count"
* },
* {
* "profile": "total-count",
* "foreach": "'total'",
* "init": { "count": 0 },
* "update": { "count": "count + 1" },
* "result": "count"
* }
* ]
* }
private static String eventTimeProfile;
public void testEventTime() throws Exception {
// start the topology and write test messages to kafka
List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, messages);
long timestamp = System.currentTimeMillis();
LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp);
kafkaComponent.writeMessages(inputTopic, getMessage("", timestamp));
kafkaComponent.writeMessages(inputTopic, getMessage("", timestamp));
// create the 'window' that looks up to 5 hours before the max timestamp contained in the test data
assign("maxTimestamp", "1530978728982L");
assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
// wait until the profile flushes both periods. the first period will flush immediately as subsequent messages
// advance time. the next period contains all of the remaining messages, so there are no other messages to
// advance time. because of this the next period only flushes after the time-to-live expires
// there are 14 messages in the first period and 12 in the next where ip_src_addr =
assertEventually(() -> {
List<Integer> results = execute("PROFILE_GET('count-by-ip', '', window)", List.class);
assertThat(results, hasItems(14, 12));
}, timeout);
// there are 36 messages in the first period and 38 in the next where ip_src_addr =
assertEventually(() -> {
List<Integer> results = execute("PROFILE_GET('count-by-ip', '', window)", List.class);
assertThat(results, hasItems(36, 38));
}, timeout);
// in all there are 50 (36+14) messages in the first period and 50 (38+12) messages in the next
assertEventually(() -> {
List<Integer> results = execute("PROFILE_GET('total-count', 'total', window)", List.class);
assertThat(results, hasItems(50, 50));
}, timeout);
* {
* "profiles": [
* {
* "profile": "profile-with-stats",
* "foreach": "'global'",
* "init": { "stats": "STATS_INIT()" },
* "update": { "stats": "STATS_ADD(stats, 1)" },
* "result": "stats"
* }
* ],
* "timestampField": "timestamp"
* }
private static String profileWithStats;
* The result produced by a Profile has to be serializable within Storm. If the result is not
* serializable the topology will crash and burn.
* This test ensures that if a profile returns a STATS object created using the STATS_INIT and
* STATS_ADD functions, that it can be correctly serialized and persisted.
public void testProfileWithStatsObject() throws Exception {
// start the topology and write test messages to kafka
List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, messages);
assertEventually(() -> {
// validate the measurements written by the batch profiler using `PROFILE_GET`
// the 'window' looks up to 5 hours before the max timestamp contained in the test data
assign("maxTimestamp", "1530978728982L");
assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
// retrieve the stats stored by the profiler
List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class);
assertTrue(results.size() > 0);
assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
}, timeout);
* {
* "profiles": [
* {
* "profile": "profile-with-triage",
* "foreach": "'global'",
* "update": {
* "stats": "STATS_ADD(stats, 1)"
* },
* "result": {
* "profile": "stats",
* "triage": {
* "min": "STATS_MIN(stats)",
* "max": "STATS_MAX(stats)",
* "mean": "STATS_MEAN(stats)"
* }
* }
* }
* ],
* "timestampField": "timestamp"
* }
private static String profileWithTriageResult;
private List<byte[]> outputMessages;
public void testProfileWithTriageResult() throws Exception {
// start the topology and write test messages to kafka
List<String> telemetry = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, telemetry);
// wait until the triage message is output to kafka
assertEventually(() -> {
outputMessages = kafkaComponent.readMessages(outputTopic);
assertEquals(1, outputMessages.size());
}, timeout);
// validate the triage message
JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0),
assertEquals("profile-with-triage", message.get(PROFILE_FIELD));
assertEquals("global", message.get(ENTITY_FIELD));
assertEquals(76548935L, message.get(PERIOD_ID_FIELD));
assertEquals(1530978700000L, message.get(PERIOD_START_FIELD));
assertEquals(1530978720000L, message.get(PERIOD_END_FIELD));
assertEquals("profiler", message.get(Constants.SENSOR_TYPE));
assertEquals("true", message.get(ALERT_FIELD));
assertEquals(1.0, message.get("min"));
assertEquals(1.0, message.get("max"));
assertEquals(1.0, message.get("mean"));
private static String getMessage(String ipSource, long timestamp) {
return new MessageBuilder()
.withField("ip_src_addr", ipSource)
.withField("timestamp", timestamp)
public static void setupBeforeClass() throws UnableToStartException {
// create some messages that contain a timestamp - a really old timestamp; close to 1970
message1 = getMessage(entity, startAt);
message2 = getMessage(entity, startAt + 100);
message3 = getMessage(entity, startAt + (windowDurationMillis * 2));
// storm topology properties
final Properties topologyProperties = new Properties() {{
// storm settings
setProperty("profiler.workers", "1");
setProperty("profiler.acker.executors", "0");
setProperty("profiler.spout.parallelism", "1");
setProperty("profiler.splitter.parallelism", "1");
setProperty("profiler.builder.parallelism", "1");
setProperty("profiler.hbase.writer.parallelism", "1");
setProperty("profiler.kafka.writer.parallelism", "1");
setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]");
setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60");
setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000");
// ensure tuples are serialized during the test, otherwise serialization problems
// will not be found until the topology is run on a cluster with multiple workers
setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers);
// kafka settings
setProperty("profiler.input.topic", inputTopic);
setProperty("profiler.output.topic", outputTopic);
setProperty("kafka.start", "EARLIEST");
setProperty("", "PLAINTEXT");
// hbase settings
setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor));
setProperty("profiler.hbase.table", tableName);
setProperty("", columnFamily);
setProperty("profiler.hbase.batch", "10");
setProperty("profiler.hbase.flush.interval.seconds", "1");
setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName());
// profile settings
setProperty("profiler.period.duration", Long.toString(periodDurationMillis));
setProperty("profiler.period.duration.units", "MILLISECONDS");
setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis));
setProperty("profiler.ttl.units", "MILLISECONDS");
setProperty("profiler.window.duration", Long.toString(windowDurationMillis));
setProperty("profiler.window.duration.units", "MILLISECONDS");
setProperty("profiler.window.lag", Long.toString(windowLagMillis));
setProperty("profiler.window.lag.units", "MILLISECONDS");
setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt));
// create the mock table
profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
zkComponent = getZKServerComponent(topologyProperties);
// create the input and output topics
kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
new KafkaComponent.Topic(inputTopic, 1),
new KafkaComponent.Topic(outputTopic, 1)));
// upload profiler configuration to zookeeper
configUploadComponent = new ConfigUploadComponent()
// load flux definition for the profiler topology
fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(FLUX_PATH))
// start all components
runner = new ComponentRunner.Builder()
.withComponent("kafka", kafkaComponent)
.withComponent("config", configUploadComponent)
.withComponent("storm", fluxComponent)
.withCustomShutdownOrder(new String[] {"storm","config","kafka","zk"})
public static void tearDownAfterClass() {
if (runner != null) {
public void setup() {
// create the mock table
profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
// global properties
Map<String, Object> global = new HashMap<String, Object>() {{
put(PROFILER_HBASE_TABLE.getKey(), tableName);
put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
// client needs to use the same period duration
put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis));
// client needs to use the same salt divisor
put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor);
// create the stellar execution environment
executor = new DefaultStellarStatefulExecutor(
new SimpleFunctionResolver()
new Context.Builder()
.with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
public void tearDown() {
if (runner != null) {
* Uploads config values to Zookeeper.
* @param profilerConfig The Profiler configuration.
* @throws Exception
public void uploadConfigToZookeeper(ProfilerConfig profilerConfig) throws Exception {
* Assign a value to the result of an expression.
* @param var The variable to assign.
* @param expression The expression to execute.
private void assign(String var, String expression) {
executor.assign(var, expression, Collections.emptyMap());
* Execute a Stellar expression.
* @param expression The Stellar expression to execute.
* @param clazz
* @param <T>
* @return The result of executing the Stellar expression.
private <T> T execute(String expression, Class<T> clazz) {
T results = executor.execute(expression, Collections.emptyMap(), clazz);
LOG.debug("{} = {}", expression, results);
return results;