blob: a2fabfad141051bfcbdd9b9307263001a3fe8d7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.repl;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
import static org.junit.jupiter.api.Assertions.*;
/**
* Tests the ProfilerFunctions class.
*/
public class ProfilerFunctionsTest {

  // NOTE: the Javadoc body below is not documentation. @Multiline supplies it as the
  // value of 'message' (one telemetry message as JSON text), so it must not be reworded.
  /**
   * {
   * "ip_src_addr": "10.0.0.1",
   * "ip_dst_addr": "10.0.0.2",
   * "source.type": "test",
   * }
   */
  @Multiline
  private String message;

  // NOTE: the Javadoc body below is the value of 'messages' (a JSON array holding three
  // copies of the message above); it is data, not documentation.
  /**
   * [
   * {
   * "ip_src_addr": "10.0.0.1",
   * "ip_dst_addr": "10.0.0.2",
   * "source.type": "test",
   * },
   * {
   * "ip_src_addr": "10.0.0.1",
   * "ip_dst_addr": "10.0.0.2",
   * "source.type": "test",
   * },
   * {
   * "ip_src_addr": "10.0.0.1",
   * "ip_dst_addr": "10.0.0.2",
   * "source.type": "test",
   * }
   * ]
   */
  @Multiline
  private String messages;

  // NOTE: the Javadoc body below is the value of 'helloWorldProfilerDef': a profiler
  // configuration with a single profile, "hello-world", that counts messages per
  // 'ip_src_addr'. It is data, not documentation.
  /**
   * {
   * "profiles": [
   * {
   * "profile": "hello-world",
   * "foreach": "ip_src_addr",
   * "init": { "count": 0 },
   * "update": { "count": "count + 1" },
   * "result": "count"
   * }
   * ]
   * }
   */
  @Multiline
  private String helloWorldProfilerDef;

  /** Value stored (as a string) under the PROFILER_PERIOD key of the global configuration. */
  private static final long PERIOD_DURATION = 15;

  /** Value stored under the PROFILER_PERIOD_UNITS key of the global configuration. */
  private static final String PERIOD_UNITS = "MINUTES";

  /** Executor used by {@link #run(String, Class)}; rebuilt before every test. */
  private StellarStatefulExecutor executor;

  /** Variables visible to the Stellar expressions under test; reset before every test. */
  private Map<String, Object> state;

  /**
   * Executes a Stellar expression against the shared {@link #state}.
   *
   * @param expression The Stellar expression to execute.
   * @param clazz The type the result is expected to have.
   * @param <T> The expected result type.
   * @return The result of executing the expression.
   */
  private <T> T run(String expression, Class<T> clazz) {
    return executor.execute(expression, state, clazz);
  }

  /**
   * Builds an executor that can resolve the three profiler functions under test.
   *
   * @param context The Stellar context handed to the executor.
   * @return A new executor bound to the given context.
   */
  private static StellarStatefulExecutor newExecutor(Context context) {
    return new DefaultStellarStatefulExecutor(
        new SimpleFunctionResolver()
            .withClass(ProfilerFunctions.ProfilerInit.class)
            .withClass(ProfilerFunctions.ProfilerApply.class)
            .withClass(ProfilerFunctions.ProfilerFlush.class),
        context);
  }

