blob: d20e1a5cfc0b2b312cdc11c2e0b9cb1ff5a2f5b5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers.topology;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
import org.apache.metron.common.utils.KafkaUtils;
import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.MessageWriter;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
import org.apache.metron.parsers.filters.Filters;
import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.topology.config.ValueSupplier;
import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
import org.apache.metron.writer.AbstractWriter;
import org.apache.metron.writer.kafka.KafkaWriter;
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.json.simple.JSONObject;
/**
* Builds a Storm topology that parses telemetry data received from a sensor.
*/
public class ParserTopologyBuilder {
/**
 * The result of building a parser topology: the Storm {@link TopologyBuilder} that
 * defines the spouts and bolts, plus the {@link Config} the topology should be
 * submitted with.  Instances are immutable.
 */
public static class ParserTopology {

  // both fields are final: this is a simple immutable value holder
  private final TopologyBuilder builder;
  private final Config topologyConfig;

  private ParserTopology(TopologyBuilder builder, Config topologyConfig) {
    this.builder = builder;
    this.topologyConfig = topologyConfig;
  }

  /**
   * @return The builder defining the topology's spouts and bolts.
   */
  public TopologyBuilder getBuilder() {
    return builder;
  }

  /**
   * @return The Storm configuration to submit the topology with.
   */
  public Config getTopologyConfig() {
    return topologyConfig;
  }
}
/**
* Builds a Storm topology that parses telemetry data received from an external sensor.
*
* @param zookeeperUrl Zookeeper URL
* @param brokerUrl Kafka Broker URL
* @param sensorTypes Type of sensor
* @param spoutParallelismSupplier Supplier for the parallelism hint for the spout
* @param spoutNumTasksSupplier Supplier for the number of tasks for the spout
* @param parserParallelismSupplier Supplier for the parallelism hint for the parser bolt
* @param parserNumTasksSupplier Supplier for the number of tasks for the parser bolt
* @param errorWriterParallelismSupplier Supplier for the parallelism hint for the bolt that handles errors
* @param errorWriterNumTasksSupplier Supplier for the number of tasks for the bolt that handles errors
* @param kafkaSpoutConfigSupplier Supplier for the configuration options for the kafka spout
* @param securityProtocolSupplier Supplier for the security protocol
* @param outputTopicSupplier Supplier for the output kafka topic
* @param errorTopicSupplier Supplier for the kafka topic that receives error messages
* @param stormConfigSupplier Supplier for the storm config
* @return A Storm topology that parses telemetry data received from an external sensor
* @throws Exception If the parser configuration cannot be fetched from zookeeper or the topology cannot be built
*/
public static ParserTopology build(String zookeeperUrl,
Optional<String> brokerUrl,
List<String> sensorTypes,
ValueSupplier<List> spoutParallelismSupplier,
ValueSupplier<List> spoutNumTasksSupplier,
ValueSupplier<Integer> parserParallelismSupplier,
ValueSupplier<Integer> parserNumTasksSupplier,
ValueSupplier<Integer> errorWriterParallelismSupplier,
ValueSupplier<Integer> errorWriterNumTasksSupplier,
ValueSupplier<List> kafkaSpoutConfigSupplier,
ValueSupplier<String> securityProtocolSupplier,
ValueSupplier<String> outputTopicSupplier,
ValueSupplier<String> errorTopicSupplier,
ValueSupplier<Config> stormConfigSupplier
) throws Exception {
// fetch configuration from zookeeper
ParserConfigurations configs = new ParserConfigurations();
Map<String, SensorParserConfig> sensorToParserConfigs = getSensorParserConfig(zookeeperUrl, sensorTypes, configs);
Collection<SensorParserConfig> parserConfigs = sensorToParserConfigs.values();
// resolve each tunable against the fetched parser configs; each ValueSupplier
// decides whether the value comes from the sensor config or a supplied default
@SuppressWarnings("unchecked")
List<Integer> spoutParallelism = (List<Integer>) spoutParallelismSupplier.get(parserConfigs, List.class);
@SuppressWarnings("unchecked")
List<Integer> spoutNumTasks = (List<Integer>) spoutNumTasksSupplier.get(parserConfigs, List.class);
int parserParallelism = parserParallelismSupplier.get(parserConfigs, Integer.class);
int parserNumTasks = parserNumTasksSupplier.get(parserConfigs, Integer.class);
int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfigs, Integer.class);
int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfigs, Integer.class);
String outputTopic = outputTopicSupplier.get(parserConfigs, String.class);
List<Map<String, Object>> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfigs, List.class);
// an absent security protocol means a plaintext kafka connection
Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfigs, String.class));
// create the spout
TopologyBuilder builder = new TopologyBuilder();
int i = 0;
List<String> spoutIds = new ArrayList<>();
// one kafka spout per sensor; parallelism, task count and spout config are read
// positionally from the supplied lists (assumes each list has one entry per
// sensor type -- TODO confirm; a shorter list would throw IndexOutOfBoundsException here)
for (Entry<String, SensorParserConfig> entry: sensorToParserConfigs.entrySet()) {
KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, entry.getKey(), securityProtocol,
Optional.ofNullable(kafkaSpoutConfig.get(i)), entry.getValue());
// keep the legacy spout id "kafkaSpout" when there is only a single sensor
String spoutId = sensorToParserConfigs.size() > 1 ? "kafkaSpout-" + entry.getKey() : "kafkaSpout";
builder.setSpout(spoutId, kafkaSpout, spoutParallelism.get(i))
.setNumTasks(spoutNumTasks.get(i));
spoutIds.add(spoutId);
++i;
}
// create the parser bolt; a single bolt instance handles all sensor types
ParserBolt parserBolt = createParserBolt(
zookeeperUrl,
brokerUrl,
sensorToParserConfigs,
securityProtocol,
configs,
Optional.ofNullable(outputTopic)
);
BoltDeclarer boltDeclarer = builder
.setBolt("parserBolt", parserBolt, parserParallelism)
.setNumTasks(parserNumTasks);
// wire every sensor spout into the shared parser bolt
for (String spoutId : spoutIds) {
boltDeclarer.localOrShuffleGrouping(spoutId);
}
// create the error bolt, if needed
if (errorWriterNumTasks > 0) {
String errorTopic = errorTopicSupplier.get(parserConfigs, String.class);
// NOTE(review): the error bolt is built from the first sensor type/config only --
// verify that is intended when multiple sensors run in one topology
WriterBolt errorBolt = createErrorBolt(
zookeeperUrl,
brokerUrl,
sensorTypes.get(0),
securityProtocol,
configs,
parserConfigs.iterator().next(),
errorTopic
);
// the error bolt consumes the parser bolt's dedicated error stream
builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
.setNumTasks(errorWriterNumTasks)
.localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
}
return new ParserTopology(builder, stormConfigSupplier.get(parserConfigs, Config.class));
}
/**
* Create a spout that consumes tuples from a Kafka topic.
*
* @param zkQuorum Zookeeper URL
* @param sensorType Type of sensor; used as the default input topic name
* @param securityProtocol Optional security protocol used to connect to kafka
* @param kafkaConfigOptional Configuration options for the kafka spout
* @param parserConfig Configuration for the parser
* @return A kafka spout that consumes from the sensor's input topic
*/
private static StormKafkaSpout<Object, Object> createKafkaSpout(String zkQuorum,
                                                                String sensorType,
                                                                Optional<String> securityProtocol,
                                                                Optional<Map<String, Object>> kafkaConfigOptional,
                                                                SensorParserConfig parserConfig) {
  // if the caller supplied a config map it is used (and defaulted in-place);
  // otherwise start from an empty map
  Map<String, Object> spoutOptions = kafkaConfigOptional.orElse(new HashMap<>());

  // an explicit sensor topic takes precedence over the sensor type name
  String inputTopic = parserConfig.getSensorTopic();
  if (inputTopic == null) {
    inputTopic = sensorType;
  }

  // apply defaults without clobbering any explicitly-set options
  spoutOptions.putIfAbsent(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key,
      KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.name());
  spoutOptions.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, inputTopic + "_parser");
  final Map<String, Object> options = spoutOptions;
  securityProtocol.ifPresent(protocol ->
      options.putIfAbsent("security.protocol", KafkaUtils.INSTANCE.normalizeProtocol(protocol)));

  // the spout emits the message value, key, and originating topic as tuple fields
  List<String> emittedFields = Arrays.asList(
      SimpleStormKafkaBuilder.FieldsConfiguration.VALUE.getFieldName(),
      SimpleStormKafkaBuilder.FieldsConfiguration.KEY.getFieldName(),
      SimpleStormKafkaBuilder.FieldsConfiguration.TOPIC.getFieldName());
  return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, emittedFields, spoutOptions);
}
/**
* Create a Kafka writer.
*
* @param broker An optional URL to the Kafka brokers.
* @param zkQuorum The URL to Zookeeper.
* @param securityProtocol An optional security protocol in use.
* @return A Kafka writer configured with the cluster location and, if present, the security protocol.
*/
private static KafkaWriter createKafkaWriter(Optional<String> broker,
                                             String zkQuorum,
                                             Optional<String> securityProtocol) {
  KafkaWriter writer = new KafkaWriter();

  // prefer an explicit broker URL; otherwise discover the brokers via zookeeper
  if (broker.isPresent()) {
    writer.withBrokerUrl(broker.get());
  } else {
    writer.withZkQuorum(zkQuorum);
  }

  // pass the security protocol through to the kafka producer, when one is set
  securityProtocol.ifPresent(protocol -> {
    HashMap<String, Object> producerConfigs = new HashMap<>();
    producerConfigs.put("security.protocol", protocol);
    writer.withProducerConfigs(producerConfigs);
  });

  return writer;
}
/**
* Create a bolt that parses input from a sensor.
*
* @param zookeeperUrl Zookeeper URL
* @param brokerUrl Kafka Broker URL
* @param sensorTypeToParserConfig Parser configuration keyed by sensor type
* @param securityProtocol Optional security protocol used to connect to kafka
* @param configs The parser configurations
* @param outputTopic Optional kafka topic that parsed messages are written to
* @return A Storm bolt that parses input from a sensor
*/
private static ParserBolt createParserBolt(String zookeeperUrl,
                                           Optional<String> brokerUrl,
                                           Map<String, SensorParserConfig> sensorTypeToParserConfig,
                                           Optional<String> securityProtocol,
                                           ParserConfigurations configs,
                                           Optional<String> outputTopic) {
  // assemble the parser/filter/writer triple for each sensor type
  Map<String, ParserComponents> componentsBySensor = new HashMap<>();
  for (Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
    String sensorType = entry.getKey();
    SensorParserConfig parserConfig = entry.getValue();

    // instantiate and configure the message parser for this sensor
    MessageParser<JSONObject> parser =
        ReflectionUtils.createInstance(parserConfig.getParserClassName());
    parser.configure(parserConfig.getParserConfig());

    // an optional filter may drop messages before they reach the writer
    MessageFilter<JSONObject> filter =
        StringUtils.isEmpty(parserConfig.getFilterClassName())
            ? null
            : Filters.get(parserConfig.getFilterClassName(), parserConfig.getParserConfig());

    // use the configured writer class when present; otherwise default to
    // writing to kafka on the enrichment (or explicitly supplied) topic
    AbstractWriter writer;
    if (parserConfig.getWriterClassName() != null) {
      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
    } else {
      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
          .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
    }
    writer.configure(sensorType, new ParserWriterConfiguration(configs));

    componentsBySensor.put(sensorType,
        new ParserComponents(parser, filter, createWriterHandler(writer)));
  }
  return new ParserBolt(zookeeperUrl, componentsBySensor);
}
/**
* Create a bolt that handles error messages.
*
* @param zookeeperUrl Kafka zookeeper URL
* @param brokerUrl Kafka Broker URL
* @param sensorType Type of sensor that is being consumed.
* @param securityProtocol Security protocol used (if any)
* @param configs The parser configurations
* @param parserConfig The sensor's parser configuration.
* @param errorTopic The kafka topic that error messages are written to; may be null, in which case the value from the global config is used
* @return A Storm bolt that handles error messages.
*/
private static WriterBolt createErrorBolt(String zookeeperUrl,
                                          Optional<String> brokerUrl,
                                          String sensorType,
                                          Optional<String> securityProtocol,
                                          ParserConfigurations configs,
                                          SensorParserConfig parserConfig,
                                          String errorTopic) {
  // create a writer
  AbstractWriter writer;
  if (parserConfig.getErrorWriterClassName() == null) {
    if (errorTopic == null) {
      // fall back to the error topic defined in the global configuration
      errorTopic = (String) configs.getGlobalConfig().get(Constants.PARSER_ERROR_TOPIC_GLOBALS_KEY);
    }
    // if not configured, use a sensible default: write errors to kafka
    writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
        .withTopic(errorTopic)
        .withConfigPrefix("error");
  } else {
    // BUGFIX: this previously instantiated getWriterClassName(), silently ignoring
    // a configured error writer class (and NPE-ing when only the error writer was set)
    writer = ReflectionUtils.createInstance(parserConfig.getErrorWriterClassName());
  }
  // configure it
  writer.configure(sensorType, new ParserWriterConfiguration(configs));
  // create a writer handler and wrap everything in the error-handling bolt
  WriterHandler writerHandler = createWriterHandler(writer);
  return new WriterBolt(writerHandler, configs, sensorType)
      .withErrorType(Constants.ErrorType.PARSER_ERROR);
}
/**
* Fetch the parser configuration from Zookeeper.
*
* @param zookeeperUrl Zookeeper URL
* @param sensorTypes Types of sensor
* @param configs The parser configurations to populate from zookeeper
* @return The parser configuration for each requested sensor type, keyed by sensor type
* @throws Exception If the configuration cannot be read from zookeeper
*/
private static Map<String, SensorParserConfig> getSensorParserConfig(String zookeeperUrl, List<String> sensorTypes, ParserConfigurations configs) throws Exception {
  Map<String, SensorParserConfig> configsByType = new HashMap<>();
  // the curator client is closed automatically once the configs are loaded
  try (CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) {
    client.start();
    // pull all parser configurations down from zookeeper into 'configs'
    ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
    for (String sensorType : sensorTypes) {
      SensorParserConfig found = configs.getSensorParserConfig(sensorType);
      // every requested sensor must have a configuration; fail fast otherwise
      if (found == null) {
        throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
            " Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
      }
      configsByType.put(sensorType, found);
    }
  }
  return configsByType;
}
/**
* Creates a WriterHandler
*
* @param writer The writer.
* @return A WriterHandler
*/
private static WriterHandler createWriterHandler(AbstractWriter writer) {
  // wrap the writer in the handler matching its interface; bulk writers are
  // checked first, and anything else is rejected outright
  if (writer instanceof BulkMessageWriter) {
    return new WriterHandler((BulkMessageWriter<JSONObject>) writer);
  }
  if (writer instanceof MessageWriter) {
    return new WriterHandler((MessageWriter<JSONObject>) writer);
  }
  throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
}
}