blob: bd284c4a6dc28d7e46871b598bd2f4f7a35d3a50 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.executor.spout;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.IWaitStrategy.WaitSituation;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.SpoutExecutorStats;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Executor for the spout tasks of one component: it opens the spouts, repeatedly calls {@code nextTuple()} on them and
 * dispatches the system, tick, metrics-tick and acker tuples it receives back to the owning spout.
 */
public class SpoutExecutor extends Executor {
private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
// Idle strategy used when a loop iteration produced no emits; class name comes from Config.TOPOLOGY_SPOUT_WAIT_STRATEGY.
private final IWaitStrategy spoutWaitStrategy;
// Idle strategy used while pendingEmits is non-empty; class name comes from Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY.
private final IWaitStrategy backPressureWaitStrategy;
// true once activateSpouts() was called for the current active period; reset to false when the spouts are deactivated.
private final AtomicBoolean lastActive;
// Emit counter handed to each SpoutOutputCollectorImpl; compared before/after nextTuple() to detect whether anything was emitted.
private final MutableLong emittedCount;
// Counts loop iterations without emits; reset to 0 as soon as an iteration emits.
private final MutableLong emptyEmitStreak;
// Records the time spent idling (inactive topology, back pressure, max spout pending).
private final SpoutThrottlingMetrics spoutThrottlingMetrics;
// Result of StormCommon.hasAckers(topoConf); gates latency bookkeeping for acked tuples.
private final boolean hasAckers;
private final SpoutExecutorStats stats;
// Reassigned for every non-null task in init(), so it ends up referencing the collector created last; used for flush().
SpoutOutputCollectorImpl spoutOutputCollector;
// Config.TOPOLOGY_MAX_SPOUT_PENDING (default 0) multiplied by idToTask.size(); 0 disables the limit check.
private Integer maxSpoutPending;
// Spout objects of all non-null tasks, in idToTask order.
private List<ISpout> spouts;
// One SpoutOutputCollector per opened spout.
private List<SpoutOutputCollector> outputCollectors;
// Tuples awaiting ack/fail, keyed by the id carried in acker tuples; entries that rotate out are failed with reason "TIMEOUT".
private RotatingMap<Long, TupleInfo> pending;
// Id of the thread that ran init(); 0 until then.
private long threadId = 0;
public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
this.spoutWaitStrategy.prepare(topoConf, WaitSituation.SPOUT_WAIT);
this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
this.backPressureWaitStrategy.prepare(topoConf, WaitSituation.BACK_PRESSURE_WAIT);
this.lastActive = new AtomicBoolean(false);
this.hasAckers = StormCommon.hasAckers(topoConf);
this.emittedCount = new MutableLong(0);
this.emptyEmitStreak = new MutableLong(0);
this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
this.stats = new SpoutExecutorStats(
ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
}
/**
 * Returns the statistics object created for this executor in the constructor.
 *
 * @return the spout executor statistics
 */
@Override
public SpoutExecutorStats getStats() {
    return this.stats;
}
/**
 * Opens every spout owned by this executor. The calling thread is recorded as the executor thread, then the method
 * blocks until the worker is ready and the topology is active before any spout is opened.
 *
 * @param idToTask     tasks indexed by {@code taskId - idToTaskBase}; entries may be {@code null}
 * @param idToTaskBase offset subtracted from a task id to obtain its index in {@code idToTask}
 * @throws InterruptedException if the thread is interrupted while waiting for the worker or for activation
 */
public void init(final ArrayList<Task> idToTask, int idToTaskBase) throws InterruptedException {
    this.threadId = Thread.currentThread().getId();
    executorTransfer.initLocalRecvQueues();
    workerReady.await();
    // The topology may have been deployed in deactivated mode: poll until it gets activated.
    while (!stormActive.get()) {
        Utils.sleepNoSimulation(100);
    }

    LOG.info("Opening spout {}:{}", componentId, taskIds);
    this.idToTask = idToTask;
    final int configuredMaxPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0);
    this.maxSpoutPending = configuredMaxPending * idToTask.size();

    // Collect the spout objects of all populated task slots.
    this.spouts = new ArrayList<>();
    for (int slot = 0; slot < idToTask.size(); slot++) {
        final Task task = idToTask.get(slot);
        if (task == null) {
            continue;
        }
        this.spouts.add((ISpout) task.getTaskObject());
    }

