blob: 53675c473c22cc67486e8579fc295d21da2942f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.instance.spout;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import com.google.protobuf.Message;
import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.metric.GlobalMetrics;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.spout.ISpout;
import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.FullSpoutMetrics;
import org.apache.heron.common.utils.metrics.ISpoutMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.topology.TopologyContextImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.util.InstanceUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
/**
 * The instance that hosts and drives a user-defined {@link ISpout}.
 *
 * <p>It is driven by a single-threaded {@link SlaveLooper}: on every wake-up it asks the spout
 * to produce tuples (within configured batch-size/batch-time budgets), drains incoming control
 * tuples (acks/fails) from {@code streamInQueue}, and handles message timeouts and stateful
 * checkpoints when those features are enabled. All work happens on the looper thread, so the
 * class itself needs no additional synchronization except around state checkpointing, which
 * shares {@code collector.lock} with the outgoing-tuple path.
 */
public class SpoutInstance implements IInstance {
  private static final Logger LOG = Logger.getLogger(SpoutInstance.class.getName());

  protected final ISpout spout;
  protected final SpoutOutputCollectorImpl collector;
  protected final ISpoutMetrics spoutMetrics;

  // The spout will read Control tuples (acks/fails) from streamInQueue; it never
  // receives data tuples from other components.
  private final Communicator<Message> streamInQueue;

  private final boolean ackEnabled;
  private final boolean enableMessageTimeouts;

  // True iff the topology runs in EFFECTIVELY_ONCE reliability mode.
  private final boolean isTopologyStateful;
  // Whether checkpointed state should be spilled to disk, and where.
  private final boolean spillState;
  private final String spillStateLocation;

  // Instance state handed to a stateful spout; only set when the topology is stateful
  // and the spout implements IStatefulComponent.
  private State<Serializable, Serializable> instanceState;

  private final SlaveLooper looper;
  private final SystemConfig systemConfig;

  // The reference to topology's config
  private final Map<String, Object> config;

  private PhysicalPlanHelper helper;

