/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka;
import com.google.common.base.Strings;
import org.apache.storm.Config;
import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
// TODO: need to add blacklisting
// TODO: need to make a best effort to not re-emit messages if don't have to
/**
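* A spout that reads messages from Kafka and emits each as a tuple, tracking consumer
* offsets in ZooKeeper so that a restarted topology resumes where it left off.
* <p>
* A minimal usage sketch (host, topic, and id values below are illustrative placeholders):
* <pre>{@code
* BrokerHosts hosts = new ZkHosts("zkhost1:2181");
* SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-spout-id");
* spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
* topologyBuilder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
* }</pre>
* <p>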
* @deprecated storm-kafka has been deprecated and will be removed in a future Storm release. Please upgrade to storm-kafka-client.
*/
@Deprecated
public class KafkaSpout extends BaseRichSpout {
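// Outcome of a PartitionManager.next() call: emitted with more messages ready,
// emitted the last available message, or emitted nothing.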
enum EmitState {
EMITTED_MORE_LEFT,
EMITTED_END,
NO_EMITTED
}
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
ZkState _state;
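// wall-clock time of the last offset commit; nextTuple() uses it to throttle commits to stateUpdateIntervalMs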
long _lastUpdateMs = 0;
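// round-robin cursor into this task's list of partition managers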
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
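// Resolve ZooKeeper connection details: explicit SpoutConfig values win,
// otherwise fall back to the topology's storm.zookeeper.* settings.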
String topologyInstanceId = context.getStormId();
Map<String, Object> stateConf = new HashMap<>(conf);
List<String> zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack: the transactional ZooKeeper settings above exist only so ZkState can store this spout's offsets
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, topologyInstanceId);
} else {
_coordinator = new ZkCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, topologyInstanceId);
}
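// Periodically report per-partition offset data (e.g. how far the spout lags behind the
// latest Kafka offset), refreshed against the currently managed partitions.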
context.registerMetric("kafkaOffset", new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Set<Partition> latestPartitions = new HashSet<>();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setOffsetData(pm.getPartition(), pm.getOffsetData());
}
return _kafkaOffsetMetric.getValueAndReset();
}
}, _spoutConfig.metricsTimeBucketSizeInSecs);
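// Merge each PartitionManager's per-partition metrics into a single map.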
context.registerMetric("kafkaPartition", new IMetric() {
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Map<String, Object> concatMetricsDataMaps = new HashMap<>();
for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
}
}, _spoutConfig.metricsTimeBucketSizeInSecs);
}
@Override
public void close() {
_state.close();
}
@Override
public void nextTuple() {
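// Round-robin over this task's partitions: keep draining the current partition while it
// reports more messages ready, otherwise advance the cursor; stop after the first
// partition that emits anything.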
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
try {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
EmitState state = managers.get(_currPartitionIndex).next(_collector);
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
if (state != EmitState.NO_EMITTED) {
break;
}
} catch (FailedFetchException e) {
LOG.warn("Fetch failed", e);
_coordinator.refresh();
}
}
long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
/*
 * System.currentTimeMillis() follows the system clock, so diffWithNow can go negative
 * if the clock is adjusted externally; commit in that case as well.
 */
if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
commit();
}
}
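// Acks and fails carry a KafkaMessageId, which routes the offset back to the manager of its partition.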
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.ack(id.offset);
} else {
// managers for partitions changed - try to find new manager responsible for that partition
PartitionManager newManager = tryToFindNewManager(id.partition);
if (newManager != null) {
newManager.ack(id.offset);
}
}
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.fail(id.offset);
} else {
// managers for partitions changed - try to find new manager responsible for that partition
PartitionManager newManager = tryToFindNewManager(id.partition);
if (newManager != null) {
newManager.fail(id.offset);
}
}
}
@Override
public void deactivate() {
commit();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
} else {
declarer.declare(_spoutConfig.scheme.getOutputFields());
}
}
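// Expose this spout's Kafka/ZooKeeper settings under "config."-prefixed keys so external
// tools can inspect the running topology.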
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> configuration = super.getComponentConfiguration();
if (configuration == null) {
configuration = new HashMap<>();
}
String configKeyPrefix = "config.";
configuration.put(configKeyPrefix + "topics", this._spoutConfig.topic);
StringBuilder zkServers = new StringBuilder();
if (_spoutConfig.zkServers != null && !_spoutConfig.zkServers.isEmpty()) {
for (String zkServer : this._spoutConfig.zkServers) {
zkServers.append(zkServer).append(':').append(this._spoutConfig.zkPort).append(',');
}
configuration.put(configKeyPrefix + "zkServers", zkServers.toString());
}
BrokerHosts brokerHosts = this._spoutConfig.hosts;
String zkRoot = this._spoutConfig.zkRoot + "/" + this._spoutConfig.id;
if (brokerHosts instanceof ZkHosts) {
ZkHosts zkHosts = (ZkHosts) brokerHosts;
configuration.put(configKeyPrefix + "zkNodeBrokers", zkHosts.brokerZkPath);
} else if (brokerHosts instanceof StaticHosts) {
StaticHosts staticHosts = (StaticHosts) brokerHosts;
GlobalPartitionInformation globalPartitionInformation = staticHosts.getPartitionInformation();
boolean useTopicNameForPath = globalPartitionInformation.getbUseTopicNameForPartitionPathId();
if (useTopicNameForPath) {
zkRoot += ("/" + this._spoutConfig.topic);
}
List<Partition> partitions = globalPartitionInformation.getOrderedPartitions();
StringBuilder staticPartitions = new StringBuilder();
StringBuilder leaderHosts = new StringBuilder();
for (Partition partition: partitions) {
staticPartitions.append(partition.partition).append(',');
leaderHosts.append(partition.host.host).append(':').append(partition.host.port).append(',');
}
configuration.put(configKeyPrefix + "partitions", staticPartitions.toString());
configuration.put(configKeyPrefix + "leaders", leaderHosts.toString());
}
configuration.put(configKeyPrefix + "zkRoot", zkRoot);
return configuration;
}
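// After a partition rebalance the original manager may be gone; match by topic and
// partition number to find the current owner.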
private PartitionManager tryToFindNewManager(Partition partition) {
for (PartitionManager partitionManager : _coordinator.getMyManagedPartitions()) {
if (partitionManager.getPartition().partition == partition.partition
&& partitionManager.getPartition().topic.equals(partition.topic)) {
return partitionManager;
}
}
return null;
}
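// Write each managed partition's acked offsets to ZooKeeper and reset the commit timer.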
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}
}