    // Entries that rotate out of the pending map without having been acked or failed are failed as timed out.
    this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
        @Override
        public void expire(Long key, TupleInfo tupleInfo) {
            final long emitTimestamp = tupleInfo.getTimestamp();
            // A timestamp of 0 means no time delta is reported for this tuple.
            final Long timeDelta = (emitTimestamp == 0) ? null : Long.valueOf(Time.deltaMs(emitTimestamp));
            final Task owner = idToTask.get(tupleInfo.getTaskId() - idToTaskBase);
            failSpoutMsg(SpoutExecutor.this, owner, timeDelta, tupleInfo, "TIMEOUT");
        }
    });

    // Executor-level metrics are registered against the context of the executor's first task.
    final Task firstTask = idToTask.get(taskIds.get(0) - idToTaskBase);
    this.spoutThrottlingMetrics.registerAll(topoConf, firstTask.getUserContext());
    this.errorReportingMetrics.registerAll(topoConf, firstTask.getUserContext());

    // Give each spout its own collector, hand over credentials where supported, then open it.
    this.outputCollectors = new ArrayList<>();
    for (Task taskData : idToTask) {
        if (taskData != null) {
            final ISpout spoutObject = (ISpout) taskData.getTaskObject();
            spoutOutputCollector = new SpoutOutputCollectorImpl(
                spoutObject, this, taskData, emittedCount,
                hasAckers, rand, hasEventLoggers, isDebug, pending);
            final SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
            this.outputCollectors.add(outputCollector);
            if (spoutObject instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObject).setCredentials(credentials);
            }
            spoutObject.open(topoConf, taskData.getUserContext(), outputCollector);
        }
    }

    openOrPrepareWasCalled.set(true);
    LOG.info("Opened spout {}:{}", componentId, taskIds);
    setupTicks(true);
    setupMetrics();
}
/**
 * Opens the spouts via {@link #init} and returns the callable that executes one iteration of the spout loop per
 * invocation. Each iteration optionally drains the receive queue, handles activation/deactivation, flushes emits
 * that are still pending, calls {@code nextTuple()} on every spout when allowed, and finally idles through the
 * back pressure or spout wait strategy when there was nothing to do.
 *
 * @return the loop body; every invocation of it returns {@code 0L}
 * @throws Exception if opening the spouts fails
 */
