blob: acba4aefbd86f98a5982375cc04b34ea87980a06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.JavaObject;
import org.apache.storm.generated.JavaObjectArg;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StateSpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StormTopology._Fields;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.shade.org.json.simple.JSONValue;
import org.apache.storm.task.IBolt;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Thrift {
private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
private static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
private static StormTopology._Fields[] SPOUT_FIELDS =
{ StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
static {
Set<_Fields> keys = StormTopology.metaDataMap.keySet();
keys.toArray(STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]);
}
public static StormTopology._Fields[] getTopologyFields() {
return STORM_TOPOLOGY_FIELDS;
}
public static StormTopology._Fields[] getSpoutFields() {
return SPOUT_FIELDS;
}
public static StreamInfo directOutputFields(List<String> fields) {
return new StreamInfo(fields, true);
}
public static StreamInfo outputFields(List<String> fields) {
return new StreamInfo(fields, false);
}
public static Grouping prepareShuffleGrouping() {
return Grouping.shuffle(new NullStruct());
}
public static Grouping prepareLocalOrShuffleGrouping() {
return Grouping.local_or_shuffle(new NullStruct());
}
public static Grouping prepareFieldsGrouping(List<String> fields) {
return Grouping.fields(fields);
}
public static Grouping prepareGlobalGrouping() {
return prepareFieldsGrouping(new ArrayList<String>());
}
public static Grouping prepareDirectGrouping() {
return Grouping.direct(new NullStruct());
}
public static Grouping prepareAllGrouping() {
return Grouping.all(new NullStruct());
}
public static Grouping prepareNoneGrouping() {
return Grouping.none(new NullStruct());
}
public static Grouping prepareCustomStreamGrouping(Object obj) {
return Grouping.custom_serialized(Utils.javaSerialize(obj));
}
public static Grouping prepareCustomJavaObjectGrouping(JavaObject obj) {
return Grouping.custom_object(obj);
}
public static Object instantiateJavaObject(JavaObject obj) {
List<JavaObjectArg> args = obj.get_args_list();
Class[] paraTypes = new Class[args.size()];
Object[] paraValues = new Object[args.size()];
for (int i = 0; i < args.size(); i++) {
JavaObjectArg arg = args.get(i);
paraValues[i] = arg.getFieldValue();
if (arg.getSetField().equals(JavaObjectArg._Fields.INT_ARG)) {
paraTypes[i] = Integer.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
paraTypes[i] = Long.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
paraTypes[i] = String.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
paraTypes[i] = Boolean.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
paraTypes[i] = ByteBuffer.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
paraTypes[i] = Double.class;
} else {
paraTypes[i] = Object.class;
}
}
try {
Class clazz = Class.forName(obj.get_full_class_name());
Constructor cons = clazz.getConstructor(paraTypes);
return cons.newInstance(paraValues);
} catch (Exception e) {
LOG.error("java object instantiation failed", e);
}
return null;
}
public static Grouping._Fields groupingType(Grouping grouping) {
return grouping.getSetField();
}
public static List<String> fieldGrouping(Grouping grouping) {
if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
}
return grouping.get_fields();
}
public static boolean isGlobalGrouping(Grouping grouping) {
if (Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
return fieldGrouping(grouping).isEmpty();
}
return false;
}
public static int getParallelismHint(ComponentCommon componentCommon) {
if (!componentCommon.is_set_parallelism_hint()) {
return 1;
} else {
return componentCommon.get_parallelism_hint();
}
}
public static ComponentObject serializeComponentObject(Object obj) {
return ComponentObject.serialized_java(Utils.javaSerialize(obj));
}
public static Object deserializeComponentObject(ComponentObject obj) {
if (obj.getSetField() != ComponentObject._Fields.SERIALIZED_JAVA) {
throw new RuntimeException("Cannot deserialize non-java-serialized object");
}
return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
}
public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String,
StreamInfo> outputs, Integer parallelismHint) {
return prepareComponentCommon(inputs, outputs, parallelismHint, null);
}
public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, StreamInfo> outputs,
Integer parallelismHint, Map<String, Object> conf) {
Map<GlobalStreamId, Grouping> mappedInputs = new HashMap<>();
Map<String, StreamInfo> mappedOutputs = new HashMap<>();
if (inputs != null && !inputs.isEmpty()) {
mappedInputs.putAll(inputs);
}
if (outputs != null && !outputs.isEmpty()) {
mappedOutputs.putAll(outputs);
}
ComponentCommon component = new ComponentCommon(mappedInputs, mappedOutputs);
if (parallelismHint != null) {
component.set_parallelism_hint(parallelismHint);
}
if (conf != null) {
component.set_json_conf(JSONValue.toJSONString(conf));
}
return component;
}
public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
return new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)),
prepareComponentCommon(new HashMap<>(), outputs, null, null));
}
public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,
Integer parallelismHint, Map<String, Object> conf) {
ComponentCommon common = prepareComponentCommon(inputs, outputs, parallelismHint, conf);
return new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common);
}
public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt) {
return prepareBoltDetails(inputs, bolt, null, null);
}
public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
Integer parallelismHint) {
return prepareBoltDetails(inputs, bolt, parallelismHint, null);
}
public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
Integer parallelismHint, Map<String, Object> conf) {
BoltDetails details = new BoltDetails(bolt, conf, parallelismHint, inputs);
return details;
}
public static SpoutDetails prepareSpoutDetails(IRichSpout spout) {
return prepareSpoutDetails(spout, null, null);
}
public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint) {
return prepareSpoutDetails(spout, parallelismHint, null);
}
public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint, Map<String, Object> conf) {
SpoutDetails details = new SpoutDetails(spout, parallelismHint, conf);
return details;
}
private static void addInputs(BoltDeclarer declarer, Map<GlobalStreamId, Grouping> inputs) {
for (Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) {
declarer.grouping(entry.getKey(), entry.getValue());
}
}
public static StormTopology buildTopology(HashMap<String, SpoutDetails> spoutMap,
HashMap<String, BoltDetails> boltMap, HashMap<String, StateSpoutSpec> stateMap) {
return buildTopology(spoutMap, boltMap);
}
public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
TopologyBuilder builder = new TopologyBuilder();
for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) {
String spoutId = entry.getKey();
SpoutDetails spec = entry.getValue();
SpoutDeclarer spoutDeclarer = builder.setSpout(spoutId, spec.getSpout(), spec.getParallelism());
spoutDeclarer.addConfigurations(spec.getConf());
}
for (Entry<String, BoltDetails> entry : boltMap.entrySet()) {
String spoutId = entry.getKey();
BoltDetails spec = entry.getValue();
BoltDeclarer boltDeclarer = null;
if (spec.bolt instanceof IRichBolt) {
boltDeclarer = builder.setBolt(spoutId, (IRichBolt) spec.getBolt(), spec.getParallelism());
} else {
boltDeclarer = builder.setBolt(spoutId, (IBasicBolt) spec.getBolt(), spec.getParallelism());
}
boltDeclarer.addConfigurations(spec.getConf());
addInputs(boltDeclarer, spec.getInputs());
}
return builder.createTopology();
}
public static class SpoutDetails {
private IRichSpout spout;
private Integer parallelism;
private Map<String, Object> conf;
public SpoutDetails(IRichSpout spout, Integer parallelism, Map<String, Object> conf) {
this.spout = spout;
this.parallelism = parallelism;
this.conf = conf;
}
public IRichSpout getSpout() {
return spout;
}
public Integer getParallelism() {
return parallelism;
}
public Map<String, Object> getConf() {
return conf;
}
}
public static class BoltDetails {
private Object bolt;
private Map<String, Object> conf;
private Integer parallelism;
private Map<GlobalStreamId, Grouping> inputs;
public BoltDetails(Object bolt, Map<String, Object> conf, Integer parallelism,
Map<GlobalStreamId, Grouping> inputs) {
this.bolt = bolt;
this.conf = conf;
this.parallelism = parallelism;
this.inputs = inputs;
}
public Object getBolt() {
return bolt;
}
public Map<String, Object> getConf() {
return conf;
}
public Map<GlobalStreamId, Grouping> getInputs() {
return inputs;
}
public Integer getParallelism() {
return parallelism;
}
}
}