/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.daemon;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.Thrift;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StateSpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.metric.EventLoggerBolt;
import org.apache.storm.metric.MetricsConsumerBolt;
import org.apache.storm.metric.SystemBolt;
import org.apache.storm.metric.filter.FilterByMetricName;
import org.apache.storm.metric.util.DataPointExpander;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.json.simple.JSONValue;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedInvalidTopologyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
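
/**
 * Shared daemon-side helpers for working with topologies: validation, wiring in the system components
 * (acker, event logger, metrics consumers, system bolt), and mapping tasks to components and workers.
 *
 * <p>A typical daemon-side use, sketched (identifiers are illustrative):
 * <pre>
 * StormTopology sysTopo = StormCommon.systemTopology(topoConf, userTopology);
 * Map&lt;Integer, String&gt; taskToComponent = StormCommon.stormTaskInfo(userTopology, topoConf);
 * </pre>
 */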
public class StormCommon {
public static final String SYSTEM_STREAM_ID = "__system";
public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
public static final String TOPOLOGY_METRICS_CONSUMER_CLASS = "class";
public static final String TOPOLOGY_METRICS_CONSUMER_ARGUMENT = "argument";
public static final String TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES = "max.retain.metric.tuples";
public static final String TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT = "parallelism.hint";
public static final String TOPOLOGY_METRICS_CONSUMER_WHITELIST = "whitelist";
public static final String TOPOLOGY_METRICS_CONSUMER_BLACKLIST = "blacklist";
public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";
public static final String TOPOLOGY_EVENT_LOGGER_CLASS = "class";
public static final String TOPOLOGY_EVENT_LOGGER_ARGUMENTS = "arguments";
private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
// A singleton instance allows us to mock delegated static methods in our
// tests by subclassing.
private static StormCommon _instance = new StormCommon();
/**
* Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that
* overrides the implementation of the delegated method.
*
* @param common a StormCommon instance
* @return the previously set instance
*/
public static StormCommon setInstance(StormCommon common) {
StormCommon oldInstance = _instance;
_instance = common;
return oldInstance;
}
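
    /**
     * Assert that the configuration is for distributed (non-local) mode.
     *
     * @throws IllegalArgumentException if the configuration selects local mode
     */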
public static void validateDistributedMode(Map<String, Object> conf) {
if (ConfigUtils.isLocalMode(conf)) {
throw new IllegalArgumentException("Cannot start server in local mode!");
}
}
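
    /**
     * Verify that no component id or declared stream id in the map clashes with the reserved system ids,
     * returning the set of component ids that were checked.
     */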
private static Set<String> validateIds(Map<String, ? extends Object> componentMap) throws InvalidTopologyException {
Set<String> keys = componentMap.keySet();
for (String id : keys) {
if (Utils.isSystemId(id)) {
throw new WrappedInvalidTopologyException(id + " is not a valid component id.");
}
}
for (Object componentObj : componentMap.values()) {
ComponentCommon common = getComponentCommon(componentObj);
Set<String> streamIds = common.get_streams().keySet();
for (String id : streamIds) {
if (Utils.isSystemId(id)) {
throw new WrappedInvalidTopologyException(id + " is not a valid stream id.");
}
}
}
return keys;
}
private static void validateIds(StormTopology topology) throws InvalidTopologyException {
List<String> componentIds = new ArrayList<>();
componentIds.addAll(validateIds(topology.get_bolts()));
componentIds.addAll(validateIds(topology.get_spouts()));
componentIds.addAll(validateIds(topology.get_state_spouts()));
List<String> offending = Utils.getRepeat(componentIds);
if (!offending.isEmpty()) {
throw new WrappedInvalidTopologyException("Duplicate component ids: " + offending);
}
}
    private static boolean isEmptyInputs(ComponentCommon common) {
        Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
        return inputs == null || inputs.isEmpty();
    }
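
    /**
     * Collect all components of a topology (bolts, spouts, and state spouts) into a single map keyed by
     * component id.
     */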
public static Map<String, Object> allComponents(StormTopology topology) {
Map<String, Object> components = new HashMap<>(topology.get_bolts());
components.putAll(topology.get_spouts());
components.putAll(topology.get_state_spouts());
return components;
}
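
    /**
     * Parse a component's JSON configuration into a map; the map is empty if the component declares no
     * JSON conf.
     */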
@SuppressWarnings("unchecked")
public static Map<String, Object> componentConf(Object component) {
try {
Map<String, Object> conf = new HashMap<>();
ComponentCommon common = getComponentCommon(component);
String jconf = common.get_json_conf();
if (jconf != null) {
conf.putAll((Map<String, Object>) JSONValue.parseWithException(jconf));
}
return conf;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
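
    /**
     * Run basic sanity checks on a topology: ids must be valid and unique, spouts may not declare inputs,
     * and any component with a positive TOPOLOGY_TASKS setting must also have a positive parallelism hint.
     */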
@SuppressWarnings("unchecked")
public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
validateIds(topology);
for (StormTopology._Fields field : Thrift.getSpoutFields()) {
Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
if (spoutComponents != null) {
for (Object obj : spoutComponents.values()) {
ComponentCommon common = getComponentCommon(obj);
if (!isEmptyInputs(common)) {
throw new WrappedInvalidTopologyException("May not declare inputs for a spout");
}
}
}
}
Map<String, Object> componentMap = allComponents(topology);
for (Object componentObj : componentMap.values()) {
Map<String, Object> conf = componentConf(componentObj);
ComponentCommon common = getComponentCommon(componentObj);
int parallelismHintNum = Thrift.getParallelismHint(common);
Integer taskNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
if (taskNum > 0 && parallelismHintNum <= 0) {
throw new WrappedInvalidTopologyException(
"Number of executors must be greater than 0 when number of tasks is greater than 0");
}
}
}
private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
Set<String> outputFields = new HashSet<>();
for (StreamInfo streamInfo : streams.values()) {
outputFields.addAll(streamInfo.get_output_fields());
}
return outputFields;
}
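
    /**
     * Validate the topology's wiring: every input must reference an existing component and stream, and a
     * fields grouping may only select fields the source stream actually declares.
     */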
public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
Map<String, Object> componentMap = allComponents(topology);
for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
String componentId = entry.getKey();
ComponentCommon common = getComponentCommon(entry.getValue());
Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
String sourceStreamId = input.getKey().get_streamId();
String sourceComponentId = input.getKey().get_componentId();
                if (!componentMap.containsKey(sourceComponentId)) {
throw new WrappedInvalidTopologyException("Component: [" + componentId
+ "] subscribes from non-existent component [" + sourceComponentId + "]");
}
ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
if (!sourceComponent.get_streams().containsKey(sourceStreamId)) {
throw new WrappedInvalidTopologyException("Component: [" + componentId
+ "] subscribes from non-existent stream: "
+ "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
}
Grouping grouping = input.getValue();
if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
List<String> fields = new ArrayList<>(grouping.get_fields());
Map<String, StreamInfo> streams = sourceComponent.get_streams();
Set<String> sourceOutputFields = getStreamOutputFields(streams);
fields.removeAll(sourceOutputFields);
                    if (!fields.isEmpty()) {
                        throw new WrappedInvalidTopologyException("Component: [" + componentId
                            + "] subscribes from stream: [" + sourceStreamId + "] of component "
                            + "[" + sourceComponentId + "] with non-existent fields: " + fields);
}
}
}
}
}
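
    /**
     * Build the acker's input map: the init stream of every spout plus the ack, fail, and reset-timeout
     * streams of every bolt, all fields-grouped on "id" so every update for one tuple tree reaches the
     * same acker task.
     */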
public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
Set<String> boltIds = topology.get_bolts().keySet();
Set<String> spoutIds = topology.get_spouts().keySet();
for (String id : spoutIds) {
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}
for (String id : boltIds) {
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}
return inputs;
}
public static IBolt makeAckerBolt() {
return _instance.makeAckerBoltImpl();
}
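
    /**
     * Add the system acker bolt and wire every spout and bolt to it. Acker parallelism defaults to
     * TOPOLOGY_ACKER_EXECUTORS, falling back to TOPOLOGY_WORKERS, and both the acker and the spouts get a
     * tick frequency of TOPOLOGY_MESSAGE_TIMEOUT_SECS, which drives the expiry of pending tuples.
     */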
@SuppressWarnings("unchecked")
public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        Map<String, StreamInfo> outputStreams = new HashMap<>();
outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
Map<String, Object> ackerConf = new HashMap<>();
int ackerNum =
ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
for (Bolt bolt : topology.get_bolts().values()) {
ComponentCommon common = bolt.get_common();
common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
}
for (SpoutSpec spout : topology.get_spouts().values()) {
ComponentCommon common = spout.get_common();
Map<String, Object> spoutConf = componentConf(spout);
spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
common.set_json_conf(JSONValue.toJSONString(spoutConf));
common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
Thrift.prepareDirectGrouping());
}
topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
}
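
    /**
     * Extract the {@link ComponentCommon} from a bolt, spout, or state spout; returns null for any other
     * object.
     */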
public static ComponentCommon getComponentCommon(Object component) {
ComponentCommon common = null;
if (component instanceof StateSpoutSpec) {
common = ((StateSpoutSpec) component).get_common();
} else if (component instanceof SpoutSpec) {
common = ((SpoutSpec) component).get_common();
} else if (component instanceof Bolt) {
common = ((Bolt) component).get_common();
}
return common;
}
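
    /**
     * Declare the metrics stream, carrying ("task-info", "data-points") tuples, on every component.
     */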
public static void addMetricStreams(StormTopology topology) {
for (Object component : allComponents(topology).values()) {
ComponentCommon common = getComponentCommon(component);
StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
}
}
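
    /**
     * Declare the system stream, carrying ("event") tuples, on every component.
     */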
public static void addSystemStreams(StormTopology topology) {
for (Object component : allComponents(topology).values()) {
ComponentCommon common = getComponentCommon(component);
StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
}
}
public static List<String> eventLoggerBoltFields() {
return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}
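
    /**
     * Build the event logger's input map: the event log stream of every spout and bolt, fields-grouped on
     * "component-id" so one component's events always reach the same event logger task.
     */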
public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        Set<String> allIds = new HashSet<>();
allIds.addAll(topology.get_bolts().keySet());
allIds.addAll(topology.get_spouts().keySet());
for (String id : allIds) {
inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
}
return inputs;
}
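
    /**
     * Add the event logger bolt and declare its input stream on every component. The bolt is skipped when
     * TOPOLOGY_EVENTLOGGER_EXECUTORS (with TOPOLOGY_WORKERS as the fallback) resolves to null or zero.
     */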
public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
if (numExecutors == null || numExecutors == 0) {
return;
}
HashMap<String, Object> componentConf = new HashMap<>();
componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
for (Object component : allComponents(topology).values()) {
ComponentCommon common = getComponentCommon(component);
common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
}
topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}
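
    /**
     * Build one metrics consumer bolt per entry in TOPOLOGY_METRICS_CONSUMER_REGISTER, each shuffle-wired
     * to the metrics stream of every component including the system component. Repeated consumer classes
     * get "#n"-suffixed component ids.
     */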
@SuppressWarnings("unchecked")
public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
Map<String, Bolt> metricsConsumerBolts = new HashMap<>();
Set<String> componentIdsEmitMetrics = new HashSet<>();
componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
for (String componentId : componentIdsEmitMetrics) {
inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
}
List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
if (registerInfo != null) {
            Map<String, Integer> classOccurrencesMap = new HashMap<>();
for (Map<String, Object> info : registerInfo) {
String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
                Map<String, Object> metricsConsumerConf = new HashMap<>();
metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
List<String> whitelist = (List<String>) info.get(
TOPOLOGY_METRICS_CONSUMER_WHITELIST);
List<String> blacklist = (List<String>) info.get(
TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
Boolean expandMapType = ObjectReader.getBoolean(info.get(
TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
String metricNameSeparator = ObjectReader.getString(info.get(
TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
maxRetainMetricTuples, filterPredicate, expander);
Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
boltInstance, null, phintNum, metricsConsumerConf);
                String id;
                if (classOccurrencesMap.containsKey(className)) {
                    // Disambiguate repeated consumer classes, e.g. ["a", "b", "a"] => ["a", "b", "a#2"]
                    int occurrenceNum = classOccurrencesMap.get(className) + 1;
                    classOccurrencesMap.put(className, occurrenceNum);
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
                } else {
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                    classOccurrencesMap.put(className, 1);
                }
metricsConsumerBolts.put(id, metricsConsumerBolt);
}
}
return metricsConsumerBolts;
}
public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
topology.put_to_bolts(entry.getKey(), entry.getValue());
}
}
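
    /**
     * Add the system bolt, which declares the tick, flush, metrics-tick, and credentials-changed streams.
     * It is configured with zero tasks and zero executors.
     */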
@SuppressWarnings("unused")
public static void addSystemComponents(Map<String, Object> conf, StormTopology topology) {
Map<String, StreamInfo> outputStreams = new HashMap<>();
outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
outputStreams.put(Constants.SYSTEM_FLUSH_STREAM_ID, Thrift.outputFields(Arrays.asList()));
outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
Map<String, Object> boltConf = new HashMap<>();
boltConf.put(Config.TOPOLOGY_TASKS, 0);
Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
}
public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
return _instance.systemTopologyImpl(topoConf, topology);
}
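
    /**
     * Whether acking is enabled: TOPOLOGY_ACKER_EXECUTORS is either unset (acking defaults on) or positive.
     */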
public static boolean hasAckers(Map<String, Object> topoConf) {
Object ackerNum = topoConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
return ackerNum == null || ObjectReader.getInt(ackerNum) > 0;
}
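
    /**
     * Whether event logging is enabled: TOPOLOGY_EVENTLOGGER_EXECUTORS is either unset or positive.
     */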
public static boolean hasEventLoggers(Map<String, Object> topoConf) {
Object eventLoggerNum = topoConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
return eventLoggerNum == null || ObjectReader.getInt(eventLoggerNum) > 0;
}
public static int numStartExecutors(Object component) throws InvalidTopologyException {
ComponentCommon common = getComponentCommon(component);
return Thrift.getParallelismHint(common);
}
public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map<String, Object> topoConf) throws
InvalidTopologyException {
return _instance.stormTaskInfoImpl(userTopology, topoConf);
}
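
    /**
     * Expand an executor id, an inclusive [firstTaskId, lastTaskId] pair, into the list of task ids it
     * hosts, e.g. [2, 4] => [2, 3, 4].
     */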
    public static List<Integer> executorIdToTasks(List<Long> executorId) {
        List<Integer> taskIds = new ArrayList<>();
        for (int taskId = executorId.get(0).intValue(); taskId <= executorId.get(1).intValue(); taskId++) {
            taskIds.add(taskId);
        }
        return taskIds;
}
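
    /**
     * Flatten an executor-to-worker assignment into a task-to-worker map by expanding each executor's
     * task range.
     */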
public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodePort) {
Map<Integer, NodeInfo> tasksToNodePort = new HashMap<>();
for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
List<Integer> taskIds = executorIdToTasks(entry.getKey());
for (Integer taskId : taskIds) {
tasksToNodePort.put(taskId, entry.getValue());
}
}
return tasksToNodePort;
}
public static IAuthorizer mkAuthorizationHandler(String klassName, Map<String, Object> conf)
throws IllegalAccessException, InstantiationException, ClassNotFoundException {
return _instance.mkAuthorizationHandlerImpl(klassName, conf);
}
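
    /**
     * Assemble a {@link WorkerTopologyContext} from the entries stashed in the worker data map: the system
     * topology, configs, task-to-component mappings, shared resources, and the worker's code and pid
     * directories.
     */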
@SuppressWarnings("unchecked")
public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
try {
StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
Map<String, Object> topoConf = (Map) workerData.get(Constants.STORM_CONF);
Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
Map<String, List<Integer>> componentToSortedTasks =
(Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS);
Map<String, Map<String, Fields>> componentToStreamToFields =
(Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
String stormId = (String) workerData.get(Constants.STORM_ID);
Map<String, Object> conf = (Map) workerData.get(Constants.CONF);
Integer port = (Integer) workerData.get(Constants.PORT);
String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, stormId));
String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
List<Integer> workerTasks = (List<Integer>) workerData.get(Constants.TASK_IDS);
Map<String, Object> defaultResources = (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
Map<String, Object> userResources = (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES);
return new WorkerTopologyContext(stormTopology, topoConf, taskToComponent, componentToSortedTasks,
componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources,
userResources);
} catch (IOException e) {
throw Utils.wrapInRuntime(e);
}
}
public IBolt makeAckerBoltImpl() {
return new Acker();
}
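
    /**
     * Produce the "system topology": a validated deep copy of the user topology with the acker, event
     * logger, metrics, and system components and streams added.
     */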
protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
validateBasic(topology);
StormTopology ret = topology.deepCopy();
addAcker(topoConf, ret);
if (hasEventLoggers(topoConf)) {
addEventLogger(topoConf, ret);
}
addMetricComponents(topoConf, ret);
addSystemComponents(topoConf, ret);
addMetricStreams(ret);
addSystemStreams(ret);
validateStructure(ret);
return ret;
}
    /**
     * Compute the map from task id to component id for the system topology, assigning task ids 1..n
     * contiguously per component, with components processed in sorted id order.
     */
protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map<String, Object> topoConf) throws
InvalidTopologyException {
Map<Integer, String> taskIdToComponentId = new HashMap<>();
StormTopology systemTopology = systemTopology(topoConf, userTopology);
Map<String, Object> components = allComponents(systemTopology);
Map<String, Integer> componentIdToTaskNum = new TreeMap<>();
for (Map.Entry<String, Object> entry : components.entrySet()) {
Map<String, Object> conf = componentConf(entry.getValue());
Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
componentIdToTaskNum.put(entry.getKey(), ObjectReader.getInt(taskNum));
}
int taskId = 1;
for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
String componentId = entry.getKey();
Integer taskNum = entry.getValue();
while (taskNum > 0) {
taskIdToComponentId.put(taskId, componentId);
taskNum--;
taskId++;
}
}
return taskIdToComponentId;
}
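
    /**
     * Instantiate and prepare the named {@link IAuthorizer}, or return null when no class name is
     * configured.
     */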
protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map<String, Object> conf)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        IAuthorizer aznHandler = null;
        if (StringUtils.isNotBlank(klassName)) {
            // Class.forName either returns a class or throws, so no null checks are needed.
            Class<?> aznClass = Class.forName(klassName);
            aznHandler = (IAuthorizer) aznClass.newInstance();
            aznHandler.prepare(conf);
            LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
        }
return aznHandler;
}
}