@Override
public Callable<Long> call() throws Exception {
init(idToTask, idToTaskBase);
return new Callable<Long>() {
// The receive queue is consumed on the invocation where recvqCheckSkips has reached this value (0 = every invocation).
final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
// Invocations since the receive queue was last consumed.
int recvqCheckSkips = 0;
int swIdleCount = 0; // counter for spout wait strategy
int bpIdleCount = 0; // counter for back pressure wait strategy
// Consecutive iterations in which max spout pending was reached; only used for debug logging.
int rmspCount = 0;
@Override
public Long call() throws Exception {
updateExecCredsIfRequired();
int receiveCount = 0;
if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
receiveCount = receiveQueue.consume(SpoutExecutor.this);
recvqCheckSkips = 0;
}
// Snapshot taken before nextTuple() so that emits made during this iteration can be detected afterwards.
long currCount = emittedCount.get();
boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
boolean isActive = stormActive.get();
if (!isActive) {
inactiveExecute();
return 0L;
}
// First active iteration after start-up or after a deactivation: notify the spouts.
if (!lastActive.get()) {
lastActive.set(true);
activateSpouts();
}
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
// Stays true when nextTuple() is not called at all in this iteration.
boolean noEmits = true;
long emptyStretch = 0;
// Only ask for new tuples when below the pending limit and all earlier emits have been transferred.
if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
spouts.get(j).nextTuple();
}
noEmits = (currCount == emittedCount.get());
if (noEmits) {
emptyEmitStreak.increment();
} else {
emptyStretch = emptyEmitStreak.get();
emptyEmitStreak.set(0);
}
}
// Log only on entering and on leaving a stretch of iterations at max spout pending.
if (reachedMaxSpoutPending) {
if (rmspCount == 0) {
LOG.debug("Reached max spout pending");
}
rmspCount++;
} else {
if (rmspCount > 0) {
LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
}
rmspCount = 0;
}
if (receiveCount > 1) {
// continue without idling
return 0L;
}
if (!pendingEmits.isEmpty()) { // then facing backpressure
backPressureWaitStrategy();
return 0L;
}
bpIdleCount = 0;
if (noEmits) {
// NOTE(review): emptyStretch is only non-zero when noEmits is false, so the "Ending Spout Wait Stretch"
// debug message inside spoutWaitStrategy(...) looks unreachable from here - confirm intent.
spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
return 0L;
}
swIdleCount = 0;
return 0L;
}
// Idles once through the back pressure wait strategy and records the elapsed time as skipped back pressure time.
private void backPressureWaitStrategy() throws InterruptedException {
long start = Time.currentTimeMillis();
if (bpIdleCount == 0) { // check avoids multiple log msgs when in an idle loop
LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
}
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
}
// Idles once through the spout wait strategy; the elapsed time is recorded only when max spout pending was reached.
private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
// NOTE(review): the streak was already incremented in call() when nextTuple() emitted nothing, so such an
// iteration is counted twice - confirm whether that is intended.
emptyEmitStreak.increment();
long start = Time.currentTimeMillis();
swIdleCount = spoutWaitStrategy.idle(swIdleCount);
if (reachedMaxSpoutPending) {
spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
} else {
if (emptyStretch > 0) {
LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
}
}
}
// returns true if pendingEmits is empty
private boolean tryFlushPendingEmits() {
// Peek first and poll only after a successful transfer, so a tuple that could not be sent stays at the head.
for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
if (executorTransfer.tryTransfer(t, null)) {
pendingEmits.poll();
} else { // to avoid reordering of emits, stop at first failure
return false;
}
}
return true;
}
};
}
/** Logs the activation and calls {@code activate()} on every spout of this executor, in list order. */
private void activateSpouts() {
    LOG.info("Activating spout {}:{}", componentId, taskIds);
    for (int i = 0; i < spouts.size(); i++) {
        spouts.get(i).activate();
    }
}
/** Logs the deactivation and calls {@code deactivate()} on every spout of this executor, in list order. */
private void deactivateSpouts() {
    LOG.info("Deactivating spout {}:{}", componentId, taskIds);
    for (int i = 0; i < spouts.size(); i++) {
        spouts.get(i).deactivate();
    }
}
/**
 * Loop body used while the topology is not active: deactivates the spouts on the first inactive iteration,
 * then sleeps for 100 ms and records the elapsed time as skipped inactive time.
 *
 * @throws InterruptedException if the sleep is interrupted
 */
private void inactiveExecute() throws InterruptedException {
    final boolean wasActive = lastActive.get();
    if (wasActive) {
        // Transition from active to inactive: notify the spouts exactly once.
        lastActive.set(false);
        deactivateSpouts();
    }

    final long sleepStartMs = Time.currentTimeMillis();
    Time.sleep(100);
    final long sleptMs = Time.currentTimeMillis() - sleepStartMs;
    spoutThrottlingMetrics.skippedInactiveMs(sleptMs);
}
/**
 * Handles one tuple taken from the receive queue, dispatching on its source stream id:
 * flush requests flush the output collector, system ticks rotate the pending map, metrics ticks trigger metric
 * collection, acker "reset timeout" tuples re-insert the pending entry, and every other stream is treated as an
 * acker reply whose pending entry is removed and, for the ack and fail streams, reported to the spout.
 *
 * @param taskId id of the task the tuple is addressed to
 * @param tuple  the received tuple
 * @throws Exception if handling the tuple fails
 */
@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
    final String stream = tuple.getSourceStreamId();

    if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(stream)) {
        spoutOutputCollector.flush();
        return;
    }
    if (stream.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        pending.rotate();
        return;
    }
    if (stream.equals(Constants.METRICS_TICK_STREAM_ID)) {
        metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        return;
    }
    if (stream.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
        // Putting the entry back re-inserts it into the rotating map.
        final Long resetId = (Long) tuple.getValue(0);
        final TupleInfo stillPending = pending.get(resetId);
        if (stillPending != null) {
            pending.put(resetId, stillPending);
        }
        return;
    }

    // Any other stream: field 0 is the pending id, field 1 the time delta reported with the tuple.
    final Long id = (Long) tuple.getValue(0);
    final Long reportedDeltaMs = (Long) tuple.getValue(1);
    final TupleInfo info = pending.remove(id);
    if (info == null || info.getMessageId() == null) {
        return;
    }
    if (taskId != info.getTaskId()) {
        throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + info.getTaskId());
    }

    // The reported delta is only passed on when ackers exist and the tuple carries a non-zero timestamp.
    final Long timeDelta = (hasAckers && info.getTimestamp() != 0) ? reportedDeltaMs : null;

    if (stream.equals(Acker.ACKER_ACK_STREAM_ID)) {
        ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, info);
    } else if (stream.equals(Acker.ACKER_FAIL_STREAM_ID)) {
        failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, info, "FAIL-STREAM");
    }
}
/**
 * Reports a successfully processed tuple to its spout, then notifies the task hooks and updates the ack statistics.
 *
 * @param executor  executor whose debug flag and statistics are used
 * @param taskData  task that owns the spout which emitted the tuple
 * @param timeDelta time delta passed to hooks and statistics; statistics are skipped when it is {@code null}
 * @param tupleInfo bookkeeping entry of the acked tuple
 * @throws RuntimeException wrapping any exception raised while acking
 */
public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
    try {
        final ISpout owner = (ISpout) taskData.getTaskObject();
        final int ownerTaskId = taskData.getTaskId();
        final Object messageId = tupleInfo.getMessageId();

        if (executor.getIsDebug()) {
            LOG.info("SPOUT Acking message {} {}", tupleInfo.getRootId(), messageId);
        }
        owner.ack(messageId);

        // Only build the hook payload when at least one hook is registered.
        final boolean hooksRegistered = !taskData.getUserContext().getHooks().isEmpty();
        if (hooksRegistered) {
            new SpoutAckInfo(messageId, ownerTaskId, timeDelta).applyOn(taskData.getUserContext());
        }

        if (!hasAckers || timeDelta == null) {
            return;
        }
        executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
        taskData.getTaskMetrics().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
/**
 * Reports a failed tuple to its spout, then notifies the task hooks and updates the failure statistics.
 *
 * @param executor  executor whose debug flag and statistics are used
 * @param taskData  task that owns the spout which emitted the tuple
 * @param timeDelta time delta passed to the hooks; failure statistics are only recorded when it is non-null
 * @param tupleInfo bookkeeping entry of the failed tuple
 * @param reason    short label of why the tuple failed, used in the debug log only
 * @throws RuntimeException wrapping any exception raised while failing the tuple
 */
public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
    try {
        ISpout spout = (ISpout) taskData.getTaskObject();
        int taskId = taskData.getTaskId();
        if (executor.getIsDebug()) {
            LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getRootId(), tupleInfo, reason);
        }
        spout.fail(tupleInfo.getMessageId());
        // Same guard as ackSpoutMsg: avoid allocating the hook payload when no hook is registered.
        if (!taskData.getUserContext().getHooks().isEmpty()) {
            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
        }
        if (timeDelta != null) {
            executor.getStats().spoutFailedTuple(tupleInfo.getStream());
            taskData.getTaskMetrics().spoutFailedTuple(tupleInfo.getStream());
        }
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
/**
 * Returns how many loop iterations may pass between two checks of the receive queue.
 *
 * @return 0 when acking is enabled (the queue is then checked on every iteration), otherwise the value of
 *         {@code Config.TOPOLOGY_SPOUT_RECVQ_SKIPS}, defaulting to 0
 */
public int getSpoutRecvqCheckSkipCount() {
    final int skipCount;
    if (!ackingEnabled) {
        // NOTE(review): this reads a TOPOLOGY_* key from conf, whereas the other topology settings in this
        // class are read from topoConf - confirm that conf is the intended source.
        skipCount = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0);
    } else {
        skipCount = 0; // always check recQ if ACKing enabled
    }
    return skipCount;
}
/**
 * Returns the id of the thread that executed {@link #init}.
 *
 * @return the executor thread id, or 0 if {@code init} has not run yet
 */
public long getThreadId() {
    return this.threadId;
}
}