blob: cb6c0c4d86942d2f3749b1e83cc91113368fecb8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.Constants.Fields;
import org.apache.metron.common.configuration.FieldTransformer;
import org.apache.metron.common.configuration.FieldValidator;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.error.MetronError;
import org.apache.metron.common.message.metadata.RawMessage;
import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.parsers.filters.Filters;
import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.interfaces.MessageParserResult;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The default implemention of a ParserRunner.
*/
public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable {
class ProcessResult {
private JSONObject message;
private MetronError error;
public ProcessResult(JSONObject message) {
this.message = message;
}
public ProcessResult(MetronError error) {
this.error = error;
}
public JSONObject getMessage() {
return message;
}
public MetronError getError() {
return error;
}
public boolean isError() {
return error != null;
}
}
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected transient Consumer<ParserRunnerResults> onSuccess;
protected transient Consumer<MetronError> onError;
private HashSet<String> sensorTypes;
private Map<String, ParserComponent> sensorToParserComponentMap;
// Stellar variables
private transient Context stellarContext;
public ParserRunnerImpl(HashSet<String> sensorTypes) {
this.sensorTypes = sensorTypes;
}
public Map<String, ParserComponent> getSensorToParserComponentMap() {
return sensorToParserComponentMap;
}
public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) {
this.sensorToParserComponentMap = sensorToParserComponentMap;
}
public Context getStellarContext() {
return stellarContext;
}
@Override
public Set<String> getSensorTypes() {
return sensorTypes;
}
@Override
public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) {
if (parserConfigSupplier == null) {
throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner.");
}
if (stellarContext == null) {
throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner.");
}
this.stellarContext = stellarContext;
initializeParsers(parserConfigSupplier);
}
/**
* Parses messages with the appropriate MessageParser based on sensor type. The resulting list of messages are then
* post-processed and added to the ParserRunnerResults message list. Any errors that happen during post-processing are
* added to the ParserRunnerResults error list. Any exceptions (including a master exception) thrown by the MessageParser
* are also added to the ParserRunnerResults error list.
*
* @param sensorType Sensor type of the message
* @param rawMessage Raw message including metadata
* @param parserConfigurations Parser configurations
* @return ParserRunnerResults containing a list of messages and a list of errors
*/
@Override
public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
if (sensorParserConfig != null) {
MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser();
Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage());
if (optionalMessageParserResult.isPresent()) {
MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get();
// Process each message returned from the MessageParser
messageParserResult.getMessages().forEach(message -> {
Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations);
if (processResult.isPresent()) {
if (processResult.get().isError()) {
parserRunnerResults.addError(processResult.get().getError());
} else {
parserRunnerResults.addMessage(processResult.get().getMessage());
}
}
});
// If a master exception is thrown by the MessageParser, wrap it with a MetronError and add it to the list of errors
messageParserResult.getMasterThrowable().ifPresent(throwable -> parserRunnerResults.addError(new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(throwable)
.withSensorType(Collections.singleton(sensorType))
.withMetadata(rawMessage.getMetadata())
.addRawMessage(rawMessage.getMessage())));
// If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors
parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(entry.getValue())
.withSensorType(Collections.singleton(sensorType))
.withMetadata(rawMessage.getMetadata())
.addRawMessage(entry.getKey())).collect(Collectors.toList()));
}
} else {
throw new IllegalStateException(String.format("Could not execute parser. Cannot find configuration for sensor %s.",
sensorType));
}
return parserRunnerResults;
}
/**
* Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner. Objects are created
* using reflection and the MessageParser configure and init methods are called.
* @param parserConfigSupplier Parser configurations
*/
private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) {
LOG.info("Initializing parsers...");
sensorToParserComponentMap = new HashMap<>();
for(String sensorType: sensorTypes) {
if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) {
throw new IllegalStateException(String.format("Could not initialize parsers. Cannot find configuration for sensor %s.",
sensorType));
}
SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType);
LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ",
sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName());
// create message parser
MessageParser<JSONObject> parser = ReflectionUtils
.createInstance(parserConfig.getParserClassName());
// create message filter
MessageFilter<JSONObject> filter = null;
parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext);
if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
filter = Filters.get(
parserConfig.getFilterClassName(),
parserConfig.getParserConfig()
);
}
parser.configure(parserConfig.getParserConfig());
parser.init();
sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter));
}
}
/**
* Post-processes parsed messages by:
* <ul>
* <li>Applying field transformations defined in the sensor parser config</li>
* <li>Filtering messages using the configured MessageFilter class</li>
* <li>Validating messages using the MessageParser validate method</li>
* </ul>
* If a message is successfully processed a message is returned in a ProcessResult. If a message fails
* validation, a MetronError object is created and returned in a ProcessResult. If a message is
* filtered out an empty Optional is returned.
*
* @param sensorType Sensor type of the message
* @param message Message parsed by the MessageParser
* @param rawMessage Raw message including metadata
* @param parser MessageParser for the sensor type
* @param parserConfigurations Parser configurations
*/
@SuppressWarnings("unchecked")
protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage,
MessageParser<JSONObject> parser,
ParserConfigurations parserConfigurations
) {
Optional<ProcessResult> processResult = Optional.empty();
SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType);
sensorParserConfig.getRawMessageStrategy().mergeMetadata(
message,
rawMessage.getMetadata(),
sensorParserConfig.getMergeMetadata(),
sensorParserConfig.getRawMessageStrategyConfig()
);
message.put(Constants.SENSOR_TYPE, sensorType);
applyFieldTransformations(message, rawMessage, sensorParserConfig);
if (!message.containsKey(Constants.GUID)) {
message.put(Constants.GUID, UUID.randomUUID().toString());
}
message.putIfAbsent(Fields.ORIGINAL.getName(), new String(rawMessage.getMessage()));
MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter();
if (filter == null || filter.emit(message, stellarContext)) {
boolean isInvalid = !parser.validate(message);
List<FieldValidator> failedValidators = null;
if (!isInvalid) {
failedValidators = getFailedValidators(message, parserConfigurations);
isInvalid = !failedValidators.isEmpty();
}
if (isInvalid) {
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
.withSensorType(Collections.singleton(sensorType))
.withMetadata(rawMessage.getMetadata())
.addRawMessage(message);
Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
.flatMap(fieldValidator -> fieldValidator.getInput().stream())
.collect(Collectors.toSet());
if (errorFields != null && !errorFields.isEmpty()) {
error.withErrorFields(errorFields);
}
processResult = Optional.of(new ProcessResult(error));
} else {
processResult = Optional.of(new ProcessResult(message));
}
}
return processResult;
}
/**
* Applies Stellar field transformations defined in the sensor parser config.
* @param message Message parsed by the MessageParser
* @param rawMessage Raw message including metadata
* @param sensorParserConfig Sensor parser config
*/
private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) {
for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
if (handler != null) {
if (!sensorParserConfig.getMergeMetadata()) {
//if we haven't merged metadata, then we need to pass them along as configuration params.
handler.transformAndUpdate(
message,
stellarContext,
sensorParserConfig.getParserConfig(),
rawMessage.getMetadata()
);
} else {
handler.transformAndUpdate(
message,
stellarContext,
sensorParserConfig.getParserConfig()
);
}
}
}
}
private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) {
List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations();
List<FieldValidator> failedValidators = new ArrayList<>();
for(FieldValidator validator : fieldValidations) {
if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) {
failedValidators.add(validator);
}
}
return failedValidators;
}
}