blob: a53ed3d33775d266902404e02545ad586631f82d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.instance.bolt;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import com.google.protobuf.Message;
import org.apache.heron.api.Config;
import org.apache.heron.api.bolt.IBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.metric.GlobalMetrics;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.FullBoltMetrics;
import org.apache.heron.common.utils.metrics.IBoltMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.topology.TopologyContextImpl;
import org.apache.heron.common.utils.tuple.TickTuple;
import org.apache.heron.common.utils.tuple.TupleImpl;
import org.apache.heron.instance.IInstance;
import org.apache.heron.instance.util.InstanceUtils;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
/**
 * Hosts a user-defined {@link IBolt} inside a Heron instance.
 *
 * <p>The bolt is obtained from the physical plan, either by deserializing its serialized object
 * or by reflectively instantiating its class name. It is wired to a
 * {@link BoltOutputCollectorImpl}. Work is registered on the supplied {@link SlaveLooper}: on
 * each wake-up, tuples read from {@code streamInQueue} are executed against the bolt, and emitted
 * tuples are flushed via the collector. When the topology's reliability mode is
 * {@code EFFECTIVELY_ONCE}, the instance also participates in stateful checkpointing.
 */
public class BoltInstance implements IInstance {
  private static final Logger LOG = Logger.getLogger(BoltInstance.class.getName());

  protected PhysicalPlanHelper helper;
  protected final IBolt bolt;
  protected final BoltOutputCollectorImpl collector;
  protected final IPluggableSerializer serializer;
  protected final IBoltMetrics boltMetrics;

  // The bolt will read Data tuples from streamInQueue
  private final Communicator<Message> streamInQueue;

  private final boolean isTopologyStateful;
  private final boolean spillState;
  private final String spillStateLocation;

  private State<Serializable, Serializable> instanceState;

  // default to false, can only be toggled to true if bolt implements ITwoPhaseStatefulComponent
  private boolean waitingForCheckpointSaved;

  // The reference to topology's config
  private final Map<String, Object> config;

  private final SlaveLooper looper;

  private final SystemConfig systemConfig;

  /**
   * Constructs a BoltInstance from the given physical plan and queues.
   *
   * @param helper physical plan helper describing this instance and its bolt
   * @param streamInQueue queue from which incoming messages (tuples, checkpoint requests) are read
   * @param streamOutQueue queue handed to the output collector for emitted tuples
   * @param looper looper on which the bolt's tasks and timers are registered
   * @throws RuntimeException if the plan has no bolt, or the bolt cannot be instantiated
   */
  public BoltInstance(PhysicalPlanHelper helper,
                      Communicator<Message> streamInQueue,
                      Communicator<Message> streamOutQueue,
                      SlaveLooper looper) {
    this.helper = helper;
    this.looper = looper;
    this.streamInQueue = streamInQueue;
    this.boltMetrics = new FullBoltMetrics();
    this.boltMetrics.initMultiCountMetrics(helper);
    this.serializer =
        SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig());
    this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(
        SystemConfig.HERON_SYSTEM_CONFIG);

    this.config = helper.getTopologyContext().getTopologyConfig();
    this.isTopologyStateful = String.valueOf(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE)
        .equals(config.get(Config.TOPOLOGY_RELIABILITY_MODE));

    LOG.info("Is this topology stateful: " + isTopologyStateful);

