blob: 48528d7178d0fb8cca30f174671665b122d1fc3c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.cluster;
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.generated.*;
import backtype.storm.metric.SystemBolt;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.ThriftTopologyUtils;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyAssignContext;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.acker.Acker;
import com.alibaba.jstorm.task.group.MkGrouper;
import com.alibaba.jstorm.task.master.TopologyMaster;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Thrift;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLClassLoader;
import java.security.InvalidParameterException;
import java.util.*;
import java.util.Map.Entry;
/**
* Base utility function
*
* 1. base topology validation 2. add streams/inputs
*
* @author yannian/Longda
*
*/
public class Common {
private final static Logger LOG = LoggerFactory.getLogger(Common.class);
public static final String TOPOLOGY_MASTER_COMPONENT_ID = "__topology_master";
public static final String TOPOLOGY_MASTER_HB_STREAM_ID = "__master_task_heartbeat";
public static final String TOPOLOGY_MASTER_METRICS_STREAM_ID = "__master_metrics";
public static final String TOPOLOGY_MASTER_CONTROL_STREAM_ID = "__master_control_stream";
public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
public static final String SYSTEM_STREAM_ID = "__system";
public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat";
public static final String LS_ID = "supervisor-id";
public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments";
public static final String LS_APPROVED_WORKERS = "approved-workers";
public static final String LS_TASK_CLEANUP_TIMEOUT = "task-cleanup-timeout";
public static final String compErrorInfo = "ID can only contains a-z, A-Z, 0-9, '-', '_', '.', '$', and should not start with \"__\".";
public static final String nameErrorInfo = "Name can only contains a-z, A-Z, 0-9, '-', '_', '.'";
public static boolean system_id(String id) {
return Utils.isSystemId(id);
}
private static void validate_component(Object obj) throws InvalidTopologyException {
if (obj instanceof StateSpoutSpec) {
StateSpoutSpec spec = (StateSpoutSpec) obj;
for (String id : spec.get_common().get_streams().keySet()) {
if (system_id(id) || !charComponentValidate(id)) {
throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
}
}
} else if (obj instanceof SpoutSpec) {
SpoutSpec spec = (SpoutSpec) obj;
for (String id : spec.get_common().get_streams().keySet()) {
if (system_id(id) || !charComponentValidate(id)) {
throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
}
}
} else if (obj instanceof Bolt) {
Bolt spec = (Bolt) obj;
for (String id : spec.get_common().get_streams().keySet()) {
if (system_id(id) || !charComponentValidate(id)) {
throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
}
}
} else {
throw new InvalidTopologyException("Unknow type component");
}
}
public static String topologyNameToId(String topologyName, int counter) {
return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs();
}
public static String getTopologyNameById(String topologyId) {
String topologyName = null;
try {
topologyName = topologyIdToName(topologyId);
} catch (InvalidTopologyException e) {
LOG.error("Invalid topologyId=" + topologyId);
}
return topologyName;
}
/**
* Convert topologyId to topologyName. TopologyId = topoloygName-counter-timeStamp
*
* @param topologyId
* @return
*/
public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
String ret = null;
int index = topologyId.lastIndexOf('-');
if (index != -1 && index > 2) {
index = topologyId.lastIndexOf('-', index - 1);
if (index != -1 && index > 0)
ret = topologyId.substring(0, index);
else
throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
} else
throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
return ret;
}
/**
* Validation of topology name chars. Only alpha char, number, '-', '_', '.' are valid.
*
* @return
*/
public static boolean charValidate(String name) {
return name.matches("[a-zA-Z0-9-_.]+");
}
/**
* Validation of topology component chars. Only alpha char, number, '-', '_', '.', '$' are valid.
*
* @return
*/
public static boolean charComponentValidate(String name) {
return name.matches("[a-zA-Z0-9-_/.$]+");
}
/**
* Check Whether ID of Bolt or spout is system_id
*
* @param topology
* @throws InvalidTopologyException
*/
@SuppressWarnings("unchecked")
public static void validate_ids(StormTopology topology, String topologyId) throws InvalidTopologyException {
String topologyName = topologyIdToName(topologyId);
if (!charValidate(topologyName)) {
throw new InvalidTopologyException(topologyName + " is not a valid topology name. " + nameErrorInfo);
}
List<String> list = new ArrayList<String>();
for (StormTopology._Fields field : Thrift.STORM_TOPOLOGY_FIELDS) {
Object value = topology.getFieldValue(field);
if (value != null) {
Map<String, Object> obj_map = (Map<String, Object>) value;
Set<String> commids = obj_map.keySet();
for (String id : commids) {
if (system_id(id) || !charComponentValidate(id)) {
throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
}
}
for (Object obj : obj_map.values()) {
validate_component(obj);
}
list.addAll(commids);
}
}
List<String> offending = JStormUtils.getRepeat(list);
if (offending.isEmpty() == false) {
throw new InvalidTopologyException("Duplicate component ids: " + offending);
}
}
private static void validate_component_inputs(Object obj) throws InvalidTopologyException {
if (obj instanceof StateSpoutSpec) {
StateSpoutSpec spec = (StateSpoutSpec) obj;
if (!spec.get_common().get_inputs().isEmpty()) {
throw new InvalidTopologyException("May not declare inputs for a spout");
}
}
if (obj instanceof SpoutSpec) {
SpoutSpec spec = (SpoutSpec) obj;
if (!spec.get_common().get_inputs().isEmpty()) {
throw new InvalidTopologyException("May not declare inputs for a spout");
}
}
}
/**
* Validate the topology 1. component id name is valid or not 2. check some spout's input is empty or not
*
* @param topology
* @throws InvalidTopologyException
*/
public static void validate_basic(StormTopology topology, Map<Object, Object> totalStormConf, String topologyid) throws InvalidTopologyException {
validate_ids(topology, topologyid);
for (StormTopology._Fields field : Thrift.SPOUT_FIELDS) {
Object value = topology.getFieldValue(field);
if (value != null) {
Map<String, Object> obj_map = (Map<String, Object>) value;
for (Object obj : obj_map.values()) {
validate_component_inputs(obj);
}
}
}
Integer workerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_WORKERS));
if (workerNum == null || workerNum <= 0) {
String errMsg = "There are no Config.TOPOLOGY_WORKERS in configuration of " + topologyid;
throw new InvalidParameterException(errMsg);
}
Integer ackerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
if (ackerNum != null && ackerNum < 0) {
String errMsg = "Invalide Config.TOPOLOGY_ACKERS in configuration of " + topologyid;
throw new InvalidParameterException(errMsg);
}
}
/**
* Generate acker's input Map<GlobalStreamId, Grouping>
*
* for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...> <GlobalStreamId(boltId,
* ACKER_FAIL_STREAM_ID), ...>
*
* @param topology
* @return
*/
public static Map<GlobalStreamId, Grouping> topoMasterInputs(StormTopology topology) {
GlobalStreamId stream = null;
Grouping group = null;
Map<GlobalStreamId, Grouping> spout_inputs = new HashMap<GlobalStreamId, Grouping>();
Map<String, SpoutSpec> spout_ids = topology.get_spouts();
for (Entry<String, SpoutSpec> spout : spout_ids.entrySet()) {
String id = spout.getKey();
stream = new GlobalStreamId(id, TOPOLOGY_MASTER_HB_STREAM_ID);
group = Thrift.mkAllGrouping();
spout_inputs.put(stream, group);
stream = new GlobalStreamId(id, TOPOLOGY_MASTER_METRICS_STREAM_ID);
group = Thrift.mkAllGrouping();
spout_inputs.put(stream, group);
stream = new GlobalStreamId(id, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
group = Thrift.mkAllGrouping();
spout_inputs.put(stream, group);
}
Map<String, Bolt> bolt_ids = topology.get_bolts();
Map<GlobalStreamId, Grouping> bolt_inputs = new HashMap<GlobalStreamId, Grouping>();
for (Entry<String, Bolt> bolt : bolt_ids.entrySet()) {
String id = bolt.getKey();
stream = new GlobalStreamId(id, TOPOLOGY_MASTER_HB_STREAM_ID);
group = Thrift.mkAllGrouping();
bolt_inputs.put(stream, group);
stream = new GlobalStreamId(id, TOPOLOGY_MASTER_METRICS_STREAM_ID);
group = Thrift.mkAllGrouping();
bolt_inputs.put(stream, group);
stream = new GlobalStreamId(id, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
group = Thrift.mkAllGrouping();
bolt_inputs.put(stream, group);
}
Map<GlobalStreamId, Grouping> himself_inputs = new HashMap<GlobalStreamId, Grouping>();
stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_HB_STREAM_ID);
group = Thrift.mkAllGrouping();
himself_inputs.put(stream, group);
stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_METRICS_STREAM_ID);
group = Thrift.mkAllGrouping();
himself_inputs.put(stream, group);
Map<GlobalStreamId, Grouping> allInputs = new HashMap<GlobalStreamId, Grouping>();
allInputs.putAll(bolt_inputs);
allInputs.putAll(spout_inputs);
allInputs.putAll(himself_inputs);
return allInputs;
}
/**
* Add topology master bolt to topology
*/
public static void addTopologyMaster(Map stormConf, StormTopology ret) {
// generate outputs
HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>();
List<String> list = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT);
outputs.put(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.outputFields(list));
list = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS);
outputs.put(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.outputFields(list));
list = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT);
outputs.put(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.outputFields(list));
IBolt topologyMaster = new TopologyMaster();
// generate inputs
Map<GlobalStreamId, Grouping> inputs = topoMasterInputs(ret);
// generate topology master which will be stored in topology
Bolt topologyMasterBolt = Thrift.mkBolt(inputs, topologyMaster, outputs, 1);
// add output stream to spout/bolt
for (Entry<String, Bolt> e : ret.get_bolts().entrySet()) {
Bolt bolt = e.getValue();
ComponentCommon common = bolt.get_common();
List<String> fields = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS);
common.put_to_streams(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.directOutputFields(fields));
fields = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT);
common.put_to_streams(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.directOutputFields(fields));
fields = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT);
common.put_to_streams(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.directOutputFields(fields));
GlobalStreamId stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
common.put_to_inputs(stream, Thrift.mkDirectGrouping());
bolt.set_common(common);
}
for (Entry<String, SpoutSpec> kv : ret.get_spouts().entrySet()) {
SpoutSpec spout = kv.getValue();
ComponentCommon common = spout.get_common();
List<String> fields = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS);
common.put_to_streams(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.directOutputFields(fields));
fields = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT);
common.put_to_streams(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.directOutputFields(fields));
fields = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT);
common.put_to_streams(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.directOutputFields(fields));
GlobalStreamId stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
common.put_to_inputs(stream, Thrift.mkDirectGrouping());
spout.set_common(common);
}
ret.put_to_bolts(TOPOLOGY_MASTER_COMPONENT_ID, topologyMasterBolt);
}
/**
* Generate acker's input Map<GlobalStreamId, Grouping>
*
* for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...> <GlobalStreamId(boltId,
* ACKER_FAIL_STREAM_ID), ...>
*
* @param topology
* @return
*/
public static Map<GlobalStreamId, Grouping> acker_inputs(StormTopology topology) {
Map<GlobalStreamId, Grouping> spout_inputs = new HashMap<GlobalStreamId, Grouping>();
Map<String, SpoutSpec> spout_ids = topology.get_spouts();
for (Entry<String, SpoutSpec> spout : spout_ids.entrySet()) {
String id = spout.getKey();
GlobalStreamId stream = new GlobalStreamId(id, ACKER_INIT_STREAM_ID);
Grouping group = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
spout_inputs.put(stream, group);
}
Map<String, Bolt> bolt_ids = topology.get_bolts();
Map<GlobalStreamId, Grouping> bolt_inputs = new HashMap<GlobalStreamId, Grouping>();
for (Entry<String, Bolt> bolt : bolt_ids.entrySet()) {
String id = bolt.getKey();
GlobalStreamId streamAck = new GlobalStreamId(id, ACKER_ACK_STREAM_ID);
Grouping groupAck = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
GlobalStreamId streamFail = new GlobalStreamId(id, ACKER_FAIL_STREAM_ID);
Grouping groupFail = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
bolt_inputs.put(streamAck, groupAck);
bolt_inputs.put(streamFail, groupFail);
}
Map<GlobalStreamId, Grouping> allInputs = new HashMap<GlobalStreamId, Grouping>();
allInputs.putAll(bolt_inputs);
allInputs.putAll(spout_inputs);
return allInputs;
}
/**
* Add acker bolt to topology
*
* @param num_tasks
* @param ret
*/
public static void add_acker(Map stormConf, StormTopology ret) {
String key = Config.TOPOLOGY_ACKER_EXECUTORS;
Integer ackerNum = JStormUtils.parseInt(stormConf.get(key), 0);
// generate outputs
HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>();
ArrayList<String> fields = new ArrayList<String>();
fields.add("id");
outputs.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(fields));
outputs.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(fields));
IBolt ackerbolt = new Acker();
// generate inputs
Map<GlobalStreamId, Grouping> inputs = acker_inputs(ret);
// generate acker which will be stored in topology
Bolt acker_bolt = Thrift.mkBolt(inputs, ackerbolt, outputs, ackerNum);
// add every bolt two output stream
// ACKER_ACK_STREAM_ID/ACKER_FAIL_STREAM_ID
for (Entry<String, Bolt> e : ret.get_bolts().entrySet()) {
Bolt bolt = e.getValue();
ComponentCommon common = bolt.get_common();
List<String> ackList = JStormUtils.mk_list("id", "ack-val");
common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(ackList));
List<String> failList = JStormUtils.mk_list("id");
common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(failList));
bolt.set_common(common);
}
// add every spout output stream ACKER_INIT_STREAM_ID
// add every spout two intput source
// ((ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), directGrouping)
// ((ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID), directGrouping)
for (Entry<String, SpoutSpec> kv : ret.get_spouts().entrySet()) {
SpoutSpec bolt = kv.getValue();
ComponentCommon common = bolt.get_common();
List<String> initList = JStormUtils.mk_list("id", "init-val", "spout-task");
common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(initList));
GlobalStreamId ack_ack = new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID);
common.put_to_inputs(ack_ack, Thrift.mkDirectGrouping());
GlobalStreamId ack_fail = new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID);
common.put_to_inputs(ack_fail, Thrift.mkDirectGrouping());
}
ret.put_to_bolts(ACKER_COMPONENT_ID, acker_bolt);
}
public static List<Object> all_components(StormTopology topology) {
List<Object> rtn = new ArrayList<Object>();
for (StormTopology._Fields field : Thrift.STORM_TOPOLOGY_FIELDS) {
Object fields = topology.getFieldValue(field);
if (fields != null) {
rtn.addAll(((Map) fields).values());
}
}
return rtn;
}
private static List<String> sysEventFields = JStormUtils.mk_list("event");
private static void add_component_system_streams(Object obj) {
ComponentCommon common = null;
if (obj instanceof StateSpoutSpec) {
StateSpoutSpec spec = (StateSpoutSpec) obj;
common = spec.get_common();
}
if (obj instanceof SpoutSpec) {
SpoutSpec spec = (SpoutSpec) obj;
common = spec.get_common();
}
if (obj instanceof Bolt) {
Bolt spec = (Bolt) obj;
common = spec.get_common();
}
if (common != null) {
StreamInfo sinfo = Thrift.outputFields(sysEventFields);
common.put_to_streams(SYSTEM_STREAM_ID, sinfo);
}
}
/**
* Add every bolt or spout one output stream <SYSTEM_STREAM_ID, >
*
* @param topology
*/
public static void add_system_streams(StormTopology topology) {
for (Object obj : all_components(topology)) {
add_component_system_streams(obj);
}
}
public static StormTopology add_system_components(StormTopology topology) {
// generate inputs
Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
// generate outputs
HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>();
ArrayList<String> fields = new ArrayList<String>();
outputs.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("rate_secs")));
outputs.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("interval")));
outputs.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("creds")));
// ComponentCommon common = new ComponentCommon(inputs, outputs);
IBolt ackerbolt = new SystemBolt();
Bolt bolt = Thrift.mkBolt(inputs, ackerbolt, outputs, Integer.valueOf(0));
topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, bolt);
add_system_streams(topology);
return topology;
}
public static StormTopology add_metrics_component(StormTopology topology) {
/**
* @@@ TODO Add metrics consumer bolt
*/
// (defn metrics-consumer-bolt-specs [storm-conf topology]
// (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys
// (all-components topology)))
// inputs (->> (for [comp-id component-ids-that-emit-metrics]
// {[comp-id METRICS-STREAM-ID] :shuffle})
// (into {}))
//
// mk-bolt-spec (fn [class arg p]
// (thrift/mk-bolt-spec*
// inputs
// (backtype.storm.metric.MetricsConsumerBolt. class arg)
// {} :p p :conf {TOPOLOGY-TASKS p}))]
//
// (map
// (fn [component-id register]
// [component-id (mk-bolt-spec (get register "class")
// (get register "argument")
// (or (get register "parallelism.hint") 1))])
//
// (metrics-consumer-register-ids storm-conf)
// (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
return topology;
}
@SuppressWarnings("rawtypes")
public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException {
StormTopology ret = topology.deepCopy();
add_acker(storm_conf, ret);
if(StormConfig.local_mode(storm_conf) == false)
addTopologyMaster(storm_conf, ret);
add_metrics_component(ret);
add_system_components(ret);
return ret;
}
/**
* get component configuration
*
* @param storm_conf
* @param topology_context
* @param component_id
* @return
*/
@SuppressWarnings("unchecked")
public static Map component_conf(Map storm_conf, TopologyContext topology_context, String component_id) {
List<Object> to_remove = StormConfig.All_CONFIGS();
to_remove.remove(Config.TOPOLOGY_DEBUG);
to_remove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
to_remove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
to_remove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
Map<Object, Object> componentConf = new HashMap<Object, Object>();
String jconf = topology_context.getComponentCommon(component_id).get_json_conf();
if (jconf != null) {
componentConf = (Map<Object, Object>) JStormUtils.from_json(jconf);
}
/**
* @@@ Don't know why need remove system configuration from component conf? //
*/
// for (Object p : to_remove) {
// componentConf.remove(p);
// }
Map<Object, Object> ret = new HashMap<Object, Object>();
ret.putAll(storm_conf);
ret.putAll(componentConf);
return ret;
}
/**
* get object of component_id
*
* @param topology
* @param component_id
* @return
*/
public static Object get_task_object(StormTopology topology, String component_id, URLClassLoader loader) {
Map<String, SpoutSpec> spouts = topology.get_spouts();
Map<String, Bolt> bolts = topology.get_bolts();
Map<String, StateSpoutSpec> state_spouts = topology.get_state_spouts();
ComponentObject obj = null;
if (spouts.containsKey(component_id)) {
obj = spouts.get(component_id).get_spout_object();
} else if (bolts.containsKey(component_id)) {
obj = bolts.get(component_id).get_bolt_object();
} else if (state_spouts.containsKey(component_id)) {
obj = state_spouts.get(component_id).get_state_spout_object();
}
if (obj == null) {
throw new RuntimeException("Could not find " + component_id + " in " + topology.toString());
}
Object componentObject = Utils.getSetComponentObject(obj, loader);
Object rtn = null;
if (componentObject instanceof JavaObject) {
rtn = Thrift.instantiateJavaObject((JavaObject) componentObject);
} else if (componentObject instanceof ShellComponent) {
if (spouts.containsKey(component_id)) {
rtn = new ShellSpout((ShellComponent) componentObject);
} else {
rtn = new ShellBolt((ShellComponent) componentObject);
}
} else {
rtn = componentObject;
}
return rtn;
}
/**
* get current task's output <Stream_id, <componentId, MkGrouper>>
*
* @param topology_context
* @return
*/
public static Map<String, Map<String, MkGrouper>> outbound_components(TopologyContext topology_context, WorkerData workerData) {
Map<String, Map<String, MkGrouper>> rr = new HashMap<String, Map<String, MkGrouper>>();
// <Stream_id,<component,Grouping>>
Map<String, Map<String, Grouping>> output_groupings = topology_context.getThisTargets();
for (Entry<String, Map<String, Grouping>> entry : output_groupings.entrySet()) {
String stream_id = entry.getKey();
Map<String, Grouping> component_grouping = entry.getValue();
Fields out_fields = topology_context.getThisOutputFields(stream_id);
Map<String, MkGrouper> componentGrouper = new HashMap<String, MkGrouper>();
for (Entry<String, Grouping> cg : component_grouping.entrySet()) {
String component = cg.getKey();
Grouping tgrouping = cg.getValue();
List<Integer> outTasks = topology_context.getComponentTasks(component);
// ATTENTION: If topology set one component parallelism as 0
// so we don't need send tuple to it
if (outTasks.size() > 0) {
MkGrouper grouper = new MkGrouper(topology_context, out_fields, tgrouping, outTasks, stream_id, workerData);
componentGrouper.put(component, grouper);
}
LOG.info("outbound_components, outTasks=" + outTasks + " for task-" + topology_context.getThisTaskId());
}
if (componentGrouper.size() > 0) {
rr.put(stream_id, componentGrouper);
}
}
return rr;
}
/**
* get the component's configuration
*
* @param topology_context
* @param task_id
*/
public static Map getComponentMap(DefaultTopologyAssignContext context, Integer task) {
String componentName = context.getTaskToComponent().get(task);
ComponentCommon componentCommon = ThriftTopologyUtils.getComponentCommon(context.getSysTopology(), componentName);
Map componentMap = (Map) JStormUtils.from_json(componentCommon.get_json_conf());
if (componentMap == null) {
componentMap = Maps.newHashMap();
}
return componentMap;
}
/**
* get all bolts' inputs and spouts' outputs <Bolt_name, <Input_name>> <Spout_name, <Output_name>>
*
* @param topology_context
* @return all bolts' inputs and spouts' outputs
*/
public static Map<String, Set<String>> buildSpoutOutoputAndBoltInputMap(DefaultTopologyAssignContext context) {
Set<String> bolts = context.getRawTopology().get_bolts().keySet();
Set<String> spouts = context.getRawTopology().get_spouts().keySet();
Map<String, Set<String>> relationship = new HashMap<String, Set<String>>();
for (Entry<String, Bolt> entry : context.getRawTopology().get_bolts().entrySet()) {
Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs();
Set<String> input = new HashSet<String>();
relationship.put(entry.getKey(), input);
for (Entry<GlobalStreamId, Grouping> inEntry : inputs.entrySet()) {
String component = inEntry.getKey().get_componentId();
input.add(component);
if (!bolts.contains(component)) {
// spout
Set<String> spoutOutput = relationship.get(component);
if (spoutOutput == null) {
spoutOutput = new HashSet<String>();
relationship.put(component, spoutOutput);
}
spoutOutput.add(entry.getKey());
}
}
}
for (String spout : spouts) {
if (relationship.get(spout) == null)
relationship.put(spout, new HashSet<String>());
}
for (String bolt : bolts) {
if (relationship.get(bolt) == null)
relationship.put(bolt, new HashSet<String>());
}
return relationship;
}
public static Map<Integer, String> getTaskToComponent(Map<Integer, TaskInfo> taskInfoMap) {
Map<Integer, String> ret = new TreeMap<Integer, String>();
for (Entry<Integer, TaskInfo> entry : taskInfoMap.entrySet()) {
ret.put(entry.getKey(), entry.getValue().getComponentId());
}
return ret;
}
public static Map<Integer, String> getTaskToType(Map<Integer, TaskInfo> taskInfoMap) {
Map<Integer, String> ret = new TreeMap<Integer, String>();
for (Entry<Integer, TaskInfo> entry : taskInfoMap.entrySet()) {
ret.put(entry.getKey(), entry.getValue().getComponentType());
}
return ret;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static Integer mkTaskMaker(Map<Object, Object> stormConf, Map<String, ?> cidSpec, Map<Integer, TaskInfo> rtn, Integer cnt) {
if (cidSpec == null) {
LOG.warn("Component map is empty");
return cnt;
}
Set<?> entrySet = cidSpec.entrySet();
for (Iterator<?> it = entrySet.iterator(); it.hasNext();) {
Entry entry = (Entry) it.next();
Object obj = entry.getValue();
ComponentCommon common = null;
String componentType = "bolt";
if (obj instanceof Bolt) {
common = ((Bolt) obj).get_common();
componentType = "bolt";
} else if (obj instanceof SpoutSpec) {
common = ((SpoutSpec) obj).get_common();
componentType = "spout";
} else if (obj instanceof StateSpoutSpec) {
common = ((StateSpoutSpec) obj).get_common();
componentType = "spout";
}
if (common == null) {
throw new RuntimeException("No ComponentCommon of " + entry.getKey());
}
int declared = Thrift.parallelismHint(common);
Integer parallelism = declared;
// Map tmp = (Map) Utils_clj.from_json(common.get_json_conf());
Map newStormConf = new HashMap(stormConf);
// newStormConf.putAll(tmp);
Integer maxParallelism = JStormUtils.parseInt(newStormConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
if (maxParallelism != null) {
parallelism = Math.min(maxParallelism, declared);
}
for (int i = 0; i < parallelism; i++) {
cnt++;
TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), componentType);
rtn.put(cnt, taskInfo);
}
}
return cnt;
}
public static Map<Integer, TaskInfo> mkTaskInfo(Map<Object, Object> stormConf, StormTopology sysTopology, String topologyid) {
// use TreeMap to make task as sequence
Map<Integer, TaskInfo> rtn = new TreeMap<Integer, TaskInfo>();
Integer count = 0;
count = mkTaskMaker(stormConf, sysTopology.get_bolts(), rtn, count);
count = mkTaskMaker(stormConf, sysTopology.get_spouts(), rtn, count);
count = mkTaskMaker(stormConf, sysTopology.get_state_spouts(), rtn, count);
return rtn;
}
public static boolean isSystemComponent(String componentId) {
if (componentId.equals(Acker.ACKER_COMPONENT_ID) || componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
return true;
}
return false;
}
}