blob: 46a49fcb139e3884d53d79ef458669c3830c8851 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredParserBolt;
import org.apache.metron.common.configuration.FieldValidator;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.interfaces.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterComponent;
import org.apache.metron.common.writer.WriterToBulkWriter;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.parsers.filters.Filters;
import org.apache.metron.common.configuration.FieldTransformer;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.parsers.filters.GenericMessageFilter;
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.common.interfaces.MessageWriter;
import org.json.simple.JSONObject;
import java.io.Serializable;
import java.util.*;
import java.util.function.Function;
public class ParserBolt extends ConfiguredParserBolt implements Serializable {
private OutputCollector collector;
private MessageParser<JSONObject> parser;
private MessageFilter<JSONObject> filter = new GenericMessageFilter();
private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
private BulkMessageWriter<JSONObject> messageWriter;
private BulkWriterComponent<JSONObject> writerComponent;
private boolean isBulk = false;
public ParserBolt( String zookeeperUrl
, String sensorType
, MessageParser<JSONObject> parser
, MessageWriter<JSONObject> writer
)
{
super(zookeeperUrl, sensorType);
isBulk = false;
this.parser = parser;
messageWriter = new WriterToBulkWriter<>(writer);
}
public ParserBolt( String zookeeperUrl
, String sensorType
, MessageParser<JSONObject> parser
, BulkMessageWriter<JSONObject> writer
)
{
super(zookeeperUrl, sensorType);
isBulk = true;
this.parser = parser;
messageWriter = writer;
}
public ParserBolt withMessageFilter(MessageFilter<JSONObject> filter) {
this.filter = filter;
return this;
}
@SuppressWarnings("unchecked")
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
super.prepare(stormConf, context, collector);
this.collector = collector;
if(getSensorParserConfig() == null) {
filter = new GenericMessageFilter();
}
else if(filter == null) {
filter = Filters.get(getSensorParserConfig().getFilterClassName()
, getSensorParserConfig().getParserConfig()
);
}
parser.init();
if(isBulk) {
writerTransformer = config -> new ParserWriterConfiguration(config);
}
else {
writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
}
try {
messageWriter.init(stormConf, writerTransformer.apply(getConfigurations()));
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize message writer", e);
}
this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
@Override
protected Collection<Tuple> createTupleCollection() {
return new HashSet<>();
}
};
SensorParserConfig config = getSensorParserConfig();
if(config != null) {
config.init();
}
else {
throw new IllegalStateException("Unable to retrieve a parser config for " + getSensorType());
}
parser.configure(config.getParserConfig());
}
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
byte[] originalMessage = tuple.getBinary(0);
SensorParserConfig sensorParserConfig = getSensorParserConfig();
try {
//we want to ack the tuple in the situation where we have are not doing a bulk write
//otherwise we want to defer to the writerComponent who will ack on bulk commit.
boolean ackTuple = !isBulk;
int numWritten = 0;
if(sensorParserConfig != null) {
List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
Optional<List<JSONObject>> messages = parser.parseOptional(originalMessage);
for (JSONObject message : messages.orElse(Collections.emptyList())) {
if (parser.validate(message) && filter != null && filter.emitTuple(message)) {
message.put(Constants.SENSOR_TYPE, getSensorType());
for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
if (handler != null) {
handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
}
}
if(!isGloballyValid(message, fieldValidations)) {
message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");
collector.emit(Constants.INVALID_STREAM, new Values(message));
}
else {
numWritten++;
writerComponent.write(getSensorType(), tuple, message, messageWriter, writerTransformer.apply(getConfigurations()));
}
}
}
}
//if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
//(meaning that none of the messages are valid either globally or locally)
//then we want to handle the ack ourselves.
if(ackTuple || numWritten == 0) {
collector.ack(tuple);
}
} catch (Throwable ex) {
ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
collector.ack(tuple);
}
}
private boolean isGloballyValid(JSONObject input, List<FieldValidator> validators) {
for(FieldValidator validator : validators) {
if(!validator.isValid(input, getConfigurations().getGlobalConfig())) {
return false;
}
}
return true;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(Constants.INVALID_STREAM, new Fields("message"));
declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
}
}