blob: c10ab2ebc9d306dd52252bc44f688a9c8d85ae7c [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.storm.executor.spout;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.BuiltinMetrics;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.metrics.BuiltinSpoutMetrics;
import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.IWaitStrategy.WaitSituation;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.SpoutExecutorStats;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SpoutExecutor extends Executor {
// Logger shared by all instances of this class.
private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
// Wait strategy built from Config.TOPOLOGY_SPOUT_WAIT_STRATEGY and prepared for WaitSituation.SPOUT_WAIT.
private final IWaitStrategy spoutWaitStrategy;
// Wait strategy built from Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY and prepared for WaitSituation.BACK_PRESSURE_WAIT.
private final IWaitStrategy backPressureWaitStrategy;
// Starts as false; read by the run loop and by inactiveExecute() to detect an active/inactive transition.
private final AtomicBoolean lastActive;
// Emit counter handed to every SpoutOutputCollectorImpl; the run loop compares it before and after a pass
// over the spouts to decide whether anything was emitted.
private final MutableLong emittedCount;
// Starts at 0; the run loop reads it as the length of the current "empty" stretch.
private final MutableLong emptyEmitStreak;
// Accumulates the milliseconds spent idling (back pressure, max-spout-pending, inactive).
private final SpoutThrottlingMetrics spoutThrottlingMetrics;
// Result of StormCommon.hasAckers(topoConf), computed once in the constructor.
private final boolean hasAckers;
// Executor statistics exposed through getStats().
private final SpoutExecutorStats stats;
// Built-in spout metrics backed by 'stats'; registered once per task in init().
private final BuiltinMetrics builtInMetrics;
// Package-private; reassigned for every task in init(), so it ends up referring to the collector created last.
SpoutOutputCollectorImpl spoutOutputCollector;
// TOPOLOGY_MAX_SPOUT_PENDING (default 0) multiplied by idToTask.size(); 0 disables the limit check in the run loop.
private Integer maxSpoutPending;
// Spout objects of all non-null tasks, filled in init().
private List<ISpout> spouts;
// Created empty in init(). NOTE(review): no add() call is visible in this copy of the file - confirm against upstream.
private List<SpoutOutputCollector> outputCollectors;
// Tuples awaiting ack/fail, keyed by a Long id; entries that expire are failed with reason "TIMEOUT".
private RotatingMap<Long, TupleInfo> pending;
// Id of the thread that ran init(); 0 until then.
private long threadId = 0;
/**
 * Creates a spout executor.
 *
 * <p>Instantiates the spout-wait and back-pressure wait strategies named in the topology configuration and
 * prepares each one for its wait situation, then initialises the counters, throttling metrics, executor
 * stats and built-in metrics used later by {@code init} and the run loop.
 *
 * @param workerData  worker state, forwarded to the superclass constructor
 * @param executorId  executor id, forwarded to the superclass constructor
 * @param credentials credentials, forwarded to the superclass constructor
 */
public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
    super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
    this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
    this.spoutWaitStrategy.prepare(topoConf, WaitSituation.SPOUT_WAIT);
    this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
    this.backPressureWaitStrategy.prepare(topoConf, WaitSituation.BACK_PRESSURE_WAIT);
    this.lastActive = new AtomicBoolean(false);
    this.hasAckers = StormCommon.hasAckers(topoConf);
    this.emittedCount = new MutableLong(0);
    this.emptyEmitStreak = new MutableLong(0);
    this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
    this.stats = new SpoutExecutorStats(
        ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
    this.builtInMetrics = new BuiltinSpoutMetrics(stats);
}
public SpoutExecutorStats getStats() {
return stats;
/**
 * Prepares this executor on the calling thread: records the thread id, waits until {@code stormActive} is set,
 * collects the spout objects of all non-null tasks, creates the {@code pending} map, registers metrics and
 * builds one output collector per task.
 *
 * <p>NOTE(review): this copy of the method has lost lines (closing braces, call prefixes and at least one
 * statement); the individual spots are marked below and must be checked against the upstream file.
 *
 * @param idToTask     tasks handled by this executor; entries may be null
 * @param idToTaskBase value subtracted from a task id to obtain its index in {@code idToTask}
 * @throws InterruptedException declared by the signature
 */
public void init(final ArrayList<Task> idToTask, int idToTaskBase) throws InterruptedException {
// Remember which thread runs this executor (exposed through getThreadId()).
this.threadId = Thread.currentThread().getId();
while (!stormActive.get()) {
//Topology may be deployed in deactivated mode, wait for activation
// NOTE(review): the loop body (presumably a short sleep) is missing here - confirm.
// NOTE(review): the next line is a fragment; it looks like the loop's closing brace followed by the
// argument list of a logging call whose "LOG.<level>(" prefix was lost - confirm.
}"Opening spout {}:{}", componentId, taskIds);
this.idToTask = idToTask;
// The configured per-task limit (default 0) is scaled by the number of slots in idToTask.
this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
this.spouts = new ArrayList<>();
// Only non-null tasks contribute a spout object.
for (Task task : idToTask) {
if (task != null) {
this.spouts.add((ISpout) task.getTaskObject());
// Two-bucket rotating map; the callback below handles each entry that expires from it.
this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
public void expire(Long key, TupleInfo tupleInfo) {
// A timestamp of 0 is treated as "no timestamp": timeDelta then stays null.
Long timeDelta = null;
if (tupleInfo.getTimestamp() != 0) {
timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
// Expired entries are reported as failures of the owning task with reason "TIMEOUT".
failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
// Throttling and error-reporting metrics are registered against the user context of the first task id.
this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
this.outputCollectors = new ArrayList<>();
for (int i = 0; i < idToTask.size(); ++i) {
Task taskData = idToTask.get(i);
// NOTE(review): the body of this null guard is missing (presumably it skips the slot); as written the
// next line would dereference a null taskData - confirm.
if (taskData == null) {
ISpout spoutObject = (ISpout) taskData.getTaskObject();
// One collector implementation per task; the field keeps only the most recently created one.
spoutOutputCollector = new SpoutOutputCollectorImpl(
spoutObject, this, taskData, emittedCount,
hasAckers, rand, hasEventLoggers, isDebug, pending);
SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
builtInMetrics.registerAll(topoConf, taskData.getUserContext());
// Expose the receive queue under the key "receive" for the queue metrics.
Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
// Spouts that listen for credentials receive the initial credentials here.
if (spoutObject instanceof ICredentialsListener) {
((ICredentialsListener) spoutObject).setCredentials(credentials);
// NOTE(review): the next line is the tail of a call whose receiver and first argument were lost; the
// remaining arguments (user context, output collector) suggest the spout's open call - confirm.
}, taskData.getUserContext(), outputCollector);
// NOTE(review): the string after the statement below is the argument list of a logging call whose prefix was lost.
openOrPrepareWasCalled.set(true);"Opened spout {}:{}", componentId, taskIds);
/**
 * Initialises the executor via {@code init} and returns the {@code Callable} that performs one iteration of
 * the spout loop per invocation. Every visible return path of the inner {@code call()} returns {@code 0L}.
 *
 * <p>NOTE(review): many closing braces and several branch bodies are missing from this copy, so the nesting
 * of the statements below cannot be determined from this text alone; comments describe only what each
 * visible statement does.
 *
 * @return a callable executing one loop iteration per call
 * @throws Exception declared by the signature
 */
public Callable<Long> call() throws Exception {
init(idToTask, idToTaskBase);
return new Callable<Long>() {
// Number of iterations to skip between receive-queue checks; fetched once when the callable is created.
final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
int recvqCheckSkips = 0;
int swIdleCount = 0; // counter for spout wait strategy
int bpIdleCount = 0; // counter for back pressure wait strategy
// Used below to log the start and end of a max-spout-pending stretch.
int rmspCount = 0;
public Long call() throws Exception {
int receiveCount = 0;
// Consume the receive queue only when the skip counter has reached its maximum, then reset the counter.
if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
receiveCount = receiveQueue.consume(SpoutExecutor.this);
recvqCheckSkips = 0;
// Snapshot of the emit counter, compared below to detect whether this iteration emitted anything.
long currCount = emittedCount.get();
// A limit of 0 means "no limit"; otherwise the limit is hit when pending holds at least that many entries.
boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
boolean isActive = stormActive.get();
// NOTE(review): the statements of the inactive branch are missing apart from the return.
if (!isActive) {
return 0L;
// NOTE(review): the body of this inactive-to-active transition check is missing.
if (!lastActive.get()) {
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
boolean noEmits = true;
long emptyStretch = 0;
// Spouts are visited only when below the pending limit and no earlier emits are still queued.
if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
// NOTE(review): the loop body (the per-spout call) is missing.
for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
// No emits happened if the counter still equals the snapshot taken above.
noEmits = (currCount == emittedCount.get());
// NOTE(review): the "no emits" branch body is missing.
if (noEmits) {
} else {
emptyStretch = emptyEmitStreak.get();
// Debug logging of when a max-spout-pending stretch begins and how long it lasted.
if (reachedMaxSpoutPending) {
if (rmspCount == 0) {
LOG.debug("Reached max spout pending");
} else {
if (rmspCount > 0) {
LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
rmspCount = 0;
if (receiveCount > 1) {
// continue without idling
return 0L;
// NOTE(review): the statement executed before this return (presumably the back-pressure wait) is missing.
if (!pendingEmits.isEmpty()) { // then facing backpressure
return 0L;
bpIdleCount = 0;
// Nothing was emitted: idle according to the spout wait strategy.
if (noEmits) {
spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
return 0L;
swIdleCount = 0;
return 0L;
private void backPressureWaitStrategy() throws InterruptedException {
long start = Time.currentTimeMillis();
if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
long start = Time.currentTimeMillis();
swIdleCount = spoutWaitStrategy.idle(swIdleCount);
if (reachedMaxSpoutPending) {
spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
} else {
if (emptyStretch > 0) {
LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
// returns true if pendingEmits is empty
private boolean tryFlushPendingEmits() {
for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
if (executorTransfer.tryTransfer(t, null)) {
} else { // to avoid reordering of emits, stop at first failure
return false;
return true;
private void activateSpouts() {"Activating spout {}:{}", componentId, taskIds);
for (ISpout spout : spouts) {
private void deactivateSpouts() {"Deactivating spout {}:{}", componentId, taskIds);
for (ISpout spout : spouts) {
/**
 * Handles a loop iteration while the topology is inactive and adds the elapsed time to the
 * "skipped inactive" throttling metric.
 *
 * <p>NOTE(review): this copy has lost lines - the body of the {@code lastActive} check and whatever ran
 * between the two timestamps (presumably a sleep) are not visible; confirm against the upstream file.
 *
 * @throws InterruptedException declared by the signature
 */
private void inactiveExecute() throws InterruptedException {
// True when the previous iteration still saw the topology as active.
if (lastActive.get()) {
long start = Time.currentTimeMillis();
spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
/**
 * Dispatches a received tuple according to its source stream id: system flush, system tick, metrics tick,
 * credentials change, acker timeout reset, and - in the final branch - acker ack/fail notifications.
 *
 * <p>NOTE(review): closing braces and the bodies of the flush and tick branches are missing from this copy.
 *
 * @param taskId id of the task the tuple is addressed to; {@code taskId - idToTaskBase} indexes {@code idToTask}
 * @param tuple  the received tuple
 * @throws Exception declared by the signature
 */
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
// NOTE(review): the bodies of the next two branches are missing.
if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
} else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
} else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
} else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
// Forward the new credentials (tuple value 0) to the spout if it listens for them.
Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (spoutObj instanceof ICredentialsListener) {
((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
} else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
// Re-insert the entry under the same id if it is still pending.
// Presumably this restarts its expiry in the RotatingMap - TODO confirm.
Long id = (Long) tuple.getValue(0);
TupleInfo pendingForId = pending.get(id);
if (pendingForId != null) {
pending.put(id, pendingForId);
} else {
// Remaining streams: value 0 is the pending id, value 1 a time delta in milliseconds.
Long id = (Long) tuple.getValue(0);
Long timeDeltaMs = (Long) tuple.getValue(1);
TupleInfo tupleInfo = pending.remove(id);
// Only entries that were still pending and carry a message id are processed further.
if (tupleInfo != null && tupleInfo.getMessageId() != null) {
// The notification must arrive at the task recorded in the pending entry.
if (taskId != tupleInfo.getTaskId()) {
throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
// timeDelta is set only when ackers are enabled and the entry has a non-zero timestamp.
Long timeDelta = null;
if (hasAckers) {
long startTimeMs = tupleInfo.getTimestamp();
if (startTimeMs != 0) {
timeDelta = timeDeltaMs;
if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
} else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
/**
 * Processes the acknowledgement of a spout message: applies task hooks when any are registered and records
 * the ack in the executor stats when ackers are enabled and a time delta is available.
 *
 * <p>NOTE(review): this copy has lost lines - the logging call prefix, the call on {@code spout} that
 * presumably delivers the ack, closing braces, and the tail of the {@code spoutAckedTuple} call.
 *
 * @param executor  executor whose debug flag and stats are used
 * @param taskData  task owning the acknowledged message
 * @param timeDelta time delta passed to hooks and stats; may be null
 * @param tupleInfo pending-entry information of the acknowledged message
 */
public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
// NOTE(review): the string after the brace is the argument list of a logging call whose prefix was lost.
if (executor.getIsDebug()) {"SPOUT Acking message {} {}", tupleInfo.getRootId(), tupleInfo.getMessageId());
if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
if (hasAckers && timeDelta != null) {
// NOTE(review): the remaining arguments of this call are missing.
executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
} catch (Exception e) {
// Any exception is rethrown wrapped by Utils.wrapInRuntime.
throw Utils.wrapInRuntime(e);
/**
 * Processes the failure of a spout message: applies a {@code SpoutFailInfo} to the task's user context and
 * records the failure in the executor stats when a time delta is available.
 *
 * <p>NOTE(review): this copy has lost lines - the logging call prefix, the call on {@code spout} that
 * presumably delivers the failure, closing braces, and the tail of the {@code spoutFailedTuple} call.
 *
 * @param executor  executor whose debug flag and stats are used
 * @param taskData  task owning the failed message
 * @param timeDelta time delta passed to hooks and stats; may be null
 * @param tupleInfo pending-entry information of the failed message
 * @param reason    short reason text included in the debug message (callers in this file pass
 *                  "TIMEOUT" and "FAIL-STREAM")
 */
public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
// NOTE(review): the string after the brace is the argument list of a logging call whose prefix was lost.
if (executor.getIsDebug()) {"SPOUT Failing {} : {} REASON: {}", tupleInfo.getRootId(), tupleInfo, reason);
new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
if (timeDelta != null) {
// NOTE(review): the remaining arguments of this call are missing.
executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
} catch (Exception e) {
// Any exception is rethrown wrapped by Utils.wrapInRuntime.
throw Utils.wrapInRuntime(e);
public int getSpoutRecvqCheckSkipCount() {
if (ackingEnabled) {
return 0; // always check recQ if ACKing enabled
return ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0);
public long getThreadId() {
return threadId;