    // Use String.valueOf rather than a (String) cast so that a non-String value (e.g. a Boolean)
    // does not throw ClassCastException; a missing value becomes "null", which parses as false.
    this.spillState =
        Boolean.parseBoolean(String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE)));
    this.spillStateLocation = String.format("%s/%s/",
        String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
        helper.getMyInstanceId());

    this.waitingForCheckpointSaved = false;

    if (helper.getMyBolt() == null) {
      throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
    }

    // Get the bolt. Notice, in fact, we will always use the deserialization way to get bolt.
    // NOTE(review): this is Java-native deserialization of bytes taken from the physical plan;
    // it is only safe as long as the plan comes from a trusted source.
    if (helper.getMyBolt().getComp().hasSerializedObject()) {
      bolt = (IBolt) Utils.deserialize(
          helper.getMyBolt().getComp().getSerializedObject().toByteArray());
    } else if (helper.getMyBolt().getComp().hasClassName()) {
      String boltClassName = helper.getMyBolt().getComp().getClassName();
      try {
        bolt = (IBolt) Class.forName(boltClassName).newInstance();
      } catch (ClassNotFoundException ex) {
        // Chain the original exception so its stack trace is not lost.
        throw new RuntimeException(ex + " Bolt class must be in class path.", ex);
      } catch (InstantiationException ex) {
        throw new RuntimeException(ex + " Bolt class must be concrete.", ex);
      } catch (IllegalAccessException ex) {
        throw new RuntimeException(ex + " Bolt class must have a no-arg constructor.", ex);
      }
    } else {
      throw new RuntimeException("Neither java_object nor java_class_name set for bolt");
    }

    collector = new BoltOutputCollectorImpl(serializer, helper, streamOutQueue, boltMetrics);
  }

  /**
   * Switches this instance over to a new physical plan.
   *
   * <p>An {@link IUpdatable} bolt is notified first. The collector is then pointed at the new
   * helper, and custom stream groupings are re-prepared against it. Config entries from the
   * current helper's topology config are copied into the new one before the new helper
   * replaces the current one.
   *
   * @param physicalPlanHelper helper for the new physical plan
   */
  @Override
  public void update(PhysicalPlanHelper physicalPlanHelper) {
    if (bolt instanceof IUpdatable) {
      ((IUpdatable) bolt).update(physicalPlanHelper.getTopologyContext());
    }
    collector.updatePhysicalPlanHelper(physicalPlanHelper);

    // Re-prepare the CustomStreamGrouping since the downstream tasks can change
    physicalPlanHelper.prepareForCustomStreamGrouping();
    // transfer potentially changed variables by topology code
    physicalPlanHelper.getTopologyContext().getTopologyConfig()
        .putAll(helper.getTopologyContext().getTopologyConfig());
    // Reset the helper
    helper = physicalPlanHelper;
  }

  /**
   * Sends this instance's state out for the given checkpoint.
   *
   * <p>This runs while holding {@code collector.lock}. Stateful bolts get a {@code preSave}
   * callback. Two-phase stateful bolts also cause tuple processing to pause until
   * {@link #onCheckpointSaved(String)} is called.
   *
   * @param checkpointId id of the checkpoint being taken
   * @throws RuntimeException if the topology is not stateful
   */
  @Override
  public void persistState(String checkpointId) {
    LOG.info("Persisting state for checkpoint: " + checkpointId);

    if (!isTopologyStateful) {
      throw new RuntimeException("Could not save a non-stateful topology's state");
    }

    // need to synchronize with other OutgoingTupleCollection operations
    // so that topology emit, ack, fail are thread safe
    collector.lock.lock();
    try {
      if (bolt instanceof IStatefulComponent) {
        ((IStatefulComponent) bolt).preSave(checkpointId);
      }

      if (bolt instanceof ITwoPhaseStatefulComponent) {
        // Stops readTuplesAndExecute() from consuming more input until the save is confirmed.
        waitingForCheckpointSaved = true;
      }
      collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation);
    } finally {
      collector.lock.unlock();
    }
    LOG.info("State persisted for checkpoint: " + checkpointId);
  }

  /**
   * Initializes metrics, state and the user bolt.
   *
   * <p>When the topology is stateful and the bolt is an {@link IStatefulComponent}, the given
   * state is handed to the bolt. If state spilling is enabled, the spill directory is emptied if
   * it already exists, or created if it does not. The bolt's {@code prepare}, the prepare hooks,
   * and custom stream grouping preparation run afterwards.
   *
   * @param state state restored for this instance (used only for stateful components)
   */
  @SuppressWarnings("unchecked")
  @Override
  public void init(State<Serializable, Serializable> state) {
    TopologyContextImpl topologyContext = helper.getTopologyContext();

    // Initialize the GlobalMetrics
    GlobalMetrics.init(topologyContext, systemConfig.getHeronMetricsExportInterval());

    boltMetrics.registerMetrics(topologyContext);

    // Initialize the instanceState if the topology is stateful and bolt is a stateful component
    if (isTopologyStateful && bolt instanceof IStatefulComponent) {
      this.instanceState = state;
      ((IStatefulComponent<Serializable, Serializable>) bolt).initState(instanceState);

      if (spillState) {
        if (FileUtils.isDirectoryExists(spillStateLocation)) {
          FileUtils.cleanDir(spillStateLocation);
        } else {
          FileUtils.createDirectory(spillStateLocation);
        }
      }
    }

    // Delegate
    bolt.prepare(
        topologyContext.getTopologyConfig(), topologyContext, new OutputCollector(collector));

    // Invoke user-defined prepare task hook
    topologyContext.invokeHookPrepare();

    // Init the CustomStreamGrouping
    helper.prepareForCustomStreamGrouping();
  }

  /** Registers the bolt's execution task, tick-tuple timer and timer events on the looper. */
  @Override
  public void start() {
    addBoltTasks();
  }

  /**
   * Forwards a pre-restore notification to two-phase stateful bolts.
   *
   * @param checkpointId id of the checkpoint about to be restored
   */
  @Override
  public void preRestore(String checkpointId) {
    if (bolt instanceof ITwoPhaseStatefulComponent) {
      ((ITwoPhaseStatefulComponent) bolt).preRestore(checkpointId);
    }
  }

  /**
   * Notifies a two-phase stateful bolt that its checkpoint was saved, and resumes tuple processing.
   *
   * @param checkpointId id of the checkpoint that was saved
   */
  @Override
  public void onCheckpointSaved(String checkpointId) {
    if (bolt instanceof ITwoPhaseStatefulComponent) {
      ((ITwoPhaseStatefulComponent) bolt).postSave(checkpointId);
      waitingForCheckpointSaved = false;
    }
  }

  /** Runs cleanup hooks and the bolt's {@code cleanup()}, then clears the queues this instance owns. */
  @Override
  public void clean() {
    // Invoke clean up hook before clean() is called
    helper.getTopologyContext().invokeHookCleanup();

    // Delegate to user-defined clean-up method
    bolt.cleanup();

    // Clean the resources we own
    streamInQueue.clear();
    collector.clear();
  }

  /** Cleans up this instance and then asks the looper to exit its loop. */
  @Override
  public void shutdown() {
    clean();
    looper.exitLoop();
  }

  /**
   * Registers the wake-up task that reads and executes tuples, plus the tick-tuple timer and the
   * topology's timer events.
   */
  private void addBoltTasks() {
    // add the readTupleAndExecute() to tasks
    Runnable boltTasks = () -> {
      boltMetrics.updateTaskRunCount();

      // Back-pressure -- only when we could send out tuples will we read & execute tuples
      if (collector.isOutQueuesAvailable()) {
        boltMetrics.updateExecutionCount();
        readTuplesAndExecute(streamInQueue);

        // Though we may execute MAX_READ tuples, finally we will packet it as
        // one outgoingPacket and push to out queues
        collector.sendOutTuples();
        // Here we need to inform the Gateway
      } else {
        boltMetrics.updateOutQueueFullCount();
      }

      // If there are more to read, we will wake up itself next time when it doWait()
      if (collector.isOutQueuesAvailable() && !streamInQueue.isEmpty()) {
        boltMetrics.updateContinueWorkCount();
        looper.wakeUp();
      }
    };
    looper.addTasksOnWakeup(boltTasks);

    prepareTickTupleTimer();

    InstanceUtils.prepareTimerEvents(looper, helper);
  }

  /**
   * Drains messages from {@code inQueue} and executes them against the bolt.
   *
   * <p>A checkpoint request triggers {@link #persistState(String)}. Each tuple set is
   * deserialized tuple by tuple and passed to {@code bolt.execute}, with hooks and metrics
   * updated for each tuple. The loop stops when the queue is empty, when a two-phase checkpoint
   * is awaiting confirmation, or when the configured batch time has elapsed after a tuple set.
   *
   * @param inQueue queue to read messages from
   * @throws RuntimeException if a tuple set carries control (ack/fail) data
   */
  @Override
  public void readTuplesAndExecute(Communicator<Message> inQueue) {
    TopologyContextImpl topologyContext = helper.getTopologyContext();
    Duration instanceExecuteBatchTime = systemConfig.getInstanceExecuteBatchTime();

    long startOfCycle = System.nanoTime();
    // Read data from in Queues
    while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
      Message msg = inQueue.poll();

      if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
        String checkpointId =
            ((CheckpointManager.InitiateStatefulCheckpoint) msg).getCheckpointId();
        persistState(checkpointId);
      } else if (msg instanceof HeronTuples.HeronTupleSet) {
        HeronTuples.HeronTupleSet tuples = (HeronTuples.HeronTupleSet) msg;
        // Handle the tuples
        if (tuples.hasControl()) {
          throw new RuntimeException("Bolt cannot get acks/fails from other components");
        }

        // Get meta data of tuples
        TopologyAPI.StreamId stream = tuples.getData().getStream();
        int nValues = topologyContext.getComponentOutputFields(
            stream.getComponentName(), stream.getId()).size();
        int sourceTaskId = tuples.getSrcTaskId();

        for (HeronTuples.HeronDataTuple dataTuple : tuples.getData().getTuplesList()) {
          long startTime = System.nanoTime();
          // Create the value list and fill the value
          List<Object> values = new ArrayList<>(nValues);
          for (int i = 0; i < nValues; i++) {
            values.add(serializer.deserialize(dataTuple.getValues(i).toByteArray()));
          }

          // Decode the tuple
          TupleImpl t = new TupleImpl(topologyContext, stream, dataTuple.getKey(),
              dataTuple.getRootsList(), values, System.nanoTime(), false, sourceTaskId);

          long deserializedTime = System.nanoTime();
          // Delegate to the use defined bolt
          bolt.execute(t);

          // Record the latency of execution. nanoTime() values are compared by plain
          // subtraction (as in sendTickTuple); going through Duration could throw on overflow.
          long executeLatency = System.nanoTime() - deserializedTime;

          // Invoke user-defined execute task hook
          topologyContext.invokeHookBoltExecute(t, Duration.ofNanos(executeLatency));

          // Update metrics
          boltMetrics.deserializeDataTuple(stream.getId(), stream.getComponentName(),
              deserializedTime - startTime);
          boltMetrics.executeTuple(stream.getId(), stream.getComponentName(), executeLatency);
        }

        // To avoid spending too much time; written as a difference so nanoTime wrap is safe
        long currentTime = System.nanoTime();
        if (currentTime - startOfCycle - instanceExecuteBatchTime.toNanos() > 0) {
          break;
        }
      }
    }
  }

  /** No-op: bolts take no action on activation. */
  @Override
  public void activate() {
  }

  /** No-op: bolts take no action on deactivation. */
  @Override
  public void deactivate() {
  }

  /**
   * Registers a periodic tick-tuple event if {@code TOPOLOGY_TICK_TUPLE_FREQ_MS} is configured.
   * The configured value is interpreted in milliseconds.
   */
  private void prepareTickTupleTimer() {
    Object tickTupleFreqMs =
        helper.getTopologyContext().getTopologyConfig().get(Config.TOPOLOGY_TICK_TUPLE_FREQ_MS);

    if (tickTupleFreqMs != null) {
      Duration freq = TypeUtils.getDuration(tickTupleFreqMs, ChronoUnit.MILLIS);
      looper.registerPeriodicEvent(freq, this::sendTickTuple);
    }
  }

  /** Executes one tick tuple on the bolt, records its latency, and flushes emitted tuples. */
  private void sendTickTuple() {
    TickTuple t = new TickTuple();
    long startTime = System.nanoTime();
    bolt.execute(t);
    long latency = System.nanoTime() - startTime;
    boltMetrics.executeTuple(t.getSourceStreamId(), t.getSourceComponent(), latency);

    collector.sendOutTuples();
  }
}