/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.eagle.alert.engine.spout;
import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
import org.apache.eagle.alert.engine.coordinator.MetadataType;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.router.SpoutSpecListener;
import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.engine.serialization.Serializers;
import org.apache.eagle.alert.utils.AlertConstants;
import org.apache.eagle.alert.utils.StreamIdConversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.*;
import java.text.MessageFormat;
import java.util.*;
/**
 * Wraps KafkaSpout to provide parallel processing of messages for multiple Kafka topics.
 * <p>onSpoutSpecChange() is the entry point for external metadata updates. Upon new metadata, this class
 * calculates which topics have been added, removed, or updated.</p>
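 * <p>A minimal wiring sketch (the component name, parallelism values, and the
 * {@code changeNotifyService} instance below are illustrative assumptions, not part of this class):</p>
 * <pre>{@code
 * Config config = ConfigFactory.load();
 * IMetadataChangeNotifyService changeNotifyService = ...; // any implementation
 * TopologyBuilder builder = new TopologyBuilder();
 * // one spout fans out to numOfRouterBolts = 4 router bolt instances
 * builder.setSpout("alertEngineSpout",
 *     new CorrelationSpout(config, "sandbox-topology", changeNotifyService, 4), 1);
 * }</pre>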
*/
public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener, SerializationMetadataProvider {
private static final long serialVersionUID = -5280723341236671580L;
private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class);
public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";
// topic to KafkaSpoutWrapper
private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new HashMap<>();
private int numOfRouterBolts;
private SpoutSpec cachedSpoutSpec;
private transient KafkaSpoutMetric kafkaSpoutMetric;
@SuppressWarnings("rawtypes")
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
private final Config config;
private String topologyId;
private String spoutName;
private String routeBoltName;
@SuppressWarnings("unused")
private int taskIndex;
private IMetadataChangeNotifyService changeNotifyService;
private PartitionedEventSerializer serializer;
private volatile Map<String, StreamDefinition> sds;
/**
 * FIXME a single changeNotifyService may be problematic, as multiple spout tasks will possibly
 * register themselves and initialize the service.
 *
 * @param config topology configuration
 * @param topologyId used for distinguishing kafka offsets for different topologies
 * @param changeNotifyService service that notifies this spout of metadata (SpoutSpec) changes
 * @param numOfRouterBolts used for generating stream ids and routing
*/
public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts) {
this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME);
}
/**
 * @param config topology configuration
 * @param topologyId used for distinguishing kafka offset for different topologies
 * @param changeNotifyService service that notifies this spout of metadata (SpoutSpec) changes
* @param numOfRouterBolts used for generating streamId and routing
* @param spoutName used for generating streamId between spout and router bolt
* @param routerBoltName used for generating streamId between spout and router bolt.
*/
public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName) {
this.config = config;
this.topologyId = topologyId;
this.changeNotifyService = changeNotifyService;
this.numOfRouterBolts = numOfRouterBolts;
this.spoutName = spoutName;
this.routeBoltName = routerBoltName;
}
public String getSpoutName() {
return spoutName;
}
public String getRouteBoltName() {
return routeBoltName;
}
/**
 * The only output field is for StreamEvent.
*
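 * <p>For example, with {@code numOfRouterBolts = 3}, three streams are declared, one per router bolt
 * instance; the exact stream id format is produced by
 * {@link StreamIdConversion#generateStreamIdBetween(String, String)}.</p>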
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (int i = 0; i < numOfRouterBolts; i++) {
String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i);
declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
LOG.info("declare stream between spout and streamRouterBolt " + streamId);
}
}
@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
if (LOG.isDebugEnabled()) {
LOG.debug("open method invoked");
}
this.conf = conf;
this.context = context;
this.collector = collector;
this.taskIndex = context.getThisTaskIndex();
// initialize an empty SpoutSpec
cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>());
changeNotifyService.registerListener(this);
changeNotifyService.init(config, MetadataType.SPOUT);
// register KafkaSpout metric
kafkaSpoutMetric = new KafkaSpoutMetric();
context.registerMetric("kafkaSpout", kafkaSpoutMetric, 60);
this.serializer = Serializers.newPartitionedEventSerializer(this);
}
@Override
public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
LOG.info("new metadata is updated " + spec);
try {
onReload(spec, sds);
} catch (Exception ex) {
LOG.error("error applying new SpoutSpec", ex);
}
}
@Override
public void nextTuple() {
for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
try {
wrapper.nextTuple();
} catch (Exception e) {
LOG.error("unexpected exception is caught: {}", e.getMessage(), e);
collector.reportError(e);
}
}
}
/**
 * Find the wrapper that should ack this message; msgId is mapped back to the wrapper that emitted it.
*
* @param msgId
*/
@Override
public void ack(Object msgId) {
// decode and get topic
KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
if (spout != null) {
spout.ack(id.id);
}
}
@Override
public void fail(Object msgId) {
// decode and get topic
KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
LOG.error("Failing message {}, with topic {}", msgId, id.topic);
KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
if (spout != null) {
spout.fail(id.id);
}
}
@Override
public void deactivate() {
System.out.println("deactivate");
for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
wrapper.deactivate();
}
}
@Override
public void close() {
System.out.println("close");
for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
wrapper.close();
}
}
private List<String> getTopics(SpoutSpec spoutSpec) {
List<String> meta = new ArrayList<String>();
for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
meta.add(entry.getTopic());
}
return meta;
}
@SuppressWarnings("unchecked")
public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception {
// calculate topic create/remove/update
List<String> topics = getTopics(newMeta);
List<String> cachedTopics = getTopics(cachedSpoutSpec);
Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopics);
Collection<String> removeTopics = CollectionUtils.subtract(cachedTopics, topics);
Collection<String> updateTopics = CollectionUtils.intersection(topics, cachedTopics);
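// example (illustrative): topics=[A,B], cached=[B,C] => newTopics=[A], removeTopics=[C], updateTopics=[B]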
LOG.info("Topics were added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics);
// build lookup table for scheme
Map<String, String> newSchemaName = new HashMap<String, String>();
Map<String, Map<String, String>> dataSourceProperties = new HashMap<>();
for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
dataSourceProperties.put(ds.getTopic(), ds.getProperties());
}
// copy and swap
Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList);
// iterate new topics and then create KafkaSpout
for (String topic : newTopics) {
KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
if (wrapper != null) {
LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
continue;
}
try {
KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
newKafkaSpoutList.put(topic, newWrapper);
} catch (Exception e) {
LOG.error("fail to create KafkaSpoutWrapper for topic {} due to {}", topic, e.getMessage(), e);
collector.reportError(e);
}
}
// iterate remove topics and then close KafkaSpout
for (String topic : removeTopics) {
KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
if (wrapper == null) {
LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
continue;
}
removeKafkaSpout(wrapper);
newKafkaSpoutList.remove(topic);
}
// iterate update topic and then update metadata
for (String topic : updateTopics) {
KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
if (spoutWrapper == null) {
LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
continue;
}
spoutWrapper.update(newMeta, sds);
}
// swap
this.cachedSpoutSpec = newMeta;
this.kafkaSpoutList = newKafkaSpoutList;
this.sds = sds;
LOG.info("after CorrelationSpout reloads, {} kafkaSpouts are generated for {} topics", kafkaSpoutList.size(), topics.size());
}
/**
 * This method is protected so that unit tests can override it.
 * Q: Where is consumer state persisted, i.e. which offset has been consumed for each topic and partition?
 * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId
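 * Example (illustrative, assuming defaults and storm-kafka's "partition_N" id format):
 * /consumers/eagle_consumer/myTopic/myTopology/partition_0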
 * Note 1: see PartitionManager.committedPath for how the zkState path is composed: _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
 * consumerId defaults to "eagle_consumer" unless specified via "spout.stormKafkaEagleConsumer"
 * Note 2: topologyId is part of the zkState path because one topic can, by design, be consumed by multiple topologies, so each topology needs to track its own processed offset
 * <p>TODO: avoid calling Config.get deep in the call stack; generate a config bean as early as possible</p>
 *
 * @param configure merged data-source and topology configuration
 * @param conf storm configuration map passed to open()
 * @param context topology context
 * @param collector spout output collector
 * @param topic kafka topic to consume
 * @param schemeClsName scheme class name used to deserialize kafka messages
 * @param spoutSpec current spout specification
 * @param sds stream definitions keyed by stream id
 * @return an opened KafkaSpoutWrapper for the given topic
*/
@SuppressWarnings("rawtypes")
protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM);
BrokerHosts hosts = null;
if (configure.hasPath("spout.kafkaBrokerZkBasePath")) {
hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH));
} else {
hosts = new ZkHosts(kafkaBrokerZkQuorum);
}
String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
if (configure.hasPath("spout.stormKafkaTransactionZkPath")) {
transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath");
}
boolean logEventEnabled = false;
if (configure.hasPath("topology.logEventEnabled")) {
logEventEnabled = configure.getBoolean("topology.logEventEnabled");
}
// write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
if (configure.hasPath("spout.stormKafkaEagleConsumer")) {
zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer");
}
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
// transaction zkServers
boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
spoutConfig.zkServers = utils.getZkHosts();
spoutConfig.zkPort = utils.getZkPort();
} else {
ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum"));
spoutConfig.zkServers = utils.getZkHosts();
spoutConfig.zkPort = utils.getZkPort();
}
// transaction update interval
spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
// Kafka fetch size
spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048576; // default 1 MB
// "startOffsetTime" is for test usage, prod should not use this
if (configure.hasPath("spout.stormKafkaStartOffsetTime")) {
spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime");
}
spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer, logEventEnabled);
wrapper.open(conf, context, collectorWrapper);
if (LOG.isInfoEnabled()) {
LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
}
return wrapper;
}
private MultiScheme createMultiScheme(Map conf, String topic, String schemeClsName) throws Exception {
Object scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, conf);
if (scheme instanceof MultiScheme) {
return (MultiScheme) scheme;
} else if (scheme instanceof Scheme) {
return new SchemeAsMultiScheme((Scheme) scheme);
} else {
LOG.error("create spout scheme failed.");
throw new IllegalArgumentException("create spout scheme failed.");
}
}
@Override
public StreamDefinition getStreamDefinition(String streamId) {
return sds.get(streamId);
}
/**
 * Utility to extract the list of zkServers and the zkPort. (It is assumed that the zkPort is the same
 * for all zkServers, since the storm-kafka library requires this, though it is not efficient.)
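 * <p>Example (illustrative): {@code "zk1.example.com:2181,zk2.example.com:2181"} yields hosts
 * {@code [zk1.example.com, zk2.example.com]} and port {@code 2181}.</p>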
*/
private static class ZkServerPortUtils {
private List<String> zkHosts = new ArrayList<>();
private Integer zkPort;
public ZkServerPortUtils(String zkQuorum) {
String[] zkConnections = zkQuorum.split(",");
for (String zkConnection : zkConnections) {
zkHosts.add(zkConnection.split(":")[0]);
}
zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
}
public List<String> getZkHosts() {
return zkHosts;
}
public Integer getZkPort() {
return zkPort;
}
}
protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
try {
wrapper.close();
} catch (Exception e) {
LOG.error("Close wrapper failed. Ignore and continue!", e);
}
}
}