  /**
   * Initializes a profiler from {@link #helloWorldProfilerDef} and publishes it to the
   * expression state as the variable {@code profiler}.
   *
   * @return The profiler returned by {@code PROFILER_INIT}.
   */
  private StandAloneProfiler initHelloWorldProfiler() {
    state.put("config", helloWorldProfilerDef);
    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);
    state.put("profiler", profiler);
    return profiler;
  }

  /**
   * Asserts the three counters exposed by a profiler.
   *
   * @param profiler The profiler to inspect.
   * @param profiles The expected profile count.
   * @param messages The expected message count.
   * @param routes The expected route count.
   */
  private static void assertCounts(StandAloneProfiler profiler, int profiles, int messages, int routes) {
    assertEquals(profiles, profiler.getProfileCount(), "profile count");
    assertEquals(messages, profiler.getMessageCount(), "message count");
    assertEquals(routes, profiler.getRouteCount(), "route count");
  }

  /**
   * Creates a fresh expression state and an executor whose global configuration
   * carries the profiler period settings.
   */
  @BeforeEach
  public void setup() {
    state = new HashMap<>();

    // global properties; a plain map avoids the anonymous subclass that
    // double-brace initialization would create
    Map<String, Object> global = new HashMap<>();
    global.put(PROFILER_PERIOD.getKey(), Long.toString(PERIOD_DURATION));
    global.put(PROFILER_PERIOD_UNITS.getKey(), PERIOD_UNITS);

    // create the stellar execution environment
    executor = newExecutor(
        new Context.Builder()
            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
            .build());
  }

  /** PROFILER_INIT with an empty profile list yields a profiler with nothing in it. */
  @Test
  public void testProfilerInitNoProfiles() {
    state.put("config", "{ \"profiles\" : [] }");
    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);

    assertNotNull(profiler);
    assertCounts(profiler, 0, 0, 0);
  }

  /** PROFILER_INIT with one profile definition yields a profiler holding that profile. */
  @Test
  public void testProfilerInitWithProfiles() {
    state.put("config", helloWorldProfilerDef);
    StandAloneProfiler profiler = run("PROFILER_INIT(config)", StandAloneProfiler.class);

    assertNotNull(profiler);
    assertCounts(profiler, 1, 0, 0);
  }

  /** PROFILER_INIT without arguments is rejected. */
  @Test
  public void testProfilerInitNoArgs() {
    assertThrows(ParseException.class, () -> run("PROFILER_INIT()", StandAloneProfiler.class));
  }

  /** PROFILER_INIT with an argument that is not a profiler configuration is rejected. */
  @Test
  public void testProfilerInitInvalidArg() {
    assertThrows(
        ParseException.class,
        () -> run("PROFILER_INIT({ \"invalid\": 2 })", StandAloneProfiler.class));
  }

  /** PROFILER_INIT still works when the context defines no GLOBAL_CONFIG. */
  @Test
  public void testProfilerInitWithNoGlobalConfig() {
    state.put("config", helloWorldProfilerDef);

    // use an executor with no GLOBAL_CONFIG defined in the context; named distinctly
    // so it does not shadow the 'executor' field
    StellarStatefulExecutor noGlobalConfigExecutor = newExecutor(Context.EMPTY_CONTEXT());
    StandAloneProfiler profiler =
        noGlobalConfigExecutor.execute("PROFILER_INIT(config)", state, StandAloneProfiler.class);

    assertNotNull(profiler);
    assertCounts(profiler, 1, 0, 0);
  }

  /** PROFILER_APPLY accepts a single message given as a JSON string. */
  @Test
  public void testProfilerApplyWithString() {
    StandAloneProfiler profiler = initHelloWorldProfiler();

    // apply a message to the profiler
    state.put("message", message);
    StandAloneProfiler result = run("PROFILER_APPLY(message, profiler)", StandAloneProfiler.class);

    // validate
    assertSame(profiler, result);
    assertCounts(profiler, 1, 1, 1);
  }

  /** PROFILER_APPLY accepts a single message given as a parsed {@link JSONObject}. */
  @Test
  public void testProfilerApplyWithJSONObject() throws Exception {
    StandAloneProfiler profiler = initHelloWorldProfiler();

    // apply an already-parsed message to the profiler
    JSONObject jsonObject = (JSONObject) new JSONParser().parse(message);
    state.put("jsonObj", jsonObject);
    StandAloneProfiler result = run("PROFILER_APPLY(jsonObj, profiler)", StandAloneProfiler.class);

    // validate
    assertSame(profiler, result);
    assertCounts(profiler, 1, 1, 1);
  }

  /** PROFILER_APPLY accepts a JSON string that contains an array of messages. */
  @Test
  public void testProfilerApplyWithMultipleMessagesInJSONString() {
    StandAloneProfiler profiler = initHelloWorldProfiler();

    // apply three messages, supplied as one JSON array string
    state.put("messages", messages);
    StandAloneProfiler result = run("PROFILER_APPLY(messages, profiler)", StandAloneProfiler.class);

    // validate
    assertSame(profiler, result);
    assertCounts(profiler, 1, 3, 3);
  }

  /** PROFILER_APPLY accepts a Stellar list whose elements are individual messages. */
  @Test
  public void testProfilerApplyWithListOfMessages() {
    StandAloneProfiler profiler = initHelloWorldProfiler();

    // apply three messages, supplied as a Stellar list of JSON strings
    state.put("msg", message);
    StandAloneProfiler result = run("PROFILER_APPLY([msg, msg, msg], profiler)", StandAloneProfiler.class);

    // validate
    assertSame(profiler, result);
    assertCounts(profiler, 1, 3, 3);
  }

  /** PROFILER_APPLY with an empty JSON array leaves the profiler's counters at zero. */
  @Test
  public void testProfilerApplyWithEmptyList() {
    StandAloneProfiler profiler = initHelloWorldProfiler();

    // apply an empty JSON array; there are no messages in it
    state.put("messages", "[ ]");
    StandAloneProfiler result = run("PROFILER_APPLY(messages, profiler)", StandAloneProfiler.class);

    // validate
    assertSame(profiler, result);
    assertCounts(profiler, 1, 0, 0);
  }

  /** PROFILER_APPLY without arguments is rejected. */
  @Test
  public void testProfilerApplyWithNoArgs() {
    assertThrows(ParseException.class, () -> run("PROFILER_APPLY()", StandAloneProfiler.class));
  }

  /** PROFILER_APPLY with only an undefined variable as its argument is rejected. */
  @Test
  public void testProfilerApplyWithInvalidArg() {
    assertThrows(ParseException.class, () -> run("PROFILER_APPLY(undefined)", StandAloneProfiler.class));
  }

  /** PROFILER_APPLY is rejected when the message variable is not defined in the state. */
  @Test
  public void testProfilerApplyWithNullMessage() {
    initHelloWorldProfiler();

    // there is no 'messages' variable in the state - should throw exception
    assertThrows(ParseException.class, () -> run("PROFILER_APPLY(messages, profiler)", StandAloneProfiler.class));
  }

  /** PROFILER_FLUSH returns one measurement for the one entity seen by the profile. */
  @Test
  public void testProfilerFlush() {
    initHelloWorldProfiler();

    // apply a message to the profiler
    state.put("message", message);
    run("PROFILER_APPLY(message, profiler)", StandAloneProfiler.class);

    // flush the profiles; a class literal cannot express the element type, so the
    // raw List that comes back is assigned through an unchecked conversion
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> measurements = run("PROFILER_FLUSH(profiler)", List.class);

    // validate
    assertNotNull(measurements);
    assertEquals(1, measurements.size());

    Map<String, Object> measurement = measurements.get(0);
    assertEquals("hello-world", measurement.get("profile"));
    assertEquals("10.0.0.1", measurement.get("entity"));
    assertEquals(1, measurement.get("value"));
    assertEquals(Collections.emptyList(), measurement.get("groups"));
  }

  /** PROFILER_FLUSH without arguments is rejected. */
  @Test
  public void testProfilerFlushNoArgs() {
    assertThrows(ParseException.class, () -> run("PROFILER_FLUSH()", StandAloneProfiler.class));
  }

  /** PROFILER_FLUSH with an undefined variable as its argument is rejected. */
  @Test
  public void testProfilerFlushInvalidArg() {
    assertThrows(ParseException.class, () -> run("PROFILER_FLUSH(undefined)", StandAloneProfiler.class));
  }
}