/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.daemon;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Random;
import java.util.function.BooleanSupplier;
import org.apache.storm.Config;
import org.apache.storm.Thrift;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.ExecutorTransfer;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.JavaObject;
import org.apache.storm.generated.ShellComponent;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StateSpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.hooks.info.EmitInfo;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.metrics2.TaskMetrics;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.stats.CommonStats;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
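
/**
 * A single spout or bolt task instance running inside an executor. A {@code Task} resolves
 * the outgoing task ids for emits, samples emitted/transferred stats, and holds the
 * deserialized spout/bolt object along with its per-task topology contexts.
 *
 * <p>Illustrative usage (a sketch only: tasks are normally created and driven by the
 * executor itself, and {@code executor} here is assumed to be fully initialized):
 *
 * <pre>{@code
 * Task task = new Task(executor, taskId);
 * List<Integer> targets = task.getOutgoingTasks("default", new Values("hello"));
 * }</pre>
 */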
public class Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private final TaskMetrics taskMetrics;
private final Executor executor;
private final WorkerState workerData;
private final TopologyContext systemTopologyContext;
private final TopologyContext userTopologyContext;
private final WorkerTopologyContext workerTopologyContext;
private final Integer taskId;
private final String componentId;
private final Object taskObject; // Spout/Bolt object
private final Map<String, Object> topoConf;
private final BooleanSupplier emitSampler;
private final CommonStats executorStats;
private final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
private final HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
private final boolean debug;
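
    /**
     * Creates the task, wiring it to its owning executor and worker state, building its
     * topology contexts, and deserializing the underlying spout/bolt object.
     *
     * @param executor the executor this task runs in
     * @param taskId   the topology-wide id of this task
     * @throws IOException if the topology contexts cannot be created
     */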
public Task(Executor executor, Integer taskId) throws IOException {
this.taskId = taskId;
this.executor = executor;
this.workerData = executor.getWorkerData();
this.topoConf = executor.getTopoConf();
this.componentId = executor.getComponentId();
this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper);
this.executorStats = executor.getStats();
this.workerTopologyContext = executor.getWorkerTopologyContext();
this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
this.userTopologyContext = mkTopologyContext(workerData.getTopology());
this.taskObject = mkTaskObject();
this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
this.addTaskHooks();
this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId, workerData.getMetricRegistry());
}
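
    /**
     * Flattens the stream -> component -> grouper map into a stream -> groupers map, since a
     * regular (non-direct) emit fans out through every grouper registered on the stream.
     */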
private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(
Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> result = new HashMap<>(streamComponentToGrouper.size());
for (Entry<String, Map<String, LoadAwareCustomStreamGrouping>> entry : streamComponentToGrouper.entrySet()) {
String stream = entry.getKey();
Map<String, LoadAwareCustomStreamGrouping> groupers = entry.getValue();
ArrayList<LoadAwareCustomStreamGrouping> perStreamGroupers = new ArrayList<>();
if (groupers != null) { // null for __system bolt
for (LoadAwareCustomStreamGrouping grouper : groupers.values()) {
perStreamGroupers.add(grouper);
}
}
result.put(stream, perStreamGroupers);
}
return result;
}
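
    /**
     * Resolves the outgoing tasks for a direct emit on the given stream.
     *
     * @param outTaskId the task id the tuple is emitted directly to
     * @param stream    the stream to emit on
     * @param values    the tuple values
     * @return a singleton list containing {@code outTaskId}, or an empty list if the target
     *     task does not consume this stream
     * @throws IllegalArgumentException if the target expects a regular (non-direct) grouping
     */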
public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) {
        if (debug) {
            LOG.info("Emitting direct: outTaskId={} componentId={} stream={} values={}", outTaskId, componentId, stream, values);
        }
String targetComponent = workerTopologyContext.getComponentId(outTaskId);
Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
        if (null == grouping) {
            // The target task does not consume this stream, so there is nothing to send to.
            outTaskId = null;
        }
if (grouping != null && grouping != GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
}
if (!userTopologyContext.getHooks().isEmpty()) {
new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
}
try {
if (emitSampler.getAsBoolean()) {
executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream));
if (null != outTaskId) {
executorStats.transferredTuples(stream, 1, this.taskMetrics.getTransferred(stream));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
if (null != outTaskId) {
return Collections.singletonList(outTaskId);
}
return new ArrayList<>(0);
}
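
    /**
     * Resolves the outgoing tasks for a regular emit by asking every grouper registered on
     * the stream to choose its target tasks.
     *
     * @throws IllegalArgumentException if the stream is unknown or is a direct stream
     */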
public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
}
ArrayList<Integer> outTasks = new ArrayList<>();
ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
if (null != groupers) {
            // An indexed loop avoids iterator allocation on this hot path.
            for (int i = 0; i < groupers.size(); ++i) {
LoadAwareCustomStreamGrouping grouper = groupers.get(i);
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
List<Integer> compTasks = grouper.chooseTasks(taskId, values);
outTasks.addAll(compTasks);
}
} else {
throw new IllegalArgumentException("Unknown stream ID: " + stream);
}
if (!userTopologyContext.getHooks().isEmpty()) {
new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
}
try {
if (emitSampler.getAsBoolean()) {
executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream));
executorStats.transferredTuples(stream, outTasks.size(), this.taskMetrics.getTransferred(stream));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return outTasks;
}
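
    /**
     * Builds a {@link TupleImpl} originating from this task on the given stream, using the
     * system topology context.
     */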
    public Tuple getTuple(String stream, List<Object> values) {
return new TupleImpl(systemTopologyContext, values, executor.getComponentId(), systemTopologyContext.getThisTaskId(), stream);
}

    public Integer getTaskId() {
        return taskId;
    }

    public String getComponentId() {
        return componentId;
    }

    public TopologyContext getUserContext() {
        return userTopologyContext;
    }

    public Object getTaskObject() {
        return taskObject;
    }

    public TaskMetrics getTaskMetrics() {
        return taskMetrics;
    }

    /**
     * Non-blocking send of an unanchored tuple. Any tuple that cannot be transferred to its
     * destination immediately is added to the {@code pendingEmits} queue instead.
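     *
     * <p>Illustrative call (a sketch: the stream name is hypothetical, and {@code transfer}
     * and {@code pendingEmits} are assumed to come from the owning executor):
     *
     * <pre>{@code
     * task.sendUnanchored("metrics", new Values(42), transfer, pendingEmits);
     * }</pre>
     */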
public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
Tuple tuple = getTuple(stream, values);
List<Integer> tasks = getOutgoingTasks(stream, values);
for (Integer t : tasks) {
AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
transfer.tryTransfer(addressedTuple, pendingEmits);
}
}

    /**
     * Sends sampled tuple data to the event logger if the global or component-level debug
     * flag is set (via the Nimbus API).
     */
    public void sendToEventLogger(Executor executor, List<Object> values,
                                  String componentId, Object messageId, Random random, Queue<AddressedTuple> overflow) {
        Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
        DebugOptions debugOptions = componentDebug.get(componentId);
        if (debugOptions == null) {
            // Fall back to the topology-wide debug options, which are keyed by the storm id.
            debugOptions = componentDebug.get(executor.getStormId());
        }
double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
if (spct > 0 && (random.nextDouble() * 100) < spct) {
sendUnanchored(StormCommon.EVENTLOGGER_STREAM_ID,
new Values(componentId, messageId, System.currentTimeMillis(), values),
executor.getExecutorTransfer(), overflow);
}
}
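
    /**
     * Builds the {@link TopologyContext} for this task against the given topology (either the
     * system topology or the user topology).
     */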
private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
Map<String, Object> conf = workerData.getConf();
TopologyContext topologyContext = new TopologyContext(
topology,
workerData.getTopologyConf(),
workerData.getTaskToComponent(),
workerData.getComponentToSortedTasks(),
workerData.getComponentToStreamToFields(),
// This is updated by the Worker and the topology has shared access to it
workerData.getBlobToLastKnownVersion(),
workerData.getTopologyId(),
ConfigUtils.supervisorStormResourcesPath(
ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
taskId,
workerData.getPort(), workerData.getLocalTaskIds(),
workerData.getDefaultSharedResources(),
workerData.getUserSharedResources(),
executor.getSharedExecutorData(),
executor.getIntervalToTaskToMetricToRegistry(),
executor.getOpenOrPrepareWasCalled(),
workerData.getMetricRegistry());
executor.setMetricRegistry(workerData.getMetricRegistry());
return topologyContext;
}
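
    /**
     * Deserializes this task's component object from the topology, wrapping shell components
     * in {@link ShellSpout}/{@link ShellBolt} and instantiating Thrift-declared Java objects.
     */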
private Object mkTaskObject() {
StormTopology topology = systemTopologyContext.getRawTopology();
Map<String, SpoutSpec> spouts = topology.get_spouts();
Map<String, Bolt> bolts = topology.get_bolts();
Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
Object result;
ComponentObject componentObject;
if (spouts.containsKey(componentId)) {
componentObject = spouts.get(componentId).get_spout_object();
} else if (bolts.containsKey(componentId)) {
componentObject = bolts.get(componentId).get_bolt_object();
} else if (stateSpouts.containsKey(componentId)) {
componentObject = stateSpouts.get(componentId).get_state_spout_object();
} else {
throw new RuntimeException("Could not find " + componentId + " in " + topology);
}
result = Utils.getSetComponentObject(componentObject);
if (result instanceof ShellComponent) {
if (spouts.containsKey(componentId)) {
result = new ShellSpout((ShellComponent) result);
} else {
result = new ShellBolt((ShellComponent) result);
}
}
if (result instanceof JavaObject) {
result = Thrift.instantiateJavaObject((JavaObject) result);
}
return result;
}
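
    /**
     * Registers any auto task hooks configured under {@link Config#TOPOLOGY_AUTO_TASK_HOOKS}
     * with the user topology context.
     */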
    @SuppressWarnings("unchecked")
    private void addTaskHooks() {
        List<String> hooksClassList = (List<String>) topoConf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS);
        if (null != hooksClassList) {
            for (String hookClass : hooksClassList) {
                try {
                    userTopologyContext.addTaskHook((ITaskHook) Class.forName(hookClass).getDeclaredConstructor().newInstance());
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException("Failed to add hook: " + hookClass, e);
                }
            }
        }
    }

@Override
public String toString() {
return taskId.toString();
}
}