blob: 6a476d443ee3a05eb81c4daadf6d1a44cc85d414 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.management;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.metron.common.Constants.ErrorFields.*;
import static org.apache.metron.common.Constants.Fields.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests the {@link ParserFunctions} class.
*/
public class ParserFunctionsTest {
static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
FunctionResolver functionResolver;
Map<String, Object> variables;
Context context = null;
StellarStatefulExecutor executor;
@BeforeEach
public void setup() {
variables = new HashMap<>();
functionResolver = new SimpleFunctionResolver()
.withClass(ParserFunctions.ParseFunction.class)
.withClass(ParserFunctions.InitializeFunction.class)
.withClass(ParserFunctions.ConfigFunction.class);
context = new Context.Builder().build();
executor = new DefaultStellarStatefulExecutor(functionResolver, context);
}
/**
* {
* "dns": {
* "ts":1402308259.609,
* "uid":"CuJT272SKaJSuqO0Ia",
* "id.orig_h":"10.122.196.204",
* "id.orig_p":33976,
* "id.resp_h":"144.254.71.184",
* "id.resp_p":53,
* "proto":"udp",
* "trans_id":62418,
* "query":"www.cisco.com",
* "qclass":1,
* "qclass_name":"C_INTERNET",
* "qtype":28,
* "qtype_name":"AAAA",
* "rcode":0,
* "rcode_name":"NOERROR",
* "AA":true,
* "TC":false,
* "RD":true,
* "RA":true,
* "Z":0,
* "answers":["www.cisco.com.akadns.net","origin-www.cisco.com","2001:420:1201:2::a"],
* "TTLs":[3600.0,289.0,14.0],
* "rejected":false
* }
* }
*/
@Multiline
public String broMessage;
/**
* {
* "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
* "filterClassName":"org.apache.metron.parsers.filters.StellarFilter",
* "sensorTopic":"bro"
* }
*/
@Multiline
private String broParserConfig;
@Test
public void testParseBroMessage() {
// initialize the parser with the sensor config
set("config", broParserConfig);
assign("parser", "PARSER_INIT('bro', config)");
// parse the message
set("message", broMessage);
List<JSONObject> messages = execute("PARSER_PARSE(parser, message)", List.class);
// validate the parsed message
assertEquals(1, messages.size());
JSONObject message = messages.get(0);
assertEquals("bro", message.get(Constants.SENSOR_TYPE));
assertEquals("10.122.196.204", message.get(SRC_ADDR.getName()));
assertEquals(33976L, message.get(SRC_PORT.getName()));
assertEquals("144.254.71.184", message.get(DST_ADDR.getName()));
assertEquals(53L, message.get(DST_PORT.getName()));
assertEquals("dns", message.get("protocol"));
}
@Test
public void testParseMultipleMessages() {
// initialize the parser with the sensor config
set("config", broParserConfig);
assign("parser", "PARSER_INIT('bro', config)");
// parse the message
set("msg1", broMessage);
set("msg2", broMessage);
set("msg3", broMessage);
List<JSONObject> messages = execute("PARSER_PARSE(parser, [msg1, msg2, msg3])", List.class);
// expect a parsed message
assertEquals(3, messages.size());
for(JSONObject message: messages) {
assertEquals("bro", message.get(Constants.SENSOR_TYPE));
assertTrue(message.containsKey(Constants.GUID));
assertEquals("10.122.196.204", message.get(SRC_ADDR.getName()));
assertEquals(33976L, message.get(SRC_PORT.getName()));
assertEquals("144.254.71.184", message.get(DST_ADDR.getName()));
assertEquals(53L, message.get(DST_PORT.getName()));
assertEquals("dns", message.get("protocol"));
}
}
@Test
public void testParseInvalidMessage() {
// initialize the parser with the sensor config
set("config", broParserConfig);
assign("parser", "PARSER_INIT('bro', config)");
// parse the message
String invalidMessage = "{ this is an invalid message }}";
set("message", invalidMessage);
List<JSONObject> messages = execute("PARSER_PARSE(parser, message)", List.class);
// validate the parsed message
assertEquals(1, messages.size());
// expect an error message to be returned
JSONObject error = messages.get(0);
assertEquals(invalidMessage, error.get("raw_message"));
assertEquals(Constants.ERROR_TYPE, error.get(Constants.SENSOR_TYPE));
assertEquals("parser_error", error.get(ERROR_TYPE.getName()));
assertTrue(error.containsKey(MESSAGE.getName()));
assertTrue(error.containsKey(EXCEPTION.getName()));
assertTrue(error.containsKey(STACK.getName()));
assertTrue(error.containsKey(ERROR_HASH.getName()));
assertTrue(error.containsKey(Constants.GUID));
}
@Test
public void testParseSomeGoodSomeBadMessages() {
// initialize the parser with the sensor config
set("config", broParserConfig);
assign("parser", "PARSER_INIT('bro', config)");
// parse the message
String invalidMessage = "{ this is an invalid message }}";
set("msg1", broMessage);
set("msg2", invalidMessage);
List<JSONObject> messages = execute("PARSER_PARSE(parser, [msg1, msg2])", List.class);
// expect 2 messages to be returned - 1 success and 1 error
assertEquals(2, messages.size());
assertEquals(1, messages.stream().filter(msg -> isBro(msg)).count());
assertEquals(1, messages.stream().filter(msg -> isError(msg)).count());
}
@Test
public void testConfig() throws Exception {
// initialize the parser
set("config", broParserConfig);
assign("parser", "PARSER_INIT('bro', config)");
String config = execute("PARSER_CONFIG(parser)", String.class);
assertNotNull(config);
assertNotNull(SensorParserConfig.fromBytes(config.getBytes(StandardCharsets.UTF_8)));
}
@Test
public void testInitFromString() throws Exception {
set("configAsString", broParserConfig);
StellarParserRunner runner = execute("PARSER_INIT('bro', configAsString)", StellarParserRunner.class);
assertNotNull(runner);
SensorParserConfig actual = runner.getParserConfigurations().getSensorParserConfig("bro");
SensorParserConfig expected = SensorParserConfig.fromBytes(broParserConfig.getBytes(
StandardCharsets.UTF_8));
assertEquals(expected, actual);
}
@Test
public void testInitFromMap() throws Exception {
Map<String, Object> configAsMap = (JSONObject) new JSONParser().parse(broParserConfig);
set("configAsMap", configAsMap);
StellarParserRunner runner = execute("PARSER_INIT('bro', configAsMap)", StellarParserRunner.class);
assertNotNull(runner);
SensorParserConfig actual = runner.getParserConfigurations().getSensorParserConfig("bro");
SensorParserConfig expected = SensorParserConfig.fromBytes(broParserConfig.getBytes(
StandardCharsets.UTF_8));
assertEquals(expected, actual);
}
@Test
public void testInitFromInvalidValue() {
assertThrows(ParseException.class, () -> execute("PARSER_INIT('bro', 22)", StellarParserRunner.class));
}
@Test
public void testInitFromZookeeper() throws Exception {
byte[] configAsBytes = broParserConfig.getBytes(StandardCharsets.UTF_8);
CuratorFramework zkClient = zkClientForPath("/metron/topology/parsers/bro", configAsBytes);
context.addCapability(Context.Capabilities.ZOOKEEPER_CLIENT, () -> zkClient);
StellarParserRunner runner = execute("PARSER_INIT('bro')", StellarParserRunner.class);
assertNotNull(runner);
SensorParserConfig actual = runner.getParserConfigurations().getSensorParserConfig("bro");
SensorParserConfig expected = SensorParserConfig.fromBytes(broParserConfig.getBytes(
StandardCharsets.UTF_8));
assertEquals(expected, actual);
}
@Test
public void testInitMissingFromZookeeper() throws Exception {
// there is no config for 'bro' in zookeeper
CuratorFramework zkClient = zkClientMissingPath("/metron/topology/parsers/bro");
context.addCapability(Context.Capabilities.ZOOKEEPER_CLIENT, () -> zkClient);
assertThrows(ParseException.class, () -> execute("PARSER_INIT('bro')", StellarParserRunner.class));
}
/**
* Create a mock Zookeeper client that returns a value for a given path.
*
* @param path The path within Zookeeper that will be requested.
* @param value The value to return when the path is requested.
* @return The mock Zookeeper client.
*/
private CuratorFramework zkClientForPath(String path, byte[] value) throws Exception {
GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
when(getDataBuilder.forPath(path)).thenReturn(value);
CuratorFramework zkClient = mock(CuratorFramework.class);
when(zkClient.getData()).thenReturn(getDataBuilder);
return zkClient;
}
/**
* Create a mock Zookeeper client that will indicate the given path does not exist.
*
* @param path The path that will 'not exist'.
* @return The mock Zookeeper client.
*/
private CuratorFramework zkClientMissingPath(String path) throws Exception {
GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
when(getDataBuilder.forPath(path)).thenThrow(new KeeperException.NoNodeException(path));
CuratorFramework zkClient = mock(CuratorFramework.class);
when(zkClient.getData()).thenReturn(getDataBuilder);
return zkClient;
}
private boolean isError(JSONObject message) {
String sensorType = String.class.cast(message.get(Constants.SENSOR_TYPE));
return Constants.ERROR_TYPE.equals(sensorType);
}
private boolean isBro(JSONObject message) {
String sensorType = String.class.cast(message.get(Constants.SENSOR_TYPE));
return "bro".equals(sensorType);
}
/**
* Set the value of a variable.
*
* @param var The variable to assign.
* @param value The value to assign.
*/
private void set(String var, Object value) {
executor.assign(var, value);
}
/**
* Assign a value to the result of an expression.
*
* @param var The variable to assign.
* @param expression The expression to execute.
*/
private Object assign(String var, String expression) {
executor.assign(var, expression, Collections.emptyMap());
return executor.getState().get(var);
}
/**
* Execute a Stellar expression.
*
* @param expression The Stellar expression to execute.
* @param clazz
* @param <T>
* @return The result of executing the Stellar expression.
*/
private <T> T execute(String expression, Class<T> clazz) {
T results = executor.execute(expression, Collections.emptyMap(), clazz);
LOG.debug("{} = {}", expression, results);
return results;
}
}