  /**
   * Construct a SpoutInstance basing on given arguments.
   *
   * <p>The spout object itself is obtained either by deserializing the serialized object
   * carried in the physical plan or by reflectively instantiating the configured class name.
   *
   * @param helper helper wrapping this instance's slice of the physical plan
   * @param streamInQueue queue this instance reads control messages from
   * @param streamOutQueue queue the collector pushes emitted tuples to
   * @param looper the single-threaded looper driving this instance
   * @throws RuntimeException if the physical plan carries no spout for this instance,
   *     or the spout object cannot be created
   */
  public SpoutInstance(PhysicalPlanHelper helper,
                       Communicator<Message> streamInQueue,
                       Communicator<Message> streamOutQueue,
                       SlaveLooper looper) {
    this.helper = helper;
    this.looper = looper;
    this.streamInQueue = streamInQueue;
    this.spoutMetrics = new FullSpoutMetrics();
    this.spoutMetrics.initMultiCountMetrics(helper);
    this.config = helper.getTopologyContext().getTopologyConfig();
    this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(
        SystemConfig.HERON_SYSTEM_CONFIG);
    this.enableMessageTimeouts =
        Boolean.parseBoolean((String) config.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS));
    this.isTopologyStateful = String.valueOf(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE)
        .equals(config.get(Config.TOPOLOGY_RELIABILITY_MODE));
    this.spillState =
        Boolean.parseBoolean((String) config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE));
    this.spillStateLocation = String.format("%s/%s/",
        String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
        helper.getMyInstanceId());

    LOG.info("Is this topology stateful: " + isTopologyStateful);

    if (helper.getMySpout() == null) {
      throw new RuntimeException("HeronSpoutInstance has no spout in physical plan");
    }

    // Get the spout. Notice, in fact, we will always use the deserialization way to get bolt.
    if (helper.getMySpout().getComp().hasSerializedObject()) {
      this.spout = (ISpout) Utils.deserialize(
          helper.getMySpout().getComp().getSerializedObject().toByteArray());
    } else if (helper.getMySpout().getComp().hasClassName()) {
      String spoutClassName = helper.getMySpout().getComp().getClassName();
      try {
        spout = (ISpout) Class.forName(spoutClassName).newInstance();
      } catch (ClassNotFoundException ex) {
        // Chain the cause so the original stack trace is preserved
        // (previously the exception was only stringified into the message).
        throw new RuntimeException("Spout class must be in class path.", ex);
      } catch (InstantiationException ex) {
        throw new RuntimeException("Spout class must be concrete.", ex);
      } catch (IllegalAccessException ex) {
        throw new RuntimeException("Spout class must have a no-arg constructor.", ex);
      }
    } else {
      throw new RuntimeException("Neither java_object nor java_class_name set for spout");
    }

    IPluggableSerializer serializer = SerializeDeSerializeHelper.getSerializer(config);
    collector = new SpoutOutputCollectorImpl(serializer, helper, streamOutQueue, spoutMetrics);
    this.ackEnabled = collector.isAckEnabled();

    LOG.info("Enable Ack: " + this.ackEnabled);
    LOG.info("EnableMessageTimeouts: " + this.enableMessageTimeouts);
  }

  /**
   * Apply an updated physical plan: notify the spout (if it opted in via {@link IUpdatable}),
   * refresh the collector, and re-prepare custom stream groupings whose downstream task sets
   * may have changed.
   */
  @Override
  public void update(PhysicalPlanHelper physicalPlanHelper) {
    if (spout instanceof IUpdatable) {
      ((IUpdatable) spout).update(physicalPlanHelper.getTopologyContext());
    }
    collector.updatePhysicalPlanHelper(physicalPlanHelper);

    // Re-prepare the CustomStreamGrouping since the downstream tasks can change
    physicalPlanHelper.prepareForCustomStreamGrouping();

    // Reset the helper
    helper = physicalPlanHelper;
  }

  /**
   * Persist instance state for the given checkpoint id.
   *
   * @throws RuntimeException if the topology is not stateful
   */
  @Override
  public void persistState(String checkpointId) {
    LOG.info("Persisting state for checkpoint: " + checkpointId);

    if (!isTopologyStateful) {
      throw new RuntimeException("Could not save a non-stateful topology's state");
    }

    // need to synchronize with other OutgoingTupleCollection operations
    // so that topology emit, ack, fail are thread safe
    collector.lock.lock();
    try {
      // Give the spout a chance to flush/stage whatever it wants checkpointed.
      if (spout instanceof IStatefulComponent) {
        ((IStatefulComponent<?, ?>) spout).preSave(checkpointId);
      }
      collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation);
    } finally {
      collector.lock.unlock();
    }
    LOG.info("State persisted for checkpoint: " + checkpointId);
  }

  /**
   * Initialize metrics, restore state into a stateful spout, open the user spout, and run
   * prepare hooks. Must be called before {@link #start()}.
   *
   * @param state previously checkpointed state to hand to a stateful spout; only used when
   *     the topology is stateful and the spout implements {@link IStatefulComponent}
   */
  @SuppressWarnings("unchecked")
  @Override
  public void init(State<Serializable, Serializable> state) {
    TopologyContextImpl topologyContext = helper.getTopologyContext();

    // Initialize the GlobalMetrics
    GlobalMetrics.init(topologyContext, systemConfig.getHeronMetricsExportInterval());

    spoutMetrics.registerMetrics(topologyContext);

    // Initialize the instanceState if the topology is stateful and spout is a stateful component
    if (isTopologyStateful && spout instanceof IStatefulComponent) {
      this.instanceState = state;
      ((IStatefulComponent<Serializable, Serializable>) spout).initState(instanceState);

      if (spillState) {
        // Start every run from an empty spill directory.
        if (FileUtils.isDirectoryExists(spillStateLocation)) {
          FileUtils.cleanDir(spillStateLocation);
        } else {
          FileUtils.createDirectory(spillStateLocation);
        }
      }
    }

    spout.open(
        topologyContext.getTopologyConfig(), topologyContext, new SpoutOutputCollector(collector));

    // Invoke user-defined prepare task hook
    topologyContext.invokeHookPrepare();

    // Init the CustomStreamGrouping
    helper.prepareForCustomStreamGrouping();
  }

  /** Start executing: register the recurring spout tasks on the looper. */
  @Override
  public void start() {
    // Add spout tasks for execution
    addSpoutsTasks();
  }

  /** Run cleanup hooks, close the user spout, and release owned queues/buffers. */
  @Override
  public void clean() {
    // Invoke clean up hook before clean() is called
    helper.getTopologyContext().invokeHookCleanup();

    // Delegate to user-defined clean-up method
    spout.close();

    // Clean the resources we own
    streamInQueue.clear();
    collector.clear();
  }

  /** Clean up and then stop the looper, ending this instance's execution. */
  @Override
  public void shutdown() {
    clean();
    looper.exitLoop();
  }

  /** Activate the spout: it may start emitting tuples again. */
  @Override
  public void activate() {
    LOG.info("Spout is activated");
    spout.activate();
  }

  /** Deactivate the spout: it should stop emitting tuples. */
  @Override
  public void deactivate() {
    LOG.info("Spout is deactivated");
    spout.deactivate();
  }

  // Tasks happen every time looper is waken up
  private void addSpoutsTasks() {
    // Register spoutTasks
    Runnable spoutTasks = new Runnable() {
      @Override
      public void run() {
        spoutMetrics.updateTaskRunCount();
        // Check whether we should produce more tuples
        if (isProduceTuple()) {
          spoutMetrics.updateProduceTupleCount();
          produceTuple();
          // Though we may execute MAX_READ tuples, finally we will packet it as
          // one outgoingPacket and push to out queues
          // Notice: Tuples are not necessary emitted during nextTuple methods. We could emit
          // tuples as long as we invoke collector.emit(...)
          collector.sendOutTuples();
        }

        if (!collector.isOutQueuesAvailable()) {
          spoutMetrics.updateOutQueueFullCount();
        }

        // Check if we have any message to process anyway
        readTuplesAndExecute(streamInQueue);

        if (ackEnabled) {
          // Update the pending-to-be-acked tuples counts
          spoutMetrics.updatePendingTuplesCount(collector.numInFlight());
        } else {
          doImmediateAcks();
        }

        // If we have more work to do
        if (isContinueWork()) {
          spoutMetrics.updateContinueWorkCount();
          looper.wakeUp();
        }
      }
    };
    looper.addTasksOnWakeup(spoutTasks);

    // Look for the timeout's tuples
    if (enableMessageTimeouts) {
      lookForTimeouts();
    }

    InstanceUtils.prepareTimerEvents(looper, helper);
  }

  /**
   * Check whether we still need to do more work.
   * When the topology state is in RUNNING:
   * 1. If the out Queue is not full and ack is not enabled, we could just wake up next time
   * to produce more tuples and push to the out Queue
   * <p>
   * 2. If the out Queue is not full but the ack is enabled, we also need to make sure the
   * tuples waiting smaller than max-spout-pending
   * <p>
   * 3. If there are more to read, we will wake up itself next time when it doWait()
   *
   * @return true Wake up itself directly in next looper.doWait()
   */
  private boolean isContinueWork() {
    long maxSpoutPending = TypeUtils.getLong(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
    return helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)
        &&
        ((!ackEnabled && collector.isOutQueuesAvailable())
            ||
            (ackEnabled
                && collector.isOutQueuesAvailable()
                && collector.numInFlight() < maxSpoutPending)
            ||
            (ackEnabled && !streamInQueue.isEmpty()));
  }

  /**
   * Check whether we could produce tuples, i.e. invoke spout.nextTuple()
   * It is allowed in:
   * 1. Outgoing Stream queue is available
   * 2. Topology State is RUNNING
   *
   * @return true to allow produceTuple() to be invoked
   */
  private boolean isProduceTuple() {
    return collector.isOutQueuesAvailable()
        && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING);
  }

  /**
   * Repeatedly invoke {@code spout.nextTuple()} until one of the batch budgets is hit:
   * max-spout-pending in-flight tuples (ack mode only), the per-cycle emit-time budget,
   * the per-cycle emit-bytes budget, or a nextTuple() call that emitted nothing.
   */
  protected void produceTuple() {
    int maxSpoutPending = TypeUtils.getInteger(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));

    long totalTuplesEmitted = collector.getTotalTuplesEmitted();
    // Baseline for the per-cycle bytes budget; deliberately not updated inside the loop
    // so the check below measures bytes emitted since the cycle started.
    long totalBytesEmitted = collector.getTotalBytesEmitted();
    Duration instanceEmitBatchTime = systemConfig.getInstanceEmitBatchTime();
    ByteAmount instanceEmitBatchSize = systemConfig.getInstanceEmitBatchSize();

    long startOfCycle = System.nanoTime();

    // We would reuse the System.nanoTime()
    long currentTime = startOfCycle;

    while (!ackEnabled || (maxSpoutPending > collector.numInFlight())) {
      // Delegate to the user defined spout
      spout.nextTuple();

      // Swap
      long startTime = currentTime;
      currentTime = System.nanoTime();

      long latency = currentTime - startTime;
      spoutMetrics.nextTuple(latency);

      long newTotalTuplesEmitted = collector.getTotalTuplesEmitted();
      long newTotalBytesEmitted = collector.getTotalBytesEmitted();
      if (newTotalTuplesEmitted == totalTuplesEmitted) {
        // No tuples to emit....
        break;
      }

      totalTuplesEmitted = newTotalTuplesEmitted;

      // To avoid spending too much time
      if (currentTime - startOfCycle - instanceEmitBatchTime.toNanos() > 0) {
        break;
      }
      if (!ByteAmount.fromBytes(newTotalBytesEmitted - totalBytesEmitted)
          .lessThan(instanceEmitBatchSize)) {
        break;
      }
    }
  }

  /**
   * Handle one ack/fail control tuple: retire the in-flight root tuple and invoke the
   * user's ack/fail callback with the measured complete-latency.
   *
   * @param ackTuple the incoming control tuple
   * @param isSuccess true for an ack, false for a fail
   * @throws RuntimeException if the tuple was routed to the wrong task
   */
  private void handleAckTuple(HeronTuples.AckTuple ackTuple, boolean isSuccess) {
    for (HeronTuples.RootId rt : ackTuple.getRootsList()) {
      if (rt.getTaskid() != helper.getMyTaskId()) {
        throw new RuntimeException(String.format("Receiving tuple for task %d in task %d",
            rt.getTaskid(), helper.getMyTaskId()));
      } else {
        long rootId = rt.getKey();
        RootTupleInfo rootTupleInfo = collector.retireInFlight(rootId);

        // This tuple has been removed due to time-out
        if (rootTupleInfo == null) {
          return;
        }
        Object messageId = rootTupleInfo.getMessageId();
        if (messageId != null) {
          Duration latency = Duration.ofNanos(System.nanoTime())
              .minusNanos(rootTupleInfo.getInsertionTime());
          if (isSuccess) {
            invokeAck(messageId, rootTupleInfo.getStreamId(), latency);
          } else {
            invokeFail(messageId, rootTupleInfo.getStreamId(), latency);
          }
        }
      }
    }
  }

  /**
   * Fail all in-flight tuples older than the configured message timeout, then re-arm a timer
   * so this check re-runs once per timeout bucket.
   */
  private void lookForTimeouts() {
    Duration timeout = TypeUtils.getDuration(
        config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), ChronoUnit.SECONDS);
    int nBucket = systemConfig.getInstanceAcknowledgementNbuckets();
    List<RootTupleInfo> expiredObjects = collector.retireExpired(timeout);
    for (RootTupleInfo rootTupleInfo : expiredObjects) {
      spoutMetrics.timeoutTuple(rootTupleInfo.getStreamId());
      invokeFail(rootTupleInfo.getMessageId(), rootTupleInfo.getStreamId(), timeout);
    }

    Runnable lookForTimeoutsTask = new Runnable() {
      @Override
      public void run() {
        lookForTimeouts();
      }
    };
    looper.registerTimerEvent(timeout.dividedBy(nBucket), lookForTimeoutsTask);
  }

  /**
   * Drain the incoming queue: trigger checkpoints on InitiateStatefulCheckpoint messages and
   * process ack/fail control tuples, bounded by the instance ack batch-time budget. A spout
   * must never receive data tuples; any other message type is a fatal error.
   */
  @Override
  public void readTuplesAndExecute(Communicator<Message> inQueue) {
    // Read data from in Queues
    long startOfCycle = System.nanoTime();
    Duration spoutAckBatchTime = systemConfig.getInstanceAckBatchTime();

    while (!inQueue.isEmpty()) {
      Message msg = inQueue.poll();

      if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
        String checkpointId =
            ((CheckpointManager.InitiateStatefulCheckpoint) msg).getCheckpointId();
        persistState(checkpointId);
      } else if (msg instanceof HeronTuples.HeronTupleSet) {
        HeronTuples.HeronTupleSet tuples = (HeronTuples.HeronTupleSet) msg;

        // For spout, it should read only control tuples(ack&fail)
        if (tuples.hasData()) {
          throw new RuntimeException("Spout cannot get incoming data tuples from other components");
        }

        if (tuples.hasControl()) {
          for (HeronTuples.AckTuple aT : tuples.getControl().getAcksList()) {
            handleAckTuple(aT, true);
          }
          for (HeronTuples.AckTuple aT : tuples.getControl().getFailsList()) {
            handleAckTuple(aT, false);
          }
        }

        // To avoid spending too much time
        if (System.nanoTime() - startOfCycle - spoutAckBatchTime.toNanos() > 0) {
          break;
        }
      } else {
        // Spout instance should not receive any other messages except the above ones
        throw new RuntimeException("Invalid data sent to spout instance");
      }
    }
  }

  /**
   * When acking is disabled, immediately ack (with zero latency) the tuples the collector
   * queued for immediate acknowledgement.
   */
  private void doImmediateAcks() {
    // In this iteration, we will only look at the immediateAcks size
    // Otherwise, it could be that every time we do an ack, the spout is
    // doing generating another tuple leading to an infinite loop
    int s = collector.getImmediateAcks().size();
    for (int i = 0; i < s; ++i) {
      RootTupleInfo tupleInfo = collector.getImmediateAcks().poll();
      invokeAck(tupleInfo.getMessageId(), tupleInfo.getStreamId(), Duration.ZERO);
    }
  }

  /**
   * Ack a tuple: delegate to the user spout, run ack hooks, and record metrics.
   */
  private void invokeAck(Object messageId, String streamId, Duration completeLatency) {
    // delegate to user-defined methods
    spout.ack(messageId);

    // Invoke user-defined task hooks
    helper.getTopologyContext().invokeHookSpoutAck(messageId, completeLatency);

    // Update metrics
    spoutMetrics.ackedTuple(streamId, completeLatency.toNanos());
  }

  /**
   * Fail a tuple: delegate to the user spout, run fail hooks, and record metrics.
   */
  private void invokeFail(Object messageId, String streamId, Duration failLatency) {
    // delegate to user-defined methods
    spout.fail(messageId);

    // Invoke user-defined task hooks
    helper.getTopologyContext().invokeHookSpoutFail(messageId, failLatency);

    // Update metrics
    spoutMetrics.failedTuple(streamId, failLatency.toNanos());
  }
}