Joshfischer/eco config (#2658)
* initial addition of directory structure
* clean up
* adding base config for build file \
fixing linting issues
* saving progress. Still need to fix missing class definition on ParseException
* fixing shaded dependency issue
* starting experimentation on how to approach loading config from yaml file
* experimenting with snake yaml to extract topology definition
* adding start of topology and component definitions
* refactor
* successfully populating ecoTopologyDefinition from yaml
* clean up
* clean up
* fixed versioning error
* saving progress, still working through building topology from yaml
* saving progress
* cleaning up styles
* checking default config values
* adding test for customized config map
* transitioning to support heron api
* removing last of streamlet api from eco
* making a pivot to match storm flux behavior better
* saving before chaos
* saving progress, running into issue with verifying the topology
* adding some logging for debugging
* printing topology definition. Adding tests
* saving with failing test, still working through investigation
* back to working
* saving progress
* finishing eco parser test
* handling exception
* package change
* package change
* adding missed id
* MVP of ECO at this commit
* fixing styles, pulling topology name from topology definition file
* fixing spelling
* clean up
* clean up
* refactoring eco builder package
* start to removing static calls
* removing static calls
* still working through linting errors
* fixing lint errors
* fixing setter
* adding test
* start to verifying bean references
* verifying components
* fixing tests
* worked past constructor argument issue. Now need to deal with backtypes from Storm
* saving before restructure
* building kafka topology
* fixing imports for test
* fixing folder structure
* refactor for testability
* removing more statics
* cleaning up eco builders
* confirmation config builder maps as expected
* finishing configBuilder test
* cleaning up BoltBuilder
* verifying component builder
* replacing java logger
* adding tests
* adding verification of building implementations of IRichBolts
* clean up
* adding mocks for testing
* fixing classNotFound errors
* coverage on all of stream builder
* need to refactor ObjectBuilder
* moving static call
* adding verification to test
* clean up
* clean up
* verifying path of building objects with args
* covering other path
* starting on builder utility
* verifying className
* adding test
* adding test
* clean up
* adding test
* clean up
* clean up
* fixing checkstyle errors
* fixing linting issues
* fixing linting issues
* fixing linting issues
* fixing linting issues
* linting issues
* fixing checkstyle
* fixing checkstyle
* saving progress
* adding simple wordcount eco example
* adding examples
* checkstyles
diff --git a/examples/src/java/BUILD b/examples/src/java/BUILD
index 3994603..4a4772f 100644
--- a/examples/src/java/BUILD
+++ b/examples/src/java/BUILD
@@ -37,3 +37,24 @@
outs = ["heron-streamlet-examples.jar"],
cmd = "cp $< $@",
)
+
+
+# Compiles every Java source under com/twitter/heron/examples/eco. create_executable = 0
+# marks the target as non-executable; its eco-examples-unshaded_deploy.jar output is
+# consumed by the genrule below.
+java_binary(
+ name='eco-examples-unshaded',
+ srcs = glob(["com/twitter/heron/examples/eco/**/*.java"]),
+ deps = [
+ "//heron/api/src/java:api-java-low-level",
+ "//heron/api/src/java:api-java",
+ "//heron/common/src/java:basics-java",
+ "//heron/eco/src/java:eco-core",
+ "//storm-compatibility/src/java:storm-compatibility-java",
+ ],
+ create_executable = 0,
+)
+
+# Copies eco-examples-unshaded_deploy.jar to heron-eco-examples.jar.
+genrule(
+ name = 'heron-eco-examples',
+ srcs = [":eco-examples-unshaded_deploy.jar"],
+ outs = ["heron-eco-examples.jar"],
+ cmd = "cp $< $@",
+)
\ No newline at end of file
diff --git a/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java b/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
new file mode 100644
index 0000000..3271178
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
@@ -0,0 +1,41 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.examples.eco;
+
+
+import java.util.logging.Logger;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Simple bolt that does nothing other than LOG.info() every tuple received.
+ * It emits nothing and therefore declares no output fields.
+ */
+@SuppressWarnings("serial")
+public class LogInfoBolt extends BaseBasicBolt {
+  private static final Logger LOG = Logger.getLogger(LogInfoBolt.class.getName());
+
+  /**
+   * Logs the string form of the received tuple at INFO level.
+   *
+   * @param tuple the incoming tuple to log
+   * @param basicOutputCollector not used; this bolt never emits
+   */
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+    // Log the tuple text directly. java.util.logging does no "{}" substitution for
+    // Logger.info(String), so a "{ }" prefix would only appear verbatim in the output.
+    LOG.info(String.valueOf(tuple));
+  }
+
+  /**
+   * Declares no output fields.
+   *
+   * @param outputFieldsDeclarer not used
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    // Intentionally empty: execute() never emits.
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java b/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
new file mode 100644
index 0000000..00b5f64
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
@@ -0,0 +1,61 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+import static com.twitter.heron.api.utils.Utils.tuple;
+/**
+ * Bolt that keeps a running count per name and emits {@code (name, count)} for every
+ * tuple it receives.
+ */
+@SuppressWarnings({"serial", "rawtypes"})
+public class TestNameCounter extends BaseBasicBolt {
+
+  // Occurrences seen so far, keyed by the first value of the incoming tuples.
+  private Map<String, Integer> counts;
+
+  /**
+   * Starts with an empty count table.
+   *
+   * @param map not used
+   * @param topologyContext not used
+   */
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    counts = new HashMap<>();
+  }
+
+  /**
+   * Returns the value at position {@code idx} of the tuple, cast to a String.
+   *
+   * @param t tuple to read from
+   * @param idx zero-based position of the value
+   * @return the value at {@code idx} as a String
+   */
+  protected String getTupleValue(Tuple t, int idx) {
+    Object raw = t.getValues().get(idx);
+    return (String) raw;
+  }
+
+  /**
+   * Increments the count for the tuple's first value and emits the value with its new count.
+   *
+   * @param input tuple whose first value is the name to count
+   * @param collector collector the (name, count) pair is emitted to
+   */
+  public void execute(Tuple input, BasicOutputCollector collector) {
+    String name = getTupleValue(input, 0);
+    Integer seenSoFar = counts.get(name);
+    // A name that has not been seen yet starts from zero.
+    int updated = (seenSoFar == null ? 0 : seenSoFar) + 1;
+    counts.put(name, updated);
+    collector.emit(tuple(name, updated));
+  }
+
+  /** Nothing to release. */
+  public void cleanup() {
+
+  }
+
+  /**
+   * Declares the two output fields, "name" and "count".
+   *
+   * @param declarer declarer the fields are registered with
+   */
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("name", "count"));
+  }
+
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java b/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
new file mode 100644
index 0000000..4e7e8a5
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
@@ -0,0 +1,81 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Spout that emits one randomly chosen name from a fixed list roughly every 100 ms.
+ */
+@SuppressWarnings({"serial", "HiddenField"})
+public class TestNameSpout extends BaseRichSpout {
+  // When false, getComponentConfiguration() asks for a single worker.
+  private boolean isdistributed;
+  // Assigned in open(); used by nextTuple() to emit.
+  private SpoutOutputCollector collector;
+
+  /** Creates a distributed spout. */
+  public TestNameSpout() {
+    this(true);
+  }
+
+  /**
+   * @param isDistributed false to have the component configuration request one worker
+   */
+  public TestNameSpout(boolean isDistributed) {
+    this.isdistributed = isDistributed;
+  }
+
+  /**
+   * Stores the collector for later use by {@link #nextTuple()}.
+   *
+   * @param conf not used
+   * @param context not used
+   * @param collector collector tuples are emitted to
+   */
+  public void open(Map<String, Object> conf, TopologyContext context,
+                   SpoutOutputCollector collector) {
+    this.collector = collector;
+  }
+
+  /** Nothing to release. */
+  public void close() {
+
+  }
+
+  /** Sleeps 100 ms, then emits one name picked at random from the fixed list. */
+  public void nextTuple() {
+    Utils.sleep(100);
+    final String[] names = {"marge", "homer", "bart", "simpson", "lisa"};
+    final int pick = new Random().nextInt(names.length);
+    collector.emit(new Values(names[pick]));
+  }
+
+  /** Acknowledgements are ignored. */
+  public void ack(Object msgId) {
+
+  }
+
+  /** Failures are ignored. */
+  public void fail(Object msgId) {
+
+  }
+
+  /**
+   * Declares the single output field, "name".
+   *
+   * @param declarer declarer the field is registered with
+   */
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("name"));
+  }
+
+  /**
+   * @return null when distributed; otherwise a map setting {@code Config.TOPOLOGY_WORKERS} to 1
+   */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    if (isdistributed) {
+      return null;
+    }
+    Map<String, Object> singleWorker = new HashMap<String, Object>();
+    singleWorker.put(Config.TOPOLOGY_WORKERS, 1);
+    return singleWorker;
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java b/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
new file mode 100644
index 0000000..f9cb1d6
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
@@ -0,0 +1,33 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Bolt that prints every received tuple to standard output and emits nothing.
+ */
+@SuppressWarnings("serial")
+public class TestPrintBolt extends BaseBasicBolt {
+
+  /**
+   * Prints the tuple's string form on its own line.
+   *
+   * @param tuple tuple to print
+   * @param collector not used
+   */
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    String rendered = String.valueOf(tuple);
+    System.out.println(rendered);
+  }
+
+  /**
+   * Declares no output fields.
+   *
+   * @param ofd not used
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer ofd) {
+  }
+
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java b/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
new file mode 100644
index 0000000..ef6046c
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
@@ -0,0 +1,46 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+
+/**
+ * Windowed bolt that emits, for every window it is handed, the number of tuples in it.
+ */
+@SuppressWarnings({"serial", "HiddenField"})
+public class TestWindowBolt extends BaseWindowedBolt {
+  // Assigned in prepare(); used by execute() to emit the per-window count.
+  private OutputCollector collector;
+
+  /**
+   * Stores the collector for later use by {@link #execute(TupleWindow)}.
+   *
+   * @param topoConf not used
+   * @param context not used
+   * @param collector collector the counts are emitted to
+   */
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context,
+                      OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  /**
+   * Emits the size of the tuple list returned by the window.
+   *
+   * @param inputWindow the window to count
+   */
+  @Override
+  public void execute(TupleWindow inputWindow) {
+    int tuplesInWindow = inputWindow.get().size();
+    collector.emit(new Values(tuplesInWindow));
+  }
+
+  /**
+   * Declares the single output field, "count".
+   *
+   * @param declarer declarer the field is registered with
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("count"));
+  }
+}
diff --git a/examples/src/resources/simple_windowing.yaml b/examples/src/resources/simple_windowing.yaml
new file mode 100644
index 0000000..e860b64
--- /dev/null
+++ b/examples/src/resources/simple_windowing.yaml
@@ -0,0 +1,67 @@
+# Copyright 2017 Twitter. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "sliding-window-topology"
+
+# reusable objects; other definitions point at them with ref: "<id>"
+components:
+  - id: "windowLength"
+    className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 5
+  - id: "slidingInterval"
+    className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 3
+
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "com.twitter.heron.examples.eco.TestNameSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "com.twitter.heron.examples.eco.TestWindowBolt"
+    configMethods:
+      - name: "withWindow"
+        args: [ref: "windowLength", ref: "slidingInterval"]
+    parallelism: 1
+  - id: "bolt-2"
+    className: "com.twitter.heron.examples.eco.TestPrintBolt"
+    parallelism: 1
+
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+#    id: "connection-1"
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      # must name a field declared by the source; TestNameSpout declares only "name"
+      args: ["name"]
+  - name: "bolt-1 --> bolt-2" # name isn't used (placeholder for logging, UI, etc.)
+#    id: "connection-1"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git a/examples/src/resources/simple_wordcount.yaml b/examples/src/resources/simple_wordcount.yaml
new file mode 100644
index 0000000..d28eec2
--- /dev/null
+++ b/examples/src/resources/simple_wordcount.yaml
@@ -0,0 +1,60 @@
+# Copyright 2017 Twitter. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "simple-wordcount-topology"
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "com.twitter.heron.examples.eco.TestNameSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "com.twitter.heron.examples.eco.TestNameCounter"
+    parallelism: 1
+
+  - id: "bolt-2"
+    className: "com.twitter.heron.examples.eco.LogInfoBolt"
+    parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+    id: "connection-1"
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      # must name a field declared by the source; TestNameSpout declares only "name"
+      args: ["name"]
+
+  - name: "bolt-1 --> bolt2"
+    id: "connection-2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git a/heron/eco/src/java/BUILD b/heron/eco/src/java/BUILD
new file mode 100644
index 0000000..ec26955
--- /dev/null
+++ b/heron/eco/src/java/BUILD
@@ -0,0 +1,34 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+# Dependencies shared by the eco-core-shaded and eco-java targets below.
+eco_deps = [
+"@commons_cli_commons_cli//jar",
+"@org_yaml_snakeyaml//jar",
+"//third_party/java:logging",
+"//storm-compatibility/src/java:storm-compatibility-java",
+"//heron/api/src/java:api-java-low-level",
+"//heron/common/src/java:basics-java",
+]
+
+
+# Non-executable binary target (create_executable = 0) over all ECO sources. Its
+# eco-core-shaded_deploy.jar output is the input of the eco-core genrule below.
+java_binary(
+ name="eco-core-shaded",
+ srcs = glob(["com/twitter/heron/eco/**/*.java"]),
+ deps = eco_deps,
+ create_executable = 0,
+
+)
+
+# Plain library target over the same sources and dependencies.
+java_library(
+ name = "eco-java",
+ srcs = glob(["com/twitter/heron/eco/**/*.java"]),
+ deps = eco_deps
+)
+
+# Copies eco-core-shaded_deploy.jar to eco-core.jar.
+genrule(
+ name = "eco-core",
+ srcs = [":eco-core-shaded_deploy.jar"],
+ outs = ["eco-core.jar"],
+ cmd = "cp $< $@",
+)
diff --git a/heron/eco/src/java/com/twitter/heron/eco/Eco.java b/heron/eco/src/java/com/twitter/heron/eco/Eco.java
new file mode 100644
index 0000000..cc30489
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/Eco.java
@@ -0,0 +1,158 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.logging.Logger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.builder.BoltBuilder;
+import com.twitter.heron.eco.builder.BuilderUtility;
+import com.twitter.heron.eco.builder.ComponentBuilder;
+import com.twitter.heron.eco.builder.ConfigBuilder;
+import com.twitter.heron.eco.builder.EcoBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+import com.twitter.heron.eco.builder.SpoutBuilder;
+import com.twitter.heron.eco.builder.StreamBuilder;
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.SpoutDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+import com.twitter.heron.eco.parser.EcoParser;
+import com.twitter.heron.eco.submit.EcoSubmitter;
+
+
+/**
+ * Command-line entry point that reads a YAML topology definition, builds the topology
+ * from it and hands the result to the {@link EcoSubmitter}.
+ */
+public class Eco {
+
+  private static final Logger LOG = Logger.getLogger(Eco.class.getName());
+
+  private final EcoBuilder ecoBuilder;
+  private final EcoParser ecoParser;
+  private final EcoSubmitter ecoSubmitter;
+
+  /**
+   * Creates an Eco instance from its collaborators.
+   *
+   * @param ecoBuilder builds the topology config and the topology builder
+   * @param ecoParser parses the topology definition from an input stream
+   * @param ecoSubmitter submits the finished topology
+   */
+  public Eco(EcoBuilder ecoBuilder, EcoParser ecoParser, EcoSubmitter ecoSubmitter) {
+    this.ecoBuilder = ecoBuilder;
+    this.ecoParser = ecoParser;
+    this.ecoSubmitter = ecoSubmitter;
+  }
+
+  /**
+   * Parses the topology definition from the given stream, builds the config and the topology,
+   * logs a summary, and submits the topology under the name found in the definition.
+   *
+   * <p>This method does not close the stream itself; the caller remains responsible for it.
+   *
+   * @param fileInputStream stream over the topology definition file
+   * @throws Exception if parsing, building or submitting fails
+   */
+  public void submit(FileInputStream fileInputStream) throws Exception {
+    EcoTopologyDefinition topologyDefinition = ecoParser.parseFromInputStream(fileInputStream);
+
+    String topologyName = topologyDefinition.getName();
+
+    Config topologyConfig = ecoBuilder
+        .buildConfig(topologyDefinition);
+
+    EcoExecutionContext executionContext
+        = new EcoExecutionContext(topologyDefinition, topologyConfig);
+
+    printTopologyInfo(executionContext);
+
+    ObjectBuilder objectBuilder = new ObjectBuilder();
+    objectBuilder.setBuilderUtility(new BuilderUtility());
+    TopologyBuilder builder = ecoBuilder
+        .buildTopologyBuilder(executionContext, objectBuilder);
+
+    ecoSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
+  }
+
+  /**
+   * Parses the command line, opens the file given by {@code --eco-config-file} and submits
+   * the topology it defines.
+   *
+   * @param args command-line arguments; {@code --eco-config-file} is required
+   * @throws RuntimeException if the command line cannot be parsed
+   * @throws Exception if the file cannot be opened or submission fails
+   */
+  public static void main(String[] args) throws Exception {
+    Options options = constructOptions();
+
+    CommandLineParser parser = new DefaultParser();
+
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      throw new RuntimeException("Error parsing command line options: ", e);
+    }
+
+    File configFile = new File(cmd.getOptionValue("eco-config-file"));
+
+    // try-with-resources closes the definition file whether submit() returns or throws.
+    try (FileInputStream fin = new FileInputStream(configFile)) {
+      Eco eco = new Eco(
+          new EcoBuilder(
+              new SpoutBuilder(),
+              new BoltBuilder(),
+              new StreamBuilder(),
+              new ComponentBuilder(),
+              new ConfigBuilder()),
+          new EcoParser(),
+          new EcoSubmitter());
+
+      eco.submit(fin);
+    }
+  }
+
+  /**
+   * Builds the command-line options: a single required option, {@code -eco} /
+   * {@code --eco-config-file}, naming the YAML topology definition file.
+   *
+   * @return the options accepted by {@link #main(String[])}
+   */
+  private static Options constructOptions() {
+    Options options = new Options();
+    Option ecoConfig = Option.builder("eco")
+        .desc("Yaml config file for specifying topology definitions")
+        .longOpt("eco-config-file")
+        .hasArgs()
+        .argName("eco-config-file")
+        .required()
+        .build();
+    options.addOption(ecoConfig);
+    return options;
+  }
+
+  // construct command line help options
+  //TODO: (joshfischer) integrate with existing system somehow
+  // NOTE(review): nothing in this class calls this method yet.
+  private static Options constructHelpOptions() {
+    Options options = new Options();
+    Option help = Option.builder("h")
+        .desc("List all options and their description")
+        .longOpt("help")
+        .build();
+
+    options.addOption(help);
+    return options;
+  }
+
+  /**
+   * Logs the topology name, each spout and bolt (id, parallelism, class name) and each
+   * stream (from, grouping type, to) at INFO level.
+   *
+   * @param ctx execution context holding the topology definition to describe
+   */
+  static void printTopologyInfo(EcoExecutionContext ctx) {
+    EcoTopologyDefinition t = ctx.getTopologyDefinition();
+
+    LOG.info("---------- TOPOLOGY DETAILS ----------");
+
+    LOG.info(String.format("Topology Name: %s", t.getName()));
+    LOG.info("--------------- SPOUTS ---------------");
+    for (SpoutDefinition s : t.getSpouts()) {
+      LOG.info(String.format("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName()));
+    }
+    LOG.info("---------------- BOLTS ---------------");
+    for (BoltDefinition b : t.getBolts()) {
+      LOG.info(String.format("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName()));
+    }
+
+    LOG.info("--------------- STREAMS ---------------");
+    for (StreamDefinition sd : t.getStreams()) {
+      LOG.info(String.format("%s --%s--> %s",
+          sd.getFrom(),
+          sd.getGrouping().getType(),
+          sd.getTo()));
+    }
+    LOG.info("--------------------------------------");
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
new file mode 100644
index 0000000..6af6ac9
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
@@ -0,0 +1,35 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+/**
+ * Builds the bolts listed in a topology definition and registers them on the execution context.
+ */
+public class BoltBuilder {
+
+  /**
+   * Builds one object per bolt definition and adds it to the context under the definition's id.
+   *
+   * @param executionContext supplies the topology definition and receives the built bolts
+   * @param objectBuilder turns each definition into an object
+   * @throws IllegalAccessException propagated from the object builder
+   * @throws InstantiationException propagated from the object builder
+   * @throws ClassNotFoundException propagated from the object builder
+   * @throws NoSuchFieldException propagated from the object builder
+   * @throws InvocationTargetException propagated from the object builder
+   */
+  protected void buildBolts(EcoExecutionContext executionContext,
+                            ObjectBuilder objectBuilder)
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException,
+      NoSuchFieldException, InvocationTargetException {
+    EcoTopologyDefinition definition = executionContext.getTopologyDefinition();
+
+    for (ObjectDefinition boltDefinition : definition.getBolts()) {
+      Object bolt = objectBuilder.buildObject(boltDefinition, executionContext);
+      executionContext.addBolt(boltDefinition.getId(), bolt);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java b/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java
new file mode 100644
index 0000000..a0ed526
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java
@@ -0,0 +1,116 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.heron.eco.definition.BeanListReference;
+import com.twitter.heron.eco.definition.BeanReference;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.PropertyDefinition;
+
+/**
+ * Reflection helpers shared by the ECO builders: resolving bean references in argument
+ * lists, applying property definitions to instances, and loading classes by name.
+ */
+public class BuilderUtility {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BuilderUtility.class);
+
+  /**
+   * Returns a copy of {@code args} in which every {@link BeanReference} is replaced by the
+   * component it names and every {@link BeanListReference} by the list of components it
+   * names. All other arguments, including null, are copied through unchanged.
+   *
+   * @param args arguments that may contain references
+   * @param context context the referenced components are looked up in
+   * @return a new list with the references resolved
+   */
+  @SuppressWarnings("rawtypes")
+  protected List<Object> resolveReferences(List<Object> args, EcoExecutionContext context) {
+    LOG.debug("Checking arguments for references.");
+    List<Object> cArgs = new ArrayList<>();
+
+    // resolve references
+    for (Object arg : args) {
+      if (arg instanceof BeanReference) {
+        BeanReference single = (BeanReference) arg;
+        LOG.debug("BeanReference: {}", single.getId());
+        cArgs.add(context.getComponent(single.getId()));
+      } else if (arg instanceof BeanListReference) {
+        List<Object> components = new ArrayList<>();
+        BeanListReference ref = (BeanListReference) arg;
+        for (String id : ref.getIds()) {
+          components.add(context.getComponent(id));
+        }
+
+        // Pass the list as a logging argument so the "{}" placeholder is actually filled.
+        LOG.debug("BeanListReference resolved as {}", components);
+        cArgs.add(components);
+      } else {
+        // Parameterized logging tolerates a null argument; arg.toString() would not.
+        LOG.debug("Unknown:{}", arg);
+        cArgs.add(arg);
+      }
+    }
+    return cArgs;
+  }
+
+  /**
+   * Applies each property of {@code bean} to {@code instance}. The value is the referenced
+   * component when the property is a reference, otherwise the property's own value. A
+   * method named like the property's setter is invoked when one exists; otherwise a public
+   * field with the property's name is assigned.
+   *
+   * @param bean definition whose properties are applied; a null property list is a no-op
+   * @param instance object the properties are applied to
+   * @param context context the referenced components are looked up in
+   * @throws IllegalAccessException if the setter or field cannot be accessed
+   * @throws InvocationTargetException if the setter itself throws
+   * @throws NoSuchFieldException if neither a setter nor a public field exists
+   */
+  @SuppressWarnings("rawtypes")
+  protected void applyProperties(ObjectDefinition bean, Object instance,
+                                 EcoExecutionContext context) throws
+      IllegalAccessException, InvocationTargetException, NoSuchFieldException {
+    List<PropertyDefinition> props = bean.getProperties();
+    Class clazz = instance.getClass();
+    if (props != null) {
+      for (PropertyDefinition prop : props) {
+        Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
+        Method setter = findSetter(clazz, prop.getName());
+        if (setter != null) {
+          LOG.info("found setter, attempting with: {} {}", instance.getClass(), value);
+          // invoke setter
+          setter.invoke(instance, new Object[]{value});
+        } else {
+          // look for a public instance variable
+          LOG.debug("no setter found. Looking for a public instance variable...");
+          Field field = findPublicField(clazz, prop.getName());
+          if (field != null) {
+            field.set(instance, value);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up the public field named {@code property} on {@code clazz}.
+   *
+   * @param clazz class to search
+   * @param property field name
+   * @return the matching public field
+   * @throws NoSuchFieldException if the class has no public field with that name
+   */
+  @SuppressWarnings("rawtypes")
+  protected Field findPublicField(Class clazz, String property)
+      throws NoSuchFieldException {
+    Field field = clazz.getField(property);
+    return field;
+  }
+
+  /**
+   * Finds a public method whose name is the setter name for {@code property}. Only the name
+   * is compared; when several methods share it, the last one in the order returned by
+   * {@code getMethods()} is returned.
+   *
+   * @param clazz class to search
+   * @param property property name
+   * @return the matching method, or null when none has that name
+   */
+  @SuppressWarnings("rawtypes")
+  private Method findSetter(Class clazz, String property) {
+    String setterName = toSetterName(property);
+    Method retval = null;
+    Method[] methods = clazz.getMethods();
+    for (Method method : methods) {
+      if (setterName.equals(method.getName())) {
+        LOG.debug("Found setter method: {}", method.getName());
+        retval = method;
+      }
+    }
+    return retval;
+  }
+
+  /**
+   * Turns a property name into its setter name, e.g. {@code "name"} into {@code "setName"}.
+   *
+   * @param name property name; must not be empty
+   * @return "set" followed by the name with its first character upper-cased
+   * @throws StringIndexOutOfBoundsException if {@code name} is empty
+   */
+  protected String toSetterName(String name) {
+    // Character.toUpperCase ignores the default locale, so "id" becomes "setId" even where
+    // String.toUpperCase() would map 'i' to a dotted capital (e.g. a Turkish locale).
+    return "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
+  }
+
+  /**
+   * Loads the class with the given name via {@link Class#forName(String)}.
+   *
+   * @param className fully qualified class name
+   * @return the loaded class
+   * @throws ClassNotFoundException if the class cannot be located
+   */
+  protected Class<?> classForName(String className) throws ClassNotFoundException {
+    return Class.forName(className);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
new file mode 100644
index 0000000..9fe6023
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
@@ -0,0 +1,36 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+import com.twitter.heron.eco.definition.BeanDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+
+/**
+ * Builds the components listed in a topology definition and registers them on the
+ * execution context.
+ */
+public class ComponentBuilder {
+
+  /**
+   * Builds one object per component definition and adds it to the context under the
+   * definition's id. Does nothing when the definition has no component list.
+   *
+   * @param context supplies the topology definition and receives the built components
+   * @param objectBuilder turns each definition into an object
+   * @throws ClassNotFoundException propagated from the object builder
+   * @throws IllegalAccessException propagated from the object builder
+   * @throws InstantiationException propagated from the object builder
+   * @throws NoSuchFieldException propagated from the object builder
+   * @throws InvocationTargetException propagated from the object builder
+   */
+  protected void buildComponents(EcoExecutionContext context, ObjectBuilder objectBuilder)
+      throws ClassNotFoundException,
+      IllegalAccessException, InstantiationException,
+      NoSuchFieldException, InvocationTargetException {
+    List<BeanDefinition> beans = context.getTopologyDefinition().getComponents();
+    if (beans == null) {
+      return;
+    }
+
+    for (BeanDefinition beanDefinition : beans) {
+      Object component = objectBuilder.buildObject(beanDefinition, context);
+      context.addComponent(beanDefinition.getId(), component);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
new file mode 100644
index 0000000..a6b758b
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
@@ -0,0 +1,32 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.util.Map;
+
+import org.apache.storm.Config;
+
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+public class ConfigBuilder {
+  /** Copies the definition's config into a new Config; a null map gives an empty one. */
+  protected Config buildConfig(EcoTopologyDefinition topologyDefinition) {
+    Map<String, Object> configMap = topologyDefinition.getConfig();
+    Config config = new Config();
+    if (configMap != null) {
+      config.putAll(configMap);
+    }
+    return config;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
new file mode 100644
index 0000000..c448f72
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
@@ -0,0 +1,66 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+
+/** Assembles a topology by delegating each step to the builder responsible for it. */
+public class EcoBuilder {
+  private final SpoutBuilder spoutBuilder;
+  private final BoltBuilder boltBuilder;
+  private final StreamBuilder streamBuilder;
+  private final ComponentBuilder componentBuilder;
+  private final ConfigBuilder configBuilder;
+
+  public EcoBuilder(SpoutBuilder spoutBuilder, BoltBuilder boltBuilder,
+                    StreamBuilder streamBuilder, ComponentBuilder componentBuilder,
+                    ConfigBuilder configBuilder) {
+    this.spoutBuilder = spoutBuilder;
+    this.boltBuilder = boltBuilder;
+    this.streamBuilder = streamBuilder;
+    this.componentBuilder = componentBuilder;
+    this.configBuilder = configBuilder;
+  }
+
+  /**
+   * Runs the component, spout, bolt and stream builders, in that order, on a new builder.
+   * @param executionContext context holding the topology definition being built
+   * @param objectBuilder passed to every delegate to instantiate the declared objects
+   * @return the TopologyBuilder that was handed to the spout and stream builders
+   */
+  public TopologyBuilder buildTopologyBuilder(EcoExecutionContext executionContext,
+                                              ObjectBuilder objectBuilder)
+      throws InstantiationException, IllegalAccessException, ClassNotFoundException,
+      NoSuchFieldException, InvocationTargetException {
+    TopologyBuilder builder = new TopologyBuilder();
+    componentBuilder.buildComponents(executionContext, objectBuilder);
+    spoutBuilder.buildSpouts(executionContext, builder, objectBuilder);
+    boltBuilder.buildBolts(executionContext, objectBuilder);
+    streamBuilder.buildStreams(executionContext, builder, objectBuilder);
+    return builder;
+  }
+
+  /** Builds the Config for the given definition via the ConfigBuilder. */
+  public Config buildConfig(EcoTopologyDefinition topologyDefinition) {
+    return this.configBuilder.buildConfig(topologyDefinition);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java
new file mode 100644
index 0000000..5f2f441
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java
@@ -0,0 +1,311 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.heron.eco.definition.ConfigurationMethodDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+public class ObjectBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(ObjectBuilder.class);
+
+ private BuilderUtility builderUtility; // buildObject dereferences this, so set it first
+
+ public void setBuilderUtility(BuilderUtility builderUtility) {
+ this.builderUtility = builderUtility;
+ }
+
+  /**
+   * Instantiates the class named by the definition, through a matching constructor when
+   * constructor args are given, then applies its properties and configuration methods.
+   * @throws IllegalArgumentException if no declared constructor accepts the arguments
+   */
+  @SuppressWarnings("rawtypes")
+  public Object buildObject(ObjectDefinition def, EcoExecutionContext context)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+      InvocationTargetException, NoSuchFieldException {
+    Class clazz = builderUtility.classForName(def.getClassName());
+    Object obj;
+    if (!def.hasConstructorArgs()) {
+      obj = clazz.newInstance();
+    } else {
+      LOG.debug("Found constructor arguments in definition ");
+      List<Object> cArgs = def.getConstructorArgs();
+      if (def.hasReferences()) {
+        LOG.debug("The definition has references");
+        cArgs = builderUtility.resolveReferences(cArgs, context);
+      } else {
+        LOG.debug("The definition does not have references");
+      }
+      LOG.debug("finding compatible constructor for : " + clazz.getName());
+      Constructor con = findCompatibleConstructor(cArgs, clazz);
+      if (con == null) {
+        throw new IllegalArgumentException(String.format(
+            "Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
+            clazz.getName(), cArgs));
+      }
+      LOG.debug("Found something seemingly compatible, attempting invocation...");
+      obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
+    }
+    builderUtility.applyProperties(def, obj, context);
+    invokeConfigMethods(def, obj, context);
+    return obj;
+  }
+
+  /**
+   * Scans the target's declared constructors for one whose parameters can accept the args.
+   * Returns the last match (logging an error when several match), or null when none does.
+   */
+  @SuppressWarnings("rawtypes")
+  protected Constructor findCompatibleConstructor(List<Object> args, Class target) {
+    Constructor retval = null;
+    int eligibleCount = 0;
+    LOG.debug("Target class: " + target.getName() + ", constructor args: " + args);
+    for (Constructor con : target.getDeclaredConstructors()) {
+      Class[] paramClasses = con.getParameterTypes();
+      if (paramClasses.length != args.size()) {
+        LOG.debug("Skipping constructor with wrong number of arguments.");
+        continue;
+      }
+      LOG.debug("found constructor with same number of args..");
+      boolean invokable = canInvokeWithArgs(args, paramClasses);
+      if (invokable) {
+        retval = con;
+        eligibleCount++;
+      }
+      LOG.debug("** invokable --> {}", invokable); // as an argument, so {} is filled in
+    }
+    if (eligibleCount > 1) {
+      LOG.error("Found multiple invokable constructors for class: "
+          + target + ", given arguments " + args + ". Using the last one found.");
+    }
+    return retval;
+  }
+
+  /**
+   * Tells whether each argument fits the parameter type at the same index, allowing
+   * boxed-to-primitive, String-to-enum and List-to-array conversion. Throws on a null arg.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  protected boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
+    if (parameterTypes.length != args.size()) {
+      LOG.warn("parameter types were the wrong size");
+      return false;
+    }
+    for (int i = 0; i < args.size(); i++) {
+      Object obj = args.get(i);
+      if (obj == null) {
+        throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
+      }
+      Class paramType = parameterTypes[i];
+      Class objectType = obj.getClass();
+      LOG.debug("Comparing parameter class {} to object class {}", paramType, objectType);
+      if (paramType.isAssignableFrom(objectType)) {
+        LOG.debug("Yes, assignment is possible.");
+      } else if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)) {
+        LOG.debug("Yes, assignment is possible.");
+      } else if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) {
+        LOG.debug("Yes, assignment is possible.");
+      } else if (paramType.isEnum() && objectType.equals(String.class)) {
+        LOG.debug("Yes, will convert a String to enum");
+      } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+        // log the list itself: peeking at element 0 threw for an empty list
+        LOG.debug("Assignment is possible if we convert a List to an array.");
+        LOG.debug("Array Type: {}, List: {}", paramType.getComponentType(), obj);
+      } else {
+        LOG.debug("returning false");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected boolean isPrimitiveNumber(Class clazz) { // char and boolean cannot take a Number
+    return clazz.isPrimitive() && !clazz.equals(boolean.class) && !clazz.equals(char.class);
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected boolean isPrimitiveBoolean(Class clazz) { // true only for the primitive boolean
+    return clazz.equals(Boolean.TYPE) && clazz.isPrimitive();
+  }
+
+  /**
+   * Calls every configuration method declared on the bean against the given instance,
+   * resolving references in a method's arguments first when its definition has any.
+   */
+  @SuppressWarnings("rawtypes")
+  public void invokeConfigMethods(ObjectDefinition bean,
+                                  Object instance, EcoExecutionContext context)
+      throws InvocationTargetException, IllegalAccessException {
+    List<ConfigurationMethodDefinition> methodDefs = bean.getConfigMethods();
+    if (methodDefs == null || methodDefs.isEmpty()) {
+      return;
+    }
+    Class clazz = instance.getClass();
+    for (ConfigurationMethodDefinition methodDef : methodDefs) {
+      List<Object> args = methodDef.getArgs();
+      if (args == null) {
+        args = new ArrayList<Object>();
+      }
+      if (methodDef.hasReferences()) {
+        args = builderUtility.resolveReferences(args, context);
+      }
+      String methodName = methodDef.getName();
+      LOG.debug("method name: " + methodName);
+      Method method = findCompatibleMethod(args, clazz, methodName);
+      if (method == null) {
+        throw new IllegalArgumentException(String.format(
+            "Unable to find configuration method '%s' in class '%s' with arguments %s.",
+            methodName, clazz.getName(), args));
+      }
+      method.invoke(instance, getArgsWithListCoercian(args, method.getParameterTypes()));
+    }
+  }
+
+  /**
+   * Finds a public method on the target with the given name whose parameters can accept
+   * the args. Returns the last match (warning when several match), or null when none does.
+   */
+  @SuppressWarnings("rawtypes")
+  private Method findCompatibleMethod(List<Object> args, Class target, String methodName) {
+    Method retval = null;
+    int eligibleCount = 0;
+    LOG.debug("Target class: " + target.getName() + ", methodName: "
+        + methodName + ", args: " + args);
+    Method[] methods = target.getMethods();
+    LOG.debug("methods count: " + methods.length);
+    for (Method method : methods) {
+      Class[] paramClasses = method.getParameterTypes();
+      if (paramClasses.length != args.size() || !method.getName().equals(methodName)) {
+        LOG.debug("Skipping method with wrong name or number of arguments.");
+        continue;
+      }
+      LOG.debug("found method with same name and number of args..");
+      // with zero args the loop in canInvokeWithArgs is empty, so it returns true
+      boolean invokable = canInvokeWithArgs(args, paramClasses);
+      if (invokable) {
+        retval = method;
+        eligibleCount++;
+      }
+      LOG.debug("** invokable --> " + invokable);
+    }
+    if (eligibleCount > 1) {
+      // pass the values as logger arguments; concatenating the array printed its identity
+      LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}",
+          target, methodName, args);
+    }
+    return retval;
+  }
+
+
+
+  /**
+   * Given a java.util.List of constructor/method arguments and the parameter types of the
+   * target, converts the list to a java.lang.Object array that can be used for the
+   * reflective invocation. Boxed values are unboxed for primitive parameters, Strings are
+   * turned into enum constants, and a List is coerced to an array when the parameter is
+   * an array type.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
+    // Callers in this class only pass args that findCompatible* has already matched.
+    if (parameterTypes.length != args.size()) {
+      throw new IllegalArgumentException("Constructor parameter count does not "
+          + "equal argument size.");
+    }
+    Object[] constructorParams = new Object[args.size()];
+
+    // loop through the arguments, if we hit a list that has to be converted to an array,
+    // perform the conversion
+    for (int i = 0; i < args.size(); i++) {
+      Object obj = args.get(i);
+      Class paramType = parameterTypes[i];
+      Class objectType = obj.getClass();
+      LOG.debug("Comparing parameter class " + paramType.getName() + " to object class "
+          + objectType.getName() + " to see if assignment is possible.");
+      if (paramType.equals(objectType)) {
+        LOG.debug("They are the same class.");
+        constructorParams[i] = args.get(i);
+        continue;
+      }
+      if (paramType.isAssignableFrom(objectType)) {
+        LOG.debug("Assignment is possible.");
+        constructorParams[i] = args.get(i);
+        continue;
+      }
+      if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)) {
+        LOG.debug("Its a primitive boolean.");
+        Boolean bool = (Boolean) args.get(i);
+        constructorParams[i] = bool.booleanValue();
+        continue;
+      }
+      if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) {
+        LOG.debug("Its a primitive number.");
+        Number num = (Number) args.get(i);
+        if (paramType == Float.TYPE) {
+          constructorParams[i] = num.floatValue();
+        } else if (paramType == Double.TYPE) {
+          constructorParams[i] = num.doubleValue();
+        } else if (paramType == Long.TYPE) {
+          constructorParams[i] = num.longValue();
+        } else if (paramType == Integer.TYPE) {
+          constructorParams[i] = num.intValue();
+        } else if (paramType == Short.TYPE) {
+          constructorParams[i] = num.shortValue();
+        } else if (paramType == Byte.TYPE) {
+          constructorParams[i] = num.byteValue();
+        } else {
+          constructorParams[i] = args.get(i);
+        }
+        continue;
+      }
+
+      // enum conversion
+      if (paramType.isEnum() && objectType.equals(String.class)) {
+        LOG.debug("Yes, will convert a String to enum");
+        constructorParams[i] = Enum.valueOf(paramType, (String) args.get(i));
+        continue;
+      }
+
+      // List to array conversion
+      if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+        LOG.debug("Conversion appears possible...");
+        List list = (List) obj;
+        // log the list itself: list.get(0) threw for an empty list
+        LOG.debug("Array Type: {}, List: {}", paramType.getComponentType(), list);
+
+        // create an array of the right type
+        Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
+        for (int j = 0; j < list.size(); j++) {
+          // Array.set unwraps boxed elements when the component type is primitive
+          Array.set(newArrayObj, j, list.get(j));
+        }
+        constructorParams[i] = newArrayObj;
+        LOG.debug("After conversion: {}", constructorParams[i]);
+      }
+    }
+    return constructorParams;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
new file mode 100644
index 0000000..19dbfc9
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
@@ -0,0 +1,41 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+
+public class SpoutBuilder {
+  /**
+   * Builds each declared spout, adds it to the builder and records it on the context.
+   */
+  protected void buildSpouts(EcoExecutionContext executionContext, TopologyBuilder builder,
+                             ObjectBuilder objectBuilder)
+      throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+      NoSuchFieldException, InvocationTargetException {
+    EcoTopologyDefinition definition = executionContext.getTopologyDefinition();
+    for (ObjectDefinition spoutDef : definition.getSpouts()) {
+      Object spout = objectBuilder.buildObject(spoutDef, executionContext);
+      builder.setSpout(spoutDef.getId(), (IRichSpout) spout, spoutDef.getParallelism());
+      executionContext.addSpout(spoutDef.getId(), spout);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
new file mode 100644
index 0000000..e31e3f0
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
@@ -0,0 +1,129 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IWindowedBolt;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import com.twitter.heron.eco.definition.ComponentStream;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.GroupingDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+
+public class StreamBuilder {
+
+  /**
+   * Declares the target bolt of every stream on the TopologyBuilder (once per bolt id)
+   * and subscribes it to the stream's source component with the configured grouping.
+   *
+   * @throws IllegalArgumentException if a stream targets an id with no bolt in the context,
+   *     the object registered under that id is not a bolt, or FIELDS grouping has no args
+   * @throws UnsupportedOperationException if the grouping type is not handled here
+   */
+  protected void buildStreams(EcoExecutionContext executionContext, TopologyBuilder builder,
+                              ObjectBuilder objectBuilder)
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException,
+      NoSuchFieldException, InvocationTargetException {
+    EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition();
+    Map<String, ComponentStream> componentStreams = new HashMap<>();
+    HashMap<String, BoltDeclarer> declarers = new HashMap<>();
+
+    for (StreamDefinition stream : topologyDefinition.getStreams()) {
+      String boltId = stream.getTo();
+      Object boltObj = executionContext.getBolt(boltId);
+      if (boltObj == null) {
+        // guard first: the "not a bolt" branch below would NPE on boltObj.getClass()
+        throw new IllegalArgumentException("Stream targets an unknown bolt: " + boltId);
+      }
+      BoltDeclarer declarer = declarers.get(boltId);
+      if (declarer == null) {
+        // setBolt is overloaded per bolt interface; the cast picks the overload
+        if (boltObj instanceof IRichBolt) {
+          declarer = builder.setBolt(boltId, (IRichBolt) boltObj,
+              topologyDefinition.parallelismForBolt(boltId));
+        } else if (boltObj instanceof IBasicBolt) {
+          declarer = builder.setBolt(boltId, (IBasicBolt) boltObj,
+              topologyDefinition.parallelismForBolt(boltId));
+        } else if (boltObj instanceof IWindowedBolt) {
+          declarer = builder.setBolt(boltId, (IWindowedBolt) boltObj,
+              topologyDefinition.parallelismForBolt(boltId));
+        } else {
+          throw new IllegalArgumentException("Class does not appear to be a bolt: "
+              + boltObj.getClass().getName());
+        }
+        declarers.put(boltId, declarer);
+      }
+      GroupingDefinition grouping = stream.getGrouping();
+      // if the streamId is defined, use it for the grouping,
+      // otherwise assume default stream
+      // Todo(joshfischer) Not sure if "default" is still valid
+      String streamId = grouping.getStreamId() == null
+          ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId();
+      String from = stream.getFrom();
+
+      switch (grouping.getType()) {
+        case SHUFFLE:
+          declarer.shuffleGrouping(from, streamId);
+          break;
+        case FIELDS:
+          List<String> groupingArgs = grouping.getArgs();
+          if (groupingArgs == null) {
+            throw new IllegalArgumentException("You must supply arguments for Fields grouping");
+          }
+          declarer.fieldsGrouping(from, streamId, new Fields(groupingArgs));
+          break;
+        case ALL:
+          declarer.allGrouping(from, streamId);
+          break;
+        case GLOBAL:
+          declarer.globalGrouping(from, streamId);
+          break;
+        case NONE:
+          declarer.noneGrouping(from, streamId);
+          break;
+        case CUSTOM:
+          declarer.customGrouping(from, streamId,
+              buildCustomStreamGrouping(grouping.getCustomClass(), executionContext,
+                  objectBuilder));
+          break;
+        default:
+          throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+      }
+    }
+    executionContext.setStreams(componentStreams); // NOTE(review): map is never populated
+  }
+
+  /** Builds the custom grouping object from its definition and casts it for the declarer. */
+  private CustomStreamGrouping buildCustomStreamGrouping(ObjectDefinition objectDefinition,
+                                                         EcoExecutionContext executionContext,
+                                                         ObjectBuilder objectBuilder)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+      NoSuchFieldException, InvocationTargetException {
+    // throws ClassCastException when the built object is not a CustomStreamGrouping
+    return (CustomStreamGrouping) objectBuilder.buildObject(objectDefinition, executionContext);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java
new file mode 100644
index 0000000..806ae14
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java
@@ -0,0 +1,27 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+public class BeanDefinition extends ObjectDefinition {
+ // NOTE(review): SpoutBuilder calls getId() on plain ObjectDefinition -- confirm overlap.
+ private String id;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java
new file mode 100644
index 0000000..ff70fde
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java
@@ -0,0 +1,30 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.List;
+
+public class BeanListReference { // a reference to several beans, held as a list of ids
+ public List<String> ids; // stays null when the no-arg constructor is used
+
+ public BeanListReference() { }
+
+ public BeanListReference(List<String> ids) {
+ this.ids = ids;
+ }
+
+ public List<String> getIds() {
+ return ids; // the stored list itself, not a copy
+ }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java
new file mode 100644
index 0000000..2314c82
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java
@@ -0,0 +1,30 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+public class BeanReference { // holds the id of the bean being referred to
+ private String id;
+
+ public BeanReference(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java
new file mode 100644
index 0000000..203623d
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java
@@ -0,0 +1,17 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+public class BoltDefinition extends ObjectDefinition { // declares no members of its own
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java
new file mode 100644
index 0000000..5b747fd
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java
@@ -0,0 +1,67 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * Plain holder describing one stream: its id, the components on either end and its name.
+ */
+public class ComponentStream {
+
+  private String id;
+  private String toComponent;
+  private String fromComponent;
+  private String streamName;
+
+  public String getId() {
+    return this.id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  // NOTE(review): declared to return Object although the field is a String, unlike
+  // getFromComponent() -- confirm whether callers rely on this before narrowing it.
+  public Object getToComponent() {
+    return this.toComponent;
+  }
+
+  public void setToComponent(String toComponent) {
+    this.toComponent = toComponent;
+  }
+
+  public String getFromComponent() {
+    return this.fromComponent;
+  }
+
+  public void setFromComponent(String fromComponent) {
+    this.fromComponent = fromComponent;
+  }
+
+  public String getStreamName() {
+    return this.streamName;
+  }
+
+  public void setStreamName(String streamName) {
+    this.streamName = streamName;
+  }
+
+  /** Renders the two endpoints and the stream name; the id is not part of the output. */
+  @Override
+  public String toString() {
+    return String.format(
+        "ComponentStream{toComponent='%s', fromComponent='%s', streamName='%s'}",
+        toComponent, fromComponent, streamName);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java
new file mode 100644
index 0000000..d63918b
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java
@@ -0,0 +1,64 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigurationMethodDefinition {
+  private String name;
+  private List<Object> args;
+  private boolean hasReferences = false;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public List<Object> getArgs() {
+    return args;
+  }
+
+  /**
+   * Stores a copy of args in which single-key "ref" / "reflist" maps are replaced by
+   * reference objects. Null is stored as empty; hasReferences() is recomputed per call.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void setArgs(List<Object> args) {
+    List<Object> converted = new ArrayList<Object>();
+    boolean sawReference = false;
+    for (Object obj : args == null ? new ArrayList<Object>() : args) {
+      Map map = obj instanceof LinkedHashMap && ((Map) obj).size() == 1 ? (Map) obj : null;
+      Object resolved = obj;
+      if (map != null && map.containsKey("ref")) {
+        resolved = new BeanReference((String) map.get("ref"));
+      } else if (map != null && map.containsKey("reflist")) {
+        resolved = new BeanListReference((List<String>) map.get("reflist"));
+      }
+      sawReference = sawReference || resolved != obj;
+      converted.add(resolved);
+    }
+    this.args = converted;
+    this.hasReferences = sawReference;
+  }
+
+  public boolean hasReferences() {
+    return this.hasReferences;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java
new file mode 100644
index 0000000..6a1c8da
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java
@@ -0,0 +1,112 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+
+
+/**
+ * Mutable holder bundling a topology definition, a {@link Config}, and
+ * id-keyed maps of spouts, bolts, streams and components.
+ */
+public class EcoExecutionContext {
+
+  private EcoTopologyDefinition topologyDefinition;
+  private Config config;
+
+  private Map<String, Object> spouts = new HashMap<>();
+  private Map<String, Object> bolts = new HashMap<>();
+  private Map<String, ComponentStream> streams = new HashMap<>();
+  private Map<String, Object> components = new HashMap<>();
+
+  /**
+   * @param topologyDefinition the topology definition to expose
+   * @param config the configuration to expose
+   */
+  public EcoExecutionContext(EcoTopologyDefinition topologyDefinition, Config config) {
+    this.config = config;
+    this.topologyDefinition = topologyDefinition;
+  }
+
+  // ---- topology definition and config ----
+
+  public EcoTopologyDefinition getTopologyDefinition() {
+    return this.topologyDefinition;
+  }
+
+  public void setTopologyDefinition(EcoTopologyDefinition topologyDefinition) {
+    this.topologyDefinition = topologyDefinition;
+  }
+
+  public Config getConfig() {
+    return this.config;
+  }
+
+  public void setConfig(Config config) {
+    this.config = config;
+  }
+
+  // ---- spouts ----
+
+  public Map<String, Object> getSpouts() {
+    return this.spouts;
+  }
+
+  public void setSpouts(Map<String, Object> spouts) {
+    this.spouts = spouts;
+  }
+
+  /** Registers {@code value} in the spout map under {@code key}. */
+  public void addSpout(String key, Object value) {
+    spouts.put(key, value);
+  }
+
+  // ---- bolts ----
+
+  public Map<String, Object> getBolts() {
+    return this.bolts;
+  }
+
+  public void setBolts(Map<String, Object> bolts) {
+    this.bolts = bolts;
+  }
+
+  /** Registers {@code value} in the bolt map under {@code key}. */
+  public void addBolt(String key, Object value) {
+    bolts.put(key, value);
+  }
+
+  /** @return the bolt stored under {@code id}, or {@code null} if there is none */
+  public Object getBolt(String id) {
+    return bolts.get(id);
+  }
+
+  /**
+   * Looks {@code id} up in the bolt map.
+   * NOTE(review): this is currently identical to {@link #getBolt(String)};
+   * confirm whether a different map was intended.
+   */
+  public Object getChild(String id) {
+    return bolts.get(id);
+  }
+
+  // ---- streams ----
+
+  public Map<String, ComponentStream> getStreams() {
+    return this.streams;
+  }
+
+  public void setStreams(Map<String, ComponentStream> streams) {
+    this.streams = streams;
+  }
+
+  // ---- components ----
+
+  public Map<String, Object> getComponents() {
+    return this.components;
+  }
+
+  public void setComponents(Map<String, Object> components) {
+    this.components = components;
+  }
+
+  /** Registers {@code value} in the component map under {@code key}. */
+  public void addComponent(String key, Object value) {
+    components.put(key, value);
+  }
+
+  /** @return the component stored under {@code id}, or {@code null} if there is none */
+  public Object getComponent(String id) {
+    return components.get(id);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
new file mode 100644
index 0000000..b5b8ad9
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
@@ -0,0 +1,108 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bean describing a whole topology: its name, a free-form config map, and the
+ * spout, bolt, stream and component definitions.
+ *
+ * <p>Spouts, bolts and components are kept in insertion-ordered maps keyed by
+ * definition id, but are exposed to callers as lists.
+ */
+public class EcoTopologyDefinition {
+
+  private String name;
+  private Map<String, Object> config = new HashMap<>();
+  private Map<String, SpoutDefinition> spouts = new LinkedHashMap<>();
+  private Map<String, BoltDefinition> bolts = new LinkedHashMap<>();
+  private List<StreamDefinition> streams = new ArrayList<>();
+  private Map<String, BeanDefinition> components = new LinkedHashMap<>();
+
+  /** @return a new list holding the spout definitions in insertion order */
+  public List<SpoutDefinition> getSpouts() {
+    return new ArrayList<>(this.spouts.values());
+  }
+
+  /** @return the spout definition with the given id, or {@code null} */
+  public SpoutDefinition getSpout(String id) {
+    return this.spouts.get(id);
+  }
+
+  /** Replaces all spout definitions, keying each one by its id. */
+  public void setSpouts(List<SpoutDefinition> sources) {
+    this.spouts = new LinkedHashMap<>();
+    for (SpoutDefinition source : sources) {
+      this.spouts.put(source.getId(), source);
+    }
+  }
+
+  /** @return a new list holding the bolt definitions in insertion order */
+  public List<BoltDefinition> getBolts() {
+    return new ArrayList<>(this.bolts.values());
+  }
+
+  /** @return the bolt definition with the given id, or {@code null} */
+  public BoltDefinition getBolt(String id) {
+    return this.bolts.get(id);
+  }
+
+  /** Replaces all bolt definitions, keying each one by its id. */
+  public void setBolts(List<BoltDefinition> children) {
+    this.bolts = new LinkedHashMap<>();
+    for (BoltDefinition child : children) {
+      this.bolts.put(child.getId(), child);
+    }
+  }
+
+  /** @return a new list holding the component definitions in insertion order */
+  public List<BeanDefinition> getComponents() {
+    return new ArrayList<>(this.components.values());
+  }
+
+  /** @return the component definition with the given id, or {@code null} */
+  public Object getComponent(String id) {
+    return this.components.get(id);
+  }
+
+  /**
+   * Replaces all component definitions, keying each one by its id.
+   *
+   * <p>Starts from an empty map, exactly like {@link #setSpouts(List)} and
+   * {@link #setBolts(List)}, so entries from an earlier call do not linger.
+   */
+  public void setComponents(List<BeanDefinition> components) {
+    this.components = new LinkedHashMap<>();
+    for (BeanDefinition bean : components) {
+      this.components.put(bean.getId(), bean);
+    }
+  }
+
+  /** Adds or overwrites a single component definition under {@code key}. */
+  public void addComponent(String key, BeanDefinition value) {
+    this.components.put(key, value);
+  }
+
+  public List<StreamDefinition> getStreams() {
+    return streams;
+  }
+
+  public void setStreams(List<StreamDefinition> streams) {
+    this.streams = streams;
+  }
+
+  public Map<String, Object> getConfig() {
+    return config;
+  }
+
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the parallelism declared for a bolt.
+   *
+   * @param to id of the bolt to look up
+   * @return the bolt's parallelism
+   * @throws IllegalArgumentException if no bolt with that id is defined
+   */
+  public Number parallelismForBolt(String to) {
+    BoltDefinition bolt = this.bolts.get(to);
+    if (bolt == null) {
+      // Name the missing id rather than surfacing a bare NullPointerException.
+      throw new IllegalArgumentException("No bolt defined with id: " + to);
+    }
+    return bolt.getParallelism();
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java
new file mode 100644
index 0000000..1eeb817
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java
@@ -0,0 +1,65 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.List;
+
+/**
+ * Bean describing a stream grouping: its {@link Type}, an optional stream id,
+ * a list of string arguments, and an optional custom class definition.
+ */
+public class GroupingDefinition {
+
+  /** The grouping kinds this definition can name. */
+  public enum Type {
+    ALL,
+    CUSTOM,
+    SHUFFLE,
+    FIELDS,
+    GLOBAL,
+    NONE
+  }
+
+  private Type type;
+  private String streamId;
+  private List<String> args;
+  private ObjectDefinition customClass;
+
+  // ---- setters ----
+
+  public void setType(Type type) {
+    this.type = type;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  public void setArgs(List<String> args) {
+    this.args = args;
+  }
+
+  public void setCustomClass(ObjectDefinition customClass) {
+    this.customClass = customClass;
+  }
+
+  // ---- getters ----
+
+  public Type getType() {
+    return this.type;
+  }
+
+  public String getStreamId() {
+    return this.streamId;
+  }
+
+  public List<String> getArgs() {
+    return this.args;
+  }
+
+  public ObjectDefinition getCustomClass() {
+    return this.customClass;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java
new file mode 100644
index 0000000..3a675b7
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java
@@ -0,0 +1,106 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base bean for definitions that name a class and how to populate it: an id,
+ * a class name, a parallelism value (default 1), constructor arguments,
+ * properties and configuration methods.
+ *
+ * <p>When constructor arguments are set, entries that are single-key maps of
+ * the form {@code {ref: <id>}} or {@code {reflist: [<ids>]}} are replaced by
+ * {@link BeanReference} and {@link BeanListReference} placeholders.
+ */
+public abstract class ObjectDefinition {
+
+  private String id;
+  private String className;
+  private int parallelism = 1;
+  private List<Object> constructorArgs;
+  private List<PropertyDefinition> properties;
+  private List<ConfigurationMethodDefinition> configMethods;
+  private boolean hasReferences;
+
+  public List<PropertyDefinition> getProperties() {
+    return properties;
+  }
+
+  /** @return {@code true} if at least one constructor argument has been set */
+  public boolean hasConstructorArgs() {
+    return this.constructorArgs != null && !this.constructorArgs.isEmpty();
+  }
+
+  public void setProperties(List<PropertyDefinition> properties) {
+    this.properties = properties;
+  }
+
+  /** @return {@code true} if the current constructor arguments contain a reference placeholder */
+  public boolean hasReferences() {
+    return this.hasReferences;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public void setClassName(String className) {
+    this.className = className;
+  }
+
+  public int getParallelism() {
+    return parallelism;
+  }
+
+  public void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  /** @return the processed constructor arguments, or {@code null} if none have been set */
+  public List<Object> getConstructorArgs() {
+    return constructorArgs;
+  }
+
+  /**
+   * Stores the constructor arguments, swapping reference maps for placeholder objects.
+   *
+   * <p>The reference flag is recomputed on every call, so it always describes
+   * the list passed most recently. A {@code null} list is stored as
+   * {@code null} instead of failing with a {@code NullPointerException}.
+   *
+   * @param constructorArgs raw argument values; may be {@code null}
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void setConstructorArgs(List<Object> constructorArgs) {
+    // Start from a clean flag so an earlier call cannot leave it stale.
+    this.hasReferences = false;
+    if (constructorArgs == null) {
+      this.constructorArgs = null;
+      return;
+    }
+
+    List<Object> newVal = new ArrayList<Object>(constructorArgs.size());
+    for (Object obj : constructorArgs) {
+      if (obj instanceof LinkedHashMap) {
+        Map map = (Map) obj;
+        // Only a map whose sole key is "ref" or "reflist" counts as a reference.
+        if (map.containsKey("ref") && map.size() == 1) {
+          newVal.add(new BeanReference((String) map.get("ref")));
+          this.hasReferences = true;
+        } else if (map.containsKey("reflist") && map.size() == 1) {
+          newVal.add(new BeanListReference((List<String>) map.get("reflist")));
+          this.hasReferences = true;
+        } else {
+          newVal.add(obj);
+        }
+      } else {
+        newVal.add(obj);
+      }
+    }
+    this.constructorArgs = newVal;
+  }
+
+  public List<ConfigurationMethodDefinition> getConfigMethods() {
+    return configMethods;
+  }
+
+  public void setConfigMethods(List<ConfigurationMethodDefinition> configMethods) {
+    this.configMethods = configMethods;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java
new file mode 100644
index 0000000..e1cc902
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java
@@ -0,0 +1,55 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * A named property carrying either a literal value or a reference, never both.
+ * Each of the two setters rejects the call if the other slot is already
+ * non-null.
+ */
+public class PropertyDefinition {
+
+  private static final String VALUE_XOR_REF =
+      "A property can only have a value OR a reference, not both.";
+
+  private String name;
+  private Object value;
+  private String ref;
+
+  /** @return {@code true} once a non-null reference has been set */
+  public boolean isReference() {
+    return ref != null;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public Object getValue() {
+    return this.value;
+  }
+
+  /**
+   * @param value the literal value to store
+   * @throws IllegalStateException if a reference has already been set
+   */
+  public void setValue(Object value) {
+    if (ref == null) {
+      this.value = value;
+      return;
+    }
+    throw new IllegalStateException(VALUE_XOR_REF);
+  }
+
+  public String getRef() {
+    return this.ref;
+  }
+
+  /**
+   * @param ref the reference to store
+   * @throws IllegalStateException if a value has already been set
+   */
+  public void setRef(String ref) {
+    if (value == null) {
+      this.ref = ref;
+      return;
+    }
+    throw new IllegalStateException(VALUE_XOR_REF);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java
new file mode 100644
index 0000000..cb4f0a1
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java
@@ -0,0 +1,17 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * Definition of a spout. Declares nothing of its own; every field and
+ * accessor is inherited from {@link ObjectDefinition}.
+ */
+public class SpoutDefinition extends ObjectDefinition {
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java
new file mode 100644
index 0000000..03081c3
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java
@@ -0,0 +1,63 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * Bean describing one stream: an id, a name, the {@code from} and {@code to}
+ * endpoint strings, and the {@link GroupingDefinition} applied to it.
+ */
+public class StreamDefinition {
+
+  private String id;
+  private String name;
+  private String to;
+  private String from;
+  private GroupingDefinition grouping;
+
+  // ---- identity ----
+
+  public String getId() {
+    return this.id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  // ---- endpoints ----
+
+  public String getFrom() {
+    return this.from;
+  }
+
+  public void setFrom(String from) {
+    this.from = from;
+  }
+
+  public String getTo() {
+    return this.to;
+  }
+
+  public void setTo(String to) {
+    this.to = to;
+  }
+
+  // ---- grouping ----
+
+  public GroupingDefinition getGrouping() {
+    return this.grouping;
+  }
+
+  public void setGrouping(GroupingDefinition groupingDefinition) {
+    grouping = groupingDefinition;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java b/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java
new file mode 100644
index 0000000..5aacfb3
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java
@@ -0,0 +1,53 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.parser;
+
+import java.io.InputStream;
+
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.SpoutDefinition;
+
+/**
+ * Reads an {@link EcoTopologyDefinition} from a YAML stream using SnakeYAML.
+ */
+public class EcoParser {
+
+  /**
+   * Parses the given stream into a topology definition.
+   *
+   * @param inputStream the YAML source; must not be {@code null}
+   * @return the object SnakeYAML builds from the stream, cast to {@link EcoTopologyDefinition}
+   * @throws Exception if {@code inputStream} is {@code null}
+   */
+  public EcoTopologyDefinition parseFromInputStream(InputStream inputStream)
+      throws Exception {
+    final Yaml parser = topologyYaml();
+
+    if (inputStream != null) {
+      return (EcoTopologyDefinition) parser.load(inputStream);
+    }
+    throw new Exception("Unable to load eco input stream");
+  }
+
+  /**
+   * Builds a {@link Yaml} whose root type is {@link EcoTopologyDefinition} and
+   * whose "spouts" and "bolts" list properties have declared element types.
+   */
+  private static Yaml topologyYaml() {
+    TypeDescription rootDescription = new TypeDescription(EcoTopologyDefinition.class);
+    rootDescription.putListPropertyType("spouts", SpoutDefinition.class);
+    rootDescription.putListPropertyType("bolts", BoltDefinition.class);
+
+    Constructor rootConstructor = new Constructor(EcoTopologyDefinition.class);
+    rootConstructor.addTypeDescription(rootDescription);
+
+    return new Yaml(rootConstructor);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java b/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
new file mode 100644
index 0000000..032a73e
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
@@ -0,0 +1,28 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.submit;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+
+/**
+ * Instance-level wrapper around the static
+ * {@code StormSubmitter.submitTopology} call.
+ */
+public class EcoSubmitter {
+
+ /**
+  * Submits a topology by passing all three arguments, unchanged, to
+  * {@code StormSubmitter.submitTopology}.
+  *
+  * @param topologyName name to submit the topology under
+  * @param topologyConfig configuration handed to the submitter
+  * @param topology the topology to submit
+  * @throws AlreadyAliveException propagated from {@code StormSubmitter.submitTopology}
+  * @throws InvalidTopologyException propagated from {@code StormSubmitter.submitTopology}
+  */
+ public void submitTopology(String topologyName, Config topologyConfig, StormTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ StormSubmitter.submitTopology(topologyName, topologyConfig, topology);
+ }
+}
diff --git a/heron/eco/tests/java/BUILD b/heron/eco/tests/java/BUILD
new file mode 100644
index 0000000..ef66b76
--- /dev/null
+++ b/heron/eco/tests/java/BUILD
@@ -0,0 +1,90 @@
+# Dependencies shared by every ECO unit test below: the project-local
+# libraries first, then the test frameworks.
+eco_test_deps = [
+    "//heron/eco/src/java:eco-java",
+    "//heron/api/src/java:api-java-low-level",
+    "//storm-compatibility/src/java:storm-compatibility-java",
+] + [
+    "//third_party/java:powermock",
+    "//third_party/java:mockito",
+    "//third_party/java:junit4",
+]
+
+# One java_test target per test class; each picks up exactly one source file.
+
+java_test(
+    name = "EcoBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/EcoBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "EcoParserTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/parser/EcoParserTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "ConfigBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/ConfigBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "BoltBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/BoltBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "ComponentBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/ComponentBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "SpoutBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/SpoutBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "StreamBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/StreamBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "EcoTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/EcoTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "ObjectBuilderTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/ObjectBuilderTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "BuilderUtilityTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/builder/BuilderUtilityTest.java"]),
+    deps = eco_test_deps,
+)
+
+java_test(
+    name = "EcoSubmitterTest",
+    size = "small",
+    srcs = glob(["com/twitter/heron/eco/submit/EcoSubmitterTest.java"]),
+    deps = eco_test_deps,
+)
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java b/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java
new file mode 100644
index 0000000..ccf97f4
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java
@@ -0,0 +1,91 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco;
+
+import java.io.FileInputStream;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+
+import com.twitter.heron.eco.builder.EcoBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.parser.EcoParser;
+import com.twitter.heron.eco.submit.EcoSubmitter;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@code Eco.submit}, with the parser, builder, topology builder
+ * and submitter collaborators replaced by mocks.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class EcoTest {
+
+  @Mock
+  private EcoBuilder mockEcoBuilder;
+  @Mock
+  private EcoParser mockEcoParser;
+  @Mock
+  private TopologyBuilder mockTopologyBuilder;
+  @Mock
+  private EcoSubmitter mockEcoSubmitter;
+  @InjectMocks
+  private Eco subject;
+
+  /** Fails the test if any mock was used in a way the test did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(
+        mockEcoBuilder, mockEcoParser, mockTopologyBuilder, mockEcoSubmitter);
+  }
+
+  @Test
+  public void testSubmit_AllGood_BehavesAsExpected() throws Exception {
+    // Arrange: a parsed definition, a built config and a mocked topology builder.
+    final String expectedName = "the name";
+    final FileInputStream yamlStream = PowerMockito.mock(FileInputStream.class);
+    final Config builtConfig = new Config();
+    final EcoTopologyDefinition definition = new EcoTopologyDefinition();
+    definition.setName(expectedName);
+
+    when(mockEcoParser.parseFromInputStream(eq(yamlStream))).thenReturn(definition);
+    when(mockEcoBuilder.buildConfig(eq(definition))).thenReturn(builtConfig);
+    when(mockEcoBuilder.buildTopologyBuilder(
+        any(EcoExecutionContext.class), any(ObjectBuilder.class)))
+        .thenReturn(mockTopologyBuilder);
+
+    // Act
+    subject.submit(yamlStream);
+
+    // Assert: each collaborator was used once, with the expected arguments.
+    verify(mockEcoParser).parseFromInputStream(same(yamlStream));
+    verify(mockEcoBuilder).buildConfig(same(definition));
+    verify(mockEcoBuilder).buildTopologyBuilder(
+        any(EcoExecutionContext.class), any(ObjectBuilder.class));
+    verify(mockTopologyBuilder).createTopology();
+    verify(mockEcoSubmitter).submitTopology(
+        any(String.class), any(Config.class), any(StormTopology.class));
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java
new file mode 100644
index 0000000..5ae0c9d
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java
@@ -0,0 +1,89 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@code BoltBuilder.buildBolts}, using a mocked execution
+ * context and a mocked object builder.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class BoltBuilderTest {
+
+  @Mock
+  private EcoExecutionContext mockContext;
+  @Mock
+  private ObjectBuilder mockObjectBuilder;
+
+  private BoltBuilder subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new BoltBuilder();
+  }
+
+  /** Fails the test if either mock was used in a way the test did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockContext, mockObjectBuilder);
+  }
+
+  @Test
+  public void testBuildBolts_AllGood_BehavesAsExpected() throws ClassNotFoundException,
+      InvocationTargetException, NoSuchFieldException, InstantiationException,
+      IllegalAccessException {
+    // Arrange: a topology definition holding two bolt definitions.
+    final String firstId = "id";
+    final String secondId = "id1";
+
+    BoltDefinition firstBolt = new BoltDefinition();
+    firstBolt.setId(firstId);
+    BoltDefinition secondBolt = new BoltDefinition();
+    secondBolt.setId(secondId);
+
+    List<BoltDefinition> bolts = new ArrayList<>();
+    bolts.add(firstBolt);
+    bolts.add(secondBolt);
+
+    EcoTopologyDefinition topology = new EcoTopologyDefinition();
+    topology.setBolts(bolts);
+
+    Object firstBuilt = new Object();
+    Object secondBuilt = new Object();
+
+    when(mockContext.getTopologyDefinition()).thenReturn(topology);
+    when(mockObjectBuilder.buildObject(eq(firstBolt), eq(mockContext))).thenReturn(firstBuilt);
+    when(mockObjectBuilder.buildObject(eq(secondBolt), eq(mockContext))).thenReturn(secondBuilt);
+
+    // Act
+    subject.buildBolts(mockContext, mockObjectBuilder);
+
+    // Assert: every definition was built and registered under its own id.
+    verify(mockContext).getTopologyDefinition();
+    verify(mockObjectBuilder).buildObject(same(firstBolt), same(mockContext));
+    verify(mockObjectBuilder).buildObject(same(secondBolt), same(mockContext));
+    verify(mockContext).addBolt(eq(firstId), anyObject());
+    verify(mockContext).addBolt(eq(secondId), anyObject());
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java
new file mode 100644
index 0000000..a92bc63
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java
@@ -0,0 +1,139 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.testing.TestWordSpout;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.BeanReference;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.PropertyDefinition;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+import static org.junit.Assert.assertThat;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code BuilderUtility}: setter-name derivation, class lookup
+ * by name, and applying reference properties to a target object.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class BuilderUtilityTest {
+
+  @Mock
+  private ObjectDefinition mockObjectDefinition;
+  @Mock
+  private Object mockObject;
+  @Mock
+  private EcoExecutionContext mockContext;
+
+  private BuilderUtility subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new BuilderUtility();
+  }
+
+  /** Fails the test if any mock was used in a way the test did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockObject, mockObjectDefinition, mockContext);
+  }
+
+  @Test
+  public void toSetterName_ReturnsCorrectName() {
+    final String propertyName = "name";
+    final String expectedSetter = "setName";
+
+    String actualSetter = subject.toSetterName(propertyName);
+
+    assertThat(actualSetter, is(equalTo(expectedSetter)));
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void classForName_ReturnsCorrectClass() throws ClassNotFoundException {
+    final String requestedName = TestWordSpout.class.getName();
+
+    Class resolved = subject.classForName(requestedName);
+
+    assertThat(resolved, notNullValue());
+    assertThat(requestedName, is(equalTo(resolved.getName())));
+  }
+
+  @Test
+  public void applyProperties_SetterFound_BehavesAsExpected()
+      throws IllegalAccessException, NoSuchFieldException,
+      InvocationTargetException {
+    final String propertyName = "id";
+    final String refKey = "ref";
+    final String resolvedComponent = "component";
+
+    // The target is a BeanReference and the property is named "id".
+    BeanReference target = new BeanReference(propertyName);
+    PropertyDefinition property = new PropertyDefinition();
+    property.setRef(refKey);
+    property.setName(propertyName);
+    List<PropertyDefinition> properties = new ArrayList<>();
+    properties.add(property);
+
+    when(mockObjectDefinition.getProperties()).thenReturn(properties);
+    when(mockContext.getComponent(eq(refKey))).thenReturn(resolvedComponent);
+
+    subject.applyProperties(mockObjectDefinition, target, mockContext);
+
+    verify(mockContext).getComponent(same(refKey));
+    verify(mockObjectDefinition).getProperties();
+  }
+
+  @Test
+  public void applyProperties_NoSetterFound_BehavesAsExpected()
+      throws IllegalAccessException, NoSuchFieldException,
+      InvocationTargetException {
+    final String refKey = "ref";
+    final String resolvedComponent = "component";
+
+    // MockComponent exposes only a public field named "publicStr", no setter.
+    MockComponent target = new MockComponent();
+    PropertyDefinition property = new PropertyDefinition();
+    property.setRef(refKey);
+    property.setName("publicStr");
+    List<PropertyDefinition> properties = new ArrayList<>();
+    properties.add(property);
+
+    when(mockObjectDefinition.getProperties()).thenReturn(properties);
+    when(mockContext.getComponent(eq(refKey))).thenReturn(resolvedComponent);
+
+    subject.applyProperties(mockObjectDefinition, target, mockContext);
+
+    verify(mockContext).getComponent(same(refKey));
+    verify(mockObjectDefinition).getProperties();
+  }
+
+  /** Minimal target type with a single public field and no setter methods. */
+  public class MockComponent {
+    public String publicStr;
+  }
+
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java
new file mode 100644
index 0000000..d3d5102
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java
@@ -0,0 +1,89 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.BeanDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+ /** Unit tests for {@link ComponentBuilder}. */
+ @RunWith(MockitoJUnitRunner.class)
+ public class ComponentBuilderTest {
+
+   @Mock
+   private EcoExecutionContext mockContext;
+   @Mock
+   private ObjectBuilder mockObjectBuilder;
+
+   private ComponentBuilder subject;
+
+   @Before
+   public void setUpForEachTestCase() {
+     subject = new ComponentBuilder();
+   }
+
+   @After
+   public void ensureNoUnexpectedMockInteractions() {
+     Mockito.verifyNoMoreInteractions(mockContext, mockObjectBuilder);
+   }
+
+   @Test
+   public void testBuildComponents_AllGood_BehavesAsExpected()
+       throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+       InstantiationException, IllegalAccessException {
+     final String firstId = "bean";
+     final String secondId = "bean1";
+     BeanDefinition firstBean = new BeanDefinition();
+     firstBean.setId(firstId);
+     BeanDefinition secondBean = new BeanDefinition();
+     secondBean.setId(secondId);
+     List<BeanDefinition> beans = new ArrayList<>();
+     beans.add(firstBean);
+     beans.add(secondBean);
+     EcoTopologyDefinition topology = new EcoTopologyDefinition();
+     topology.setComponents(beans);
+     Object firstBuilt = new Object();
+     Object secondBuilt = new Object();
+
+     when(mockObjectBuilder.buildObject(eq(firstBean), eq(mockContext))).thenReturn(firstBuilt);
+     when(mockObjectBuilder.buildObject(eq(secondBean), eq(mockContext))).thenReturn(secondBuilt);
+     when(mockContext.getTopologyDefinition()).thenReturn(topology);
+
+     subject.buildComponents(mockContext, mockObjectBuilder);
+
+     verify(mockContext).getTopologyDefinition();
+     verify(mockObjectBuilder).buildObject(same(firstBean), same(mockContext));
+     verify(mockContext).addComponent(eq(firstId), anyObject());
+     verify(mockObjectBuilder).buildObject(same(secondBean), same(mockContext));
+     verify(mockContext).addComponent(eq(secondId), anyObject());
+   }
+ }
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java
new file mode 100644
index 0000000..e65845c
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java
@@ -0,0 +1,63 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link ConfigBuilder}
+ */
+public class ConfigBuilderTest {
+
+ private ConfigBuilder subject;
+
+ @Before
+ public void setUpForEachTestCase() {
+ subject = new ConfigBuilder();
+ }
+
+ @Test
+ public void testBuildConfig_ConfigIsNotDefined_ReturnsEmptyConfig() {
+ EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition();
+
+ Config config = subject.buildConfig(ecoTopologyDefinition);
+
+ assertThat(0, is(equalTo(config.size())));
+ }
+
+ @Test
+ public void testBuildConfig_ConfigIsDefined_ReturnsCorrectValues() {
+ EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition();
+ Map<String, Object> topologyDefinitionConfig = new HashMap<>();
+ topologyDefinitionConfig.put(Config.STORM_ZOOKEEPER_SERVERS, 2);
+ topologyDefinitionConfig.put(Config.TOPOLOGY_WORKERS, 4);
+ ecoTopologyDefinition.setConfig(topologyDefinitionConfig);
+
+ Config config = subject.buildConfig(ecoTopologyDefinition);
+
+ assertThat(config.get(Config.STORM_ZOOKEEPER_SERVERS), is(equalTo(2)));
+ assertThat(config.get(Config.TOPOLOGY_WORKERS), is(equalTo(4)));
+ }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
new file mode 100644
index 0000000..1bf87b2
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
@@ -0,0 +1,127 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+ /** Unit tests for {@link EcoBuilder}, with every collaborating builder mocked. */
+ @RunWith(MockitoJUnitRunner.class)
+ public class EcoBuilderTest {
+
+   @Mock
+   private SpoutBuilder mockSpoutBuilder;
+   @Mock
+   private BoltBuilder mockBoltBuilder;
+   @Mock
+   private StreamBuilder mockStreamBuilder;
+   @Mock
+   private ComponentBuilder mockComponentBuilder;
+   @Mock
+   private ConfigBuilder mockConfigBuilder;
+   @InjectMocks
+   private EcoBuilder subject;
+
+   private Map<String, Object> configMap;
+
+   private EcoTopologyDefinition ecoTopologyDefinition;
+
+   @Before
+   public void setUpForEachTestCase() {
+     configMap = new HashMap<>();
+     ecoTopologyDefinition = new EcoTopologyDefinition();
+     ecoTopologyDefinition.setConfig(configMap);
+   }
+
+   @After
+   public void ensureNoUnexpectedMockInteractions() {
+     verifyNoMoreInteractions(mockSpoutBuilder, mockBoltBuilder, mockStreamBuilder,
+         mockComponentBuilder, mockConfigBuilder);
+   }
+
+   /** buildConfig must return the very Config instance produced by the ConfigBuilder. */
+   @Test
+   public void testBuild_EmptyConfigMap_ReturnsDefaultConfigs() {
+     Config config = new Config();
+     when(mockConfigBuilder.buildConfig(eq(ecoTopologyDefinition))).thenReturn(config);
+
+     Config returnedConfig = subject.buildConfig(ecoTopologyDefinition);
+
+     verify(mockConfigBuilder).buildConfig(same(ecoTopologyDefinition));
+
+     assertThat(returnedConfig.get(Config.TOPOLOGY_DEBUG), is(nullValue()));
+     assertThat(returnedConfig, sameInstance(config));
+   }
+
+   /** With entries in the definition's config map, the mocked builder's Config is returned. */
+   @Test
+   public void testBuild_CustomConfigMap_ReturnsCorrectConfigs() {
+     configMap.put(Config.TOPOLOGY_DEBUG, false);
+     final String environment = "dev";
+     final int spouts = 3;
+     configMap.put(Config.TOPOLOGY_ENVIRONMENT, environment);
+     configMap.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, spouts);
+
+     Config config = new Config();
+
+     when(mockConfigBuilder.buildConfig(eq(ecoTopologyDefinition))).thenReturn(config);
+
+     assertThat(subject.buildConfig(ecoTopologyDefinition), sameInstance(config));
+
+     verify(mockConfigBuilder).buildConfig(same(ecoTopologyDefinition));
+   }
+
+   /** buildTopologyBuilder must call the spout, bolt, stream and component builders. */
+   @Test
+   public void testBuildTopologyBuilder_BuildsAsExpected()
+       throws IllegalAccessException, ClassNotFoundException,
+       InstantiationException, NoSuchFieldException, InvocationTargetException {
+     Config config = new Config();
+     EcoExecutionContext context = new EcoExecutionContext(ecoTopologyDefinition, config);
+     ObjectBuilder objectBuilder = new ObjectBuilder();
+     subject.buildTopologyBuilder(context, objectBuilder);
+
+     verify(mockSpoutBuilder).buildSpouts(same(context),
+         any(TopologyBuilder.class), same(objectBuilder));
+     verify(mockBoltBuilder).buildBolts(same(context), same(objectBuilder));
+     verify(mockStreamBuilder).buildStreams(same(context), any(TopologyBuilder.class),
+         same(objectBuilder));
+     verify(mockComponentBuilder).buildComponents(same(context), same(objectBuilder));
+   }
+ }
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java
new file mode 100644
index 0000000..d1ee878
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java
@@ -0,0 +1,159 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.TestWordSpout;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.ConfigurationMethodDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+@RunWith(MockitoJUnitRunner.class)
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ObjectBuilderTest {
+
+ @Mock
+ private ObjectDefinition mockObjectDefinition;
+ @Mock
+ private EcoExecutionContext mockContext;
+ @Mock
+ private BuilderUtility mockBuilderUtility;
+ @Mock
+ private ConfigurationMethodDefinition mockMethodDefinition;
+ @InjectMocks
+ private ObjectBuilder subject;
+
+ @After
+ public void ensureNoUnexpectedMockInteractions() {
+ Mockito.verifyNoMoreInteractions(mockContext,
+ mockObjectDefinition,
+ mockBuilderUtility,
+ mockMethodDefinition);
+ }
+
+ @Test
+ public void buildObject_WithArgsBeanReferenceAndOther_BehavesAsExpected()
+ throws ClassNotFoundException,
+ InvocationTargetException, NoSuchFieldException,
+ InstantiationException, IllegalAccessException {
+ final String beanReference1 = "bean1";
+ List<Object> constructorArgs = new ArrayList<>();
+ List<Object> objects = new ArrayList<>();
+ List<Object> firstObject = new ArrayList<>();
+ objects.add(firstObject);
+ constructorArgs.add(objects);
+ Object someComponent = new Object();
+ final String className = FixedTuple.class.getName();
+ final Class testClass = FixedTuple.class;
+ final String methodName = "toString";
+ List<ConfigurationMethodDefinition> methodDefinitions = new ArrayList<>();
+
+ methodDefinitions.add(mockMethodDefinition);
+
+ when(mockObjectDefinition.getClassName()).thenReturn(className);
+ when(mockObjectDefinition.hasConstructorArgs()).thenReturn(true);
+ when(mockObjectDefinition.getConstructorArgs()).thenReturn(constructorArgs);
+ when(mockObjectDefinition.hasReferences()).thenReturn(true);
+ when(mockContext.getComponent(eq(beanReference1))).thenReturn(someComponent);
+ when(mockBuilderUtility.resolveReferences(eq(constructorArgs), eq(mockContext)))
+ .thenCallRealMethod();
+ when(mockBuilderUtility.classForName(eq(className))).thenReturn(testClass);
+ when(mockObjectDefinition.getConfigMethods()).thenReturn(methodDefinitions);
+ when(mockMethodDefinition.hasReferences()).thenReturn(true);
+ when(mockMethodDefinition.getArgs()).thenReturn(null);
+ when(mockMethodDefinition.getName()).thenReturn(methodName);
+
+ Object object = subject.buildObject(mockObjectDefinition, mockContext);
+
+ verify(mockObjectDefinition).getClassName();
+ verify(mockObjectDefinition).hasConstructorArgs();
+ verify(mockObjectDefinition).getConstructorArgs();
+ verify(mockObjectDefinition).hasReferences();
+ verify(mockBuilderUtility).classForName(same(className));
+ verify(mockBuilderUtility).resolveReferences(same(constructorArgs), same(mockContext));
+ verify(mockBuilderUtility).applyProperties(eq(mockObjectDefinition), any(Object.class),
+ same(mockContext));
+ verify(mockObjectDefinition).getConfigMethods();
+ verify(mockMethodDefinition).hasReferences();
+ verify(mockMethodDefinition).getArgs();
+ verify(mockBuilderUtility, times(2)).resolveReferences(anyListOf(Object.class),
+ same(mockContext));
+ verify(mockMethodDefinition).getName();
+
+ assertThat(object, is(instanceOf(FixedTuple.class)));
+ FixedTuple fixedTuple = (FixedTuple) object;
+ assertThat(fixedTuple.values, is(equalTo(objects)));
+ assertThat(fixedTuple.values.get(0), is(equalTo(firstObject)));
+ }
+
+ @Test
+ public void buildObject_NoArgs_BehavesAsExpected()
+ throws ClassNotFoundException, InvocationTargetException,
+ NoSuchFieldException, InstantiationException, IllegalAccessException {
+
+ final Class fixedTupleClass = TestWordSpout.class;
+ final String className = TestWordSpout.class.getName();
+ List<ConfigurationMethodDefinition> methodDefinitions = new ArrayList<>();
+ final String methodName = "close";
+
+ methodDefinitions.add(mockMethodDefinition);
+
+ when(mockObjectDefinition.getClassName()).thenReturn(className);
+ when(mockObjectDefinition.hasConstructorArgs()).thenReturn(false);
+ when(mockBuilderUtility.classForName(eq(className))).thenReturn(fixedTupleClass);
+ when(mockObjectDefinition.getConfigMethods()).thenReturn(methodDefinitions);
+ when(mockMethodDefinition.hasReferences()).thenReturn(false);
+ when(mockMethodDefinition.getName()).thenReturn(methodName);
+
+ subject.buildObject(mockObjectDefinition, mockContext);
+
+ verify(mockObjectDefinition).getClassName();
+ verify(mockObjectDefinition).hasConstructorArgs();
+ verify(mockBuilderUtility).classForName(same(className));
+ verify(mockBuilderUtility).applyProperties(same(mockObjectDefinition),
+ anyObject(), same(mockContext));
+ verify(mockObjectDefinition).getConfigMethods();
+ verify(mockMethodDefinition).hasReferences();
+ verify(mockMethodDefinition).getName();
+ verify(mockMethodDefinition).getArgs();
+ }
+}
+
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
new file mode 100644
index 0000000..12abe93
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
@@ -0,0 +1,159 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.SpoutDefinition;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+ /** Unit tests for {@link SpoutBuilder}. */
+ @RunWith(MockitoJUnitRunner.class)
+ public class SpoutBuilderTest {
+
+   @Mock
+   private EcoExecutionContext mockContext;
+   @Mock
+   private TopologyBuilder mockTopologyBuilder;
+   @Mock
+   private ObjectBuilder mockObjectBuilder;
+
+   private SpoutBuilder subject;
+
+   @Before
+   public void setUpForEachTestCase() {
+     subject = new SpoutBuilder();
+   }
+
+   @After
+   public void ensureNoUnexpectedMockInteractions() {
+     Mockito.verifyNoMoreInteractions(mockContext, mockTopologyBuilder, mockObjectBuilder);
+   }
+
+   /** Two spout definitions must each be built, set on the topology and added to the context. */
+   @Test
+   public void testBuildSpouts_AllGood_BehavesAsExpected()
+       throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+       InstantiationException, IllegalAccessException {
+     EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition();
+
+     SpoutDefinition spoutDefinition = new SpoutDefinition();
+     final String id = "id";
+     final int parallelism = 2;
+     spoutDefinition.setId(id);
+     spoutDefinition.setParallelism(parallelism);
+     SpoutDefinition spoutDefinition1 = new SpoutDefinition();
+     final String id1 = "id1";
+     final int parallelism1 = 3;
+     spoutDefinition1.setId(id1);
+     spoutDefinition1.setParallelism(parallelism1);
+     List<SpoutDefinition> spoutDefinitions = new ArrayList<>();
+     spoutDefinitions.add(spoutDefinition);
+     spoutDefinitions.add(spoutDefinition1);
+     topologyDefinition.setSpouts(spoutDefinitions);
+     MockSpout mockSpout = new MockSpout();
+     MockSpout mockSpout1 = new MockSpout();
+
+     when(mockObjectBuilder.buildObject(eq(spoutDefinition),
+         eq(mockContext))).thenReturn(mockSpout);
+     when(mockObjectBuilder.buildObject(eq(spoutDefinition1),
+         eq(mockContext))).thenReturn(mockSpout1);
+     when(mockContext.getTopologyDefinition()).thenReturn(topologyDefinition);
+
+     subject.buildSpouts(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+     verify(mockContext).getTopologyDefinition();
+     verify(mockObjectBuilder).buildObject(same(spoutDefinition), same(mockContext));
+     verify(mockObjectBuilder).buildObject(same(spoutDefinition1), same(mockContext));
+     verify(mockTopologyBuilder).setSpout(eq(id), eq(mockSpout), eq(parallelism));
+     verify(mockTopologyBuilder).setSpout(eq(id1), eq(mockSpout1), eq(parallelism1));
+     // anyObject(): only the ids handed to the context are pinned here, not the instances.
+     verify(mockContext).addSpout(eq(id), anyObject());
+     verify(mockContext).addSpout(eq(id1), anyObject());
+   }
+
+   /** No-op spout; static so that it holds no reference to the enclosing test instance. */
+   @SuppressWarnings("serial")
+   private static class MockSpout implements IRichSpout {
+
+     @Override
+     public void open(Map<String, Object> conf,
+                      TopologyContext context, SpoutOutputCollector collector) {
+       // no-op
+     }
+
+     @Override
+     public void close() {
+       // no-op
+     }
+
+     @Override
+     public void activate() {
+       // no-op
+     }
+
+     @Override
+     public void deactivate() {
+       // no-op
+     }
+
+     @Override
+     public void nextTuple() {
+       // no-op
+     }
+
+     @Override
+     public void ack(Object msgId) {
+       // no-op
+     }
+
+     @Override
+     public void fail(Object msgId) {
+       // no-op
+     }
+
+     @Override
+     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       // no-op
+     }
+
+     @Override
+     public Map<String, Object> getComponentConfiguration() {
+       return null;
+     }
+   }
+ }
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
new file mode 100644
index 0000000..9c9b868
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
@@ -0,0 +1,370 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IWindowedBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.GroupingDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+ /** Unit tests for {@link StreamBuilder}. */
+ @RunWith(MockitoJUnitRunner.class)
+ public class StreamBuilderTest {
+
+   @Mock
+   private EcoTopologyDefinition mockDefinition;
+   @Mock
+   private EcoExecutionContext mockContext;
+   @Mock
+   private TopologyBuilder mockTopologyBuilder;
+   @Mock
+   private ObjectBuilder mockObjectBuilder;
+   @Mock
+   private BoltDeclarer mockBoltDeclarer;
+
+   private StreamBuilder subject;
+
+   @Before
+   public void setUpForEachTestCase() {
+     subject = new StreamBuilder();
+   }
+
+   @After
+   public void ensureNoUnexpectedMockInteractions() {
+     Mockito.verifyNoMoreInteractions(mockDefinition, mockContext, mockTopologyBuilder,
+         mockObjectBuilder, mockBoltDeclarer);
+   }
+
+   /** A SHUFFLE grouping into an IRichBolt must be declared as shuffleGrouping(from, id). */
+   @Test
+   @SuppressWarnings("unchecked")
+   public void buildStreams_SpoutToIRichBolt_ShuffleGrouping()
+       throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+       InstantiationException, IllegalAccessException {
+     final int boltParallelism = 1;
+     final String to = "to";
+     final String from = "from";
+     final String streamId = "id";
+     StreamDefinition streamDefinition = new StreamDefinition();
+     streamDefinition.setFrom(from);
+     streamDefinition.setTo(to);
+     streamDefinition.setId(streamId);
+     List<StreamDefinition> streams = new ArrayList<>();
+     streams.add(streamDefinition);
+     GroupingDefinition groupingDefinition = new GroupingDefinition();
+     groupingDefinition.setType(GroupingDefinition.Type.SHUFFLE);
+     groupingDefinition.setStreamId(streamId);
+     streamDefinition.setGrouping(groupingDefinition);
+     MockIRichBolt mockIRichBolt = new MockIRichBolt();
+
+     when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+     when(mockContext.getBolt(eq(to))).thenReturn(mockIRichBolt);
+     when(mockDefinition.getStreams()).thenReturn(streams);
+     when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(boltParallelism);
+     when(mockTopologyBuilder.setBolt(eq(to),
+         eq(mockIRichBolt), eq(boltParallelism))).thenReturn(mockBoltDeclarer);
+
+     subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+     verify(mockContext).getTopologyDefinition();
+     verify(mockContext).getBolt(eq(to));
+     verify(mockDefinition).parallelismForBolt(eq(to));
+     verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIRichBolt), eq(boltParallelism));
+     verify(mockBoltDeclarer).shuffleGrouping(eq(from), eq(streamId));
+     verify(mockContext).setStreams(anyMap());
+     verify(mockDefinition).getStreams();
+   }
+
+   /** A FIELDS grouping with args into an IBasicBolt must be declared as a fieldsGrouping. */
+   @Test
+   @SuppressWarnings("unchecked")
+   public void buildStreams_SpoutToIBasicBolt_FieldsGroupingWithArgs()
+       throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+       InstantiationException, IllegalAccessException {
+     final int boltParallelism = 1;
+     final String to = "to";
+     final String from = "from";
+     final String streamId = "id";
+     StreamDefinition streamDefinition = new StreamDefinition();
+     streamDefinition.setFrom(from);
+     streamDefinition.setTo(to);
+     streamDefinition.setId(streamId);
+     List<StreamDefinition> streams = new ArrayList<>();
+     streams.add(streamDefinition);
+     GroupingDefinition groupingDefinition = new GroupingDefinition();
+     groupingDefinition.setType(GroupingDefinition.Type.FIELDS);
+     List<String> args = new ArrayList<>();
+     args.add("arg1");
+     groupingDefinition.setArgs(args);
+     groupingDefinition.setStreamId(streamId);
+     streamDefinition.setGrouping(groupingDefinition);
+     MockIBasicBolt mockIBasicBolt = new MockIBasicBolt();
+
+     when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+     when(mockContext.getBolt(eq(to))).thenReturn(mockIBasicBolt);
+     when(mockDefinition.getStreams()).thenReturn(streams);
+     when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(boltParallelism);
+     when(mockTopologyBuilder.setBolt(eq(to),
+         eq(mockIBasicBolt), eq(boltParallelism))).thenReturn(mockBoltDeclarer);
+
+     subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+     verify(mockContext).getTopologyDefinition();
+     verify(mockContext).getBolt(eq(to));
+     verify(mockDefinition).parallelismForBolt(eq(to));
+     verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIBasicBolt), eq(boltParallelism));
+     verify(mockBoltDeclarer).fieldsGrouping(eq(from), eq(streamId), any(Fields.class));
+     verify(mockContext).setStreams(anyMap());
+     verify(mockDefinition).getStreams();
+   }
+
+   /** A FIELDS grouping without args must be rejected with an IllegalArgumentException. */
+   @Test(expected = IllegalArgumentException.class)
+   @SuppressWarnings("unchecked")
+   public void buildStreams_SpoutToIBasicBolt_FieldsGroupingWithoutArgs_ExceptionThrown()
+       throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+       InstantiationException, IllegalAccessException {
+     final int boltParallelism = 1;
+     final String to = "to";
+     final String from = "from";
+     final String streamId = "id";
+     StreamDefinition streamDefinition = new StreamDefinition();
+     streamDefinition.setFrom(from);
+     streamDefinition.setTo(to);
+     streamDefinition.setId(streamId);
+     List<StreamDefinition> streams = new ArrayList<>();
+     streams.add(streamDefinition);
+     GroupingDefinition groupingDefinition = new GroupingDefinition();
+     groupingDefinition.setType(GroupingDefinition.Type.FIELDS);
+     // Deliberately no setArgs(...) call: the missing args are the condition under test.
+     groupingDefinition.setStreamId(streamId);
+     streamDefinition.setGrouping(groupingDefinition);
+     MockIBasicBolt mockIBasicBolt = new MockIBasicBolt();
+     try {
+       when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+       when(mockContext.getBolt(eq(to))).thenReturn(mockIBasicBolt);
+       when(mockDefinition.getStreams()).thenReturn(streams);
+       when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(boltParallelism);
+       when(mockTopologyBuilder.setBolt(eq(to),
+           eq(mockIBasicBolt), eq(boltParallelism))).thenReturn(mockBoltDeclarer);
+
+       subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+     } finally {
+       // Verified in finally because buildStreams is expected to throw before returning.
+       verify(mockContext).getTopologyDefinition();
+       verify(mockContext).getBolt(eq(to));
+       verify(mockDefinition).parallelismForBolt(eq(to));
+       verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIBasicBolt), eq(boltParallelism));
+       verify(mockDefinition).getStreams();
+     }
+   }
+
+   /** A CUSTOM grouping must be built via the ObjectBuilder and declared as customGrouping. */
+   @Test
+   @SuppressWarnings("unchecked")
+   public void buildStreams_SpoutToIWindowedBolt_CustomGrouping()
+       throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+       InstantiationException, IllegalAccessException {
+     final int boltParallelism = 1;
+     final String to = "to";
+     final String from = "from";
+     final String streamId = "id";
+     StreamDefinition streamDefinition = new StreamDefinition();
+     streamDefinition.setFrom(from);
+     streamDefinition.setTo(to);
+     streamDefinition.setId(streamId);
+     List<StreamDefinition> streams = new ArrayList<>();
+     streams.add(streamDefinition);
+     GroupingDefinition groupingDefinition = new GroupingDefinition();
+     groupingDefinition.setType(GroupingDefinition.Type.CUSTOM);
+     MockCustomObjectDefinition mockCustomObjectDefinition = new MockCustomObjectDefinition();
+
+     groupingDefinition.setCustomClass(mockCustomObjectDefinition);
+     List<String> args = new ArrayList<>();
+     args.add("arg1");
+     groupingDefinition.setArgs(args);
+     groupingDefinition.setStreamId(streamId);
+     streamDefinition.setGrouping(groupingDefinition);
+     MockIWindowedBolt mockIWindowedBolt = new MockIWindowedBolt();
+     MockCustomStreamGrouping mockCustomStreamGrouping = new MockCustomStreamGrouping();
+
+     when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+     when(mockContext.getBolt(eq(to))).thenReturn(mockIWindowedBolt);
+     when(mockDefinition.getStreams()).thenReturn(streams);
+     when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(boltParallelism);
+     when(mockTopologyBuilder.setBolt(eq(to),
+         eq(mockIWindowedBolt), eq(boltParallelism))).thenReturn(mockBoltDeclarer);
+     when(mockObjectBuilder.buildObject(eq(mockCustomObjectDefinition),
+         eq(mockContext))).thenReturn(mockCustomStreamGrouping);
+
+     subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+     verify(mockContext).getTopologyDefinition();
+     verify(mockContext).getBolt(eq(to));
+     verify(mockDefinition).parallelismForBolt(eq(to));
+     verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIWindowedBolt), eq(boltParallelism));
+     verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId), eq(mockCustomStreamGrouping));
+     verify(mockContext).setStreams(anyMap());
+     verify(mockDefinition).getStreams();
+     verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition), same(mockContext));
+   }
+
+   // No-op stand-ins below; static so that none of them captures the test instance.
+   private static class MockCustomObjectDefinition extends ObjectDefinition {
+     // intentionally empty: the tests only pass this instance around
+   }
+
+   @SuppressWarnings("serial")
+   private static class MockCustomStreamGrouping implements CustomStreamGrouping {
+
+     @Override
+     public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
+                         List<Integer> targetTasks) {
+       // no-op
+     }
+
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values) {
+       return null;
+     }
+   }
+
+   @SuppressWarnings({"rawtypes", "unchecked", "serial"})
+   private static class MockIRichBolt implements IRichBolt {
+
+     @Override
+     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+       // no-op
+     }
+
+     @Override
+     public void execute(Tuple input) {
+       // no-op
+     }
+
+     @Override
+     public void cleanup() {
+       // no-op
+     }
+
+     @Override
+     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       // no-op
+     }
+
+     @Override
+     public Map<String, Object> getComponentConfiguration() {
+       return null;
+     }
+   }
+
+   @SuppressWarnings({"rawtypes", "unchecked", "serial"})
+   private static class MockIWindowedBolt implements IWindowedBolt {
+     @Override
+     public void prepare(Map<String, Object> topoConf,
+                         TopologyContext context, OutputCollector collector) {
+       // no-op
+     }
+
+     @Override
+     public void execute(TupleWindow inputWindow) {
+       // no-op
+     }
+
+     @Override
+     public void cleanup() {
+       // no-op
+     }
+
+     @Override
+     public TimestampExtractor getTimestampExtractor() {
+       return null;
+     }
+
+     @Override
+     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       // no-op
+     }
+
+     @Override
+     public Map<String, Object> getComponentConfiguration() {
+       return null;
+     }
+   }
+
+   @SuppressWarnings({"rawtypes", "unchecked", "serial"})
+   public static class MockIBasicBolt implements IBasicBolt {
+     @Override
+     public void prepare(Map stormConf, TopologyContext context) {
+       // no-op
+     }
+
+     @Override
+     public void execute(Tuple input, BasicOutputCollector collector) {
+       // no-op
+     }
+
+     @Override
+     public void cleanup() {
+       // no-op
+     }
+
+     @Override
+     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       // no-op
+     }
+
+     @Override
+     public Map<String, Object> getComponentConfiguration() {
+       return null;
+     }
+   }
+ }
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java b/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
new file mode 100644
index 0000000..24d226f
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
@@ -0,0 +1,407 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.parser;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.heron.eco.definition.BeanDefinition;
+import com.twitter.heron.eco.definition.BeanReference;
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.GroupingDefinition;
+import com.twitter.heron.eco.definition.PropertyDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+/**
+ * Unit tests for {@link EcoParser}
+ */
+public class EcoParserTest {
+ // Ids of the two bolts declared in the "yaml-topology" fixtures below.
+ private static final String BOLT_1 = "bolt-1";
+ private static final String BOLT_2 = "bolt-2";
+ // "yaml-topology" document with spouts, bolts and streams but no config: section.
+ private static final String YAML_NO_CONFIG_STR = "# Licensed to the Apache Software Foundation"
+ + " (ASF) under one\n"
+ + "# or more contributor license agreements. See the NOTICE file\n"
+ + "# distributed with this work for additional information\n"
+ + "# regarding copyright ownership. The ASF licenses this file\n"
+ + "# to you under the Apache License, Version 2.0 (the\n"
+ + "# \"License\"); you may not use this file except in compliance\n"
+ + "# with the License. You may obtain a copy of the License at\n"
+ + "#\n"
+ + "# http://www.apache.org/licenses/LICENSE-2.0\n"
+ + "#\n"
+ + "# Unless required by applicable law or agreed to in writing, software\n"
+ + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+ + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+ + "# See the License for the specific language governing permissions and\n"
+ + "# limitations under the License.\n"
+ + "\n"
+ + "---\n"
+ + "\n"
+ + "# topology definition\n"
+ + "# name to be used when submitting\n"
+ + "name: \"yaml-topology\"\n"
+ + "\n"
+ + "# topology configuration\n"
+ + "# this will be passed to the submitter as a map of config options\n"
+ + "#\n"
+ + "# spout definitions\n"
+ + "spouts:\n"
+ + " - id: \"spout-1\"\n"
+ + " className: \"com.twitter.heron.sample.TestWordSpout\"\n"
+ + " parallelism: 1\n"
+ + "\n"
+ + "# bolt definitions\n"
+ + "bolts:\n"
+ + " - id: \"bolt-1\"\n"
+ + " className: \"com.twitter.heron.sample.TestWordCounter\"\n"
+ + " parallelism: 2\n"
+ + "\n"
+ + " - id: \"bolt-2\"\n"
+ + " className: \"com.twitter.heron.sample.LogInfoBolt\"\n"
+ + " parallelism: 1\n"
+ + "\n"
+ + "#stream definitions\n"
+ + "# stream definitions define connections between spouts and bolts.\n"
+ + "# note that such connections can be cyclical\n"
+ + "streams:\n"
+ + " - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n"
+ + " id: \"connection-1\"\n"
+ + " from: \"spout-1\"\n"
+ + " to: \"bolt-1\"\n"
+ + " grouping:\n"
+ + " type: FIELDS\n"
+ + " args: [\"word\"]\n"
+ + "\n"
+ + " - name: \"bolt-1 --> bolt2\"\n"
+ + " id: \"connection-2\"\n"
+ + " from: \"bolt-1\"\n"
+ + " to: \"bolt-2\"\n"
+ + " grouping:\n"
+ + " type: SHUFFLE";
+ // "yaml-topology" document whose config: section sets topology.workers to 1.
+ private static final String YAML_STR = "# Licensed to the Apache Software Foundation"
+ + " (ASF) under one\n"
+ + "# or more contributor license agreements. See the NOTICE file\n"
+ + "# distributed with this work for additional information\n"
+ + "# regarding copyright ownership. The ASF licenses this file\n"
+ + "# to you under the Apache License, Version 2.0 (the\n"
+ + "# \"License\"); you may not use this file except in compliance\n"
+ + "# with the License. You may obtain a copy of the License at\n"
+ + "#\n"
+ + "# http://www.apache.org/licenses/LICENSE-2.0\n"
+ + "#\n"
+ + "# Unless required by applicable law or agreed to in writing, software\n"
+ + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+ + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+ + "# See the License for the specific language governing permissions and\n"
+ + "# limitations under the License.\n"
+ + "\n"
+ + "---\n"
+ + "\n"
+ + "# topology definition\n"
+ + "# name to be used when submitting\n"
+ + "name: \"yaml-topology\"\n"
+ + "\n"
+ + "# topology configuration\n"
+ + "# this will be passed to the submitter as a map of config options\n"
+ + "#\n"
+ + "config:\n"
+ + " topology.workers: 1\n"
+ + "\n"
+ + "# spout definitions\n"
+ + "spouts:\n"
+ + " - id: \"spout-1\"\n"
+ + " className: \"com.twitter.heron.sample.TestWordSpout\"\n"
+ + " parallelism: 1\n"
+ + "\n"
+ + "# bolt definitions\n"
+ + "bolts:\n"
+ + " - id: \"bolt-1\"\n"
+ + " className: \"com.twitter.heron.sample.TestWordCounter\"\n"
+ + " parallelism: 2\n"
+ + "\n"
+ + " - id: \"bolt-2\"\n"
+ + " className: \"com.twitter.heron.sample.LogInfoBolt\"\n"
+ + " parallelism: 1\n"
+ + "\n"
+ + "#stream definitions\n"
+ + "# stream definitions define connections between spouts and bolts.\n"
+ + "# note that such connections can be cyclical\n"
+ + "streams:\n"
+ + " - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n"
+ + " id: \"connection-1\"\n"
+ + " from: \"spout-1\"\n"
+ + " to: \"bolt-1\"\n"
+ + " grouping:\n"
+ + " type: FIELDS\n"
+ + " args: [\"word\"]\n"
+ + "\n"
+ + " - name: \"bolt-1 --> bolt2\"\n"
+ + " id: \"connection-2\"\n"
+ + " from: \"bolt-1\"\n"
+ + " to: \"bolt-2\"\n"
+ + " grouping:\n"
+ + " type: SHUFFLE";
+ // "kafka-topology" document that also declares a components section (beans wired by ref).
+ private static final String YAML_STR_1 = "# Test ability to wire together shell spouts/bolts\n"
+ + "---\n"
+ + "\n"
+ + "name: \"kafka-topology\"\n"
+ + "\n"
+ + "# Components\n"
+ + "# Components are analagous to Spring beans. They are meant to be used as constructor,\n"
+ + "# property(setter), and builder arguments.\n"
+ + "#\n"
+ + "# for the time being, components must be declared in the order they are referenced\n"
+ + "components:\n"
+ + " - id: \"stringScheme\"\n"
+ + " className: \"org.apache.storm.kafka.StringScheme\"\n"
+ + "\n"
+ + " - id: \"stringMultiScheme\"\n"
+ + " className: \"org.apache.storm.spout.SchemeAsMultiScheme\"\n"
+ + " constructorArgs:\n"
+ + " - ref: \"stringScheme\"\n"
+ + "\n"
+ + " - id: \"zkHosts\"\n"
+ + " className: \"org.apache.storm.kafka.ZkHosts\"\n"
+ + " constructorArgs:\n"
+ + " - \"localhost:2181\"\n"
+ + "\n"
+ + "# Alternative kafka config\n"
+ + "# - id: \"kafkaConfig\"\n"
+ + "# className: \"org.apache.storm.kafka.KafkaConfig\"\n"
+ + "# constructorArgs:\n"
+ + "# # brokerHosts\n"
+ + "# - ref: \"zkHosts\"\n"
+ + "# # topic\n"
+ + "# - \"myKafkaTopic\"\n"
+ + "# # clientId (optional)\n"
+ + "# - \"myKafkaClientId\"\n"
+ + "\n"
+ + " - id: \"spoutConfig\"\n"
+ + " className: \"org.apache.storm.kafka.SpoutConfig\"\n"
+ + " constructorArgs:\n"
+ + " # brokerHosts\n"
+ + " - ref: \"zkHosts\"\n"
+ + " # topic\n"
+ + " - \"myKafkaTopic\"\n"
+ + " # zkRoot\n"
+ + " - \"/kafkaSpout\"\n"
+ + " # id\n"
+ + " - \"myId\"\n"
+ + " properties:\n"
+ + " - name: \"ignoreZkOffsets\"\n"
+ + " value: true\n"
+ + " - name: \"scheme\"\n"
+ + " ref: \"stringMultiScheme\"\n"
+ + "\n"
+ + "\n"
+ + "\n"
+ + "# topology configuration\n"
+ + "# this will be passed to the submitter as a map of config options\n"
+ + "#\n"
+ + "config:\n"
+ + " topology.workers: 1\n"
+ + " # ...\n"
+ + "\n"
+ + "# spout definitions\n"
+ + "spouts:\n"
+ + " - id: \"kafka-spout\"\n"
+ + " className: \"org.apache.storm.kafka.KafkaSpout\"\n"
+ + " constructorArgs:\n"
+ + " - ref: \"spoutConfig\"\n"
+ + "\n"
+ + "# bolt definitions\n"
+ + "bolts:\n"
+ + " - id: \"splitsentence\"\n"
+ + " className: \"org.apache.storm.flux.wrappers.bolts.FluxShellBolt\"\n"
+ + " constructorArgs:\n"
+ + " # command line\n"
+ + " - [\"python\", \"splitsentence.py\"]\n"
+ + " # output fields\n"
+ + " - [\"word\"]\n"
+ + " parallelism: 1\n"
+ + " # ...\n"
+ + "\n"
+ + " - id: \"log\"\n"
+ + " className: \"org.apache.storm.flux.wrappers.bolts.LogInfoBolt\"\n"
+ + " parallelism: 1\n"
+ + " # ...\n"
+ + "\n"
+ + " - id: \"count\"\n"
+ + " className: \"org.apache.storm.testing.TestWordCounter\"\n"
+ + " parallelism: 1\n"
+ + " # ...\n"
+ + "\n"
+ + "#stream definitions\n"
+ + "# stream definitions define connections between spouts and bolts.\n"
+ + "# note that such connections can be cyclical\n"
+ + "# custom stream groupings are also supported\n"
+ + "\n"
+ + "streams:\n"
+ + " - name: \"kafka --> split\" # name isn't used (placeholder for logging, UI, etc.)\n"
+ + " id: \"stream1\"\n"
+ + " from: \"kafka-spout\"\n"
+ + " to: \"splitsentence\"\n"
+ + " grouping:\n"
+ + " type: SHUFFLE\n"
+ + "\n"
+ + " - name: \"split --> count\"\n"
+ + " id: \"stream2\"\n"
+ + " from: \"splitsentence\"\n"
+ + " to: \"count\"\n"
+ + " grouping:\n"
+ + " type: FIELDS\n"
+ + " args: [\"word\"]\n"
+ + "\n"
+ + " - name: \"count --> log\"\n"
+ + " id: \"stream3\"\n"
+ + " from: \"count\"\n"
+ + " to: \"log\"\n"
+ + " grouping:\n"
+ + " type: SHUFFLE";
+ private EcoParser subject;
+ // Runs before each test (JUnit @Before): points subject at a newly constructed EcoParser.
+ @Before
+ public void setUpBeforeEachTestCase() {
+ subject = new EcoParser();
+ }
+
+ @Test
+ public void testParseFromInputStream_VerifyComponents_MapsAsExpected() throws Exception {
+ // Checks every component parsed from YAML_STR_1, incl. all four SpoutConfig constructor args.
+ InputStream inputStream = new ByteArrayInputStream(YAML_STR_1.getBytes());
+ EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream);
+ List<BeanDefinition> components = topologyDefinition.getComponents();
+
+ assertEquals("kafka-topology", topologyDefinition.getName());
+ assertEquals(4, components.size());
+
+ BeanDefinition stringSchemeComponent = components.get(0);
+ assertEquals("stringScheme", stringSchemeComponent.getId());
+ assertEquals("org.apache.storm.kafka.StringScheme", stringSchemeComponent.getClassName());
+
+ BeanDefinition stringMultiSchemeComponent = components.get(1);
+ assertEquals("stringMultiScheme", stringMultiSchemeComponent.getId());
+ assertEquals("org.apache.storm.spout.SchemeAsMultiScheme",
+ stringMultiSchemeComponent.getClassName());
+ assertEquals(1, stringMultiSchemeComponent.getConstructorArgs().size());
+ BeanReference multiStringReference =
+ (BeanReference) stringMultiSchemeComponent.getConstructorArgs().get(0);
+ assertEquals("stringScheme", multiStringReference.getId());
+
+ BeanDefinition zkHostsComponent = components.get(2);
+ assertEquals("zkHosts", zkHostsComponent.getId());
+ assertEquals("org.apache.storm.kafka.ZkHosts", zkHostsComponent.getClassName());
+ assertEquals(1, zkHostsComponent.getConstructorArgs().size());
+ assertEquals("localhost:2181", zkHostsComponent.getConstructorArgs().get(0));
+
+ BeanDefinition spoutConfigComponent = components.get(3);
+ List<Object> spoutConstructArgs = spoutConfigComponent.getConstructorArgs();
+ assertEquals("spoutConfig", spoutConfigComponent.getId());
+ assertEquals("org.apache.storm.kafka.SpoutConfig", spoutConfigComponent.getClassName());
+ assertEquals(4, spoutConstructArgs.size());
+ BeanReference spoutBrokerHostComponent = (BeanReference) spoutConstructArgs.get(0);
+ assertEquals("zkHosts", spoutBrokerHostComponent.getId());
+ assertEquals("myKafkaTopic", spoutConstructArgs.get(1));
+ assertEquals("/kafkaSpout", spoutConstructArgs.get(2));
+ assertEquals("myId", spoutConstructArgs.get(3));
+ List<PropertyDefinition> properties = spoutConfigComponent.getProperties();
+ assertEquals("ignoreZkOffsets", properties.get(0).getName());
+ assertEquals(true, properties.get(0).getValue());
+ assertEquals("scheme", properties.get(1).getName());
+ assertEquals(true, properties.get(1).isReference());
+ assertEquals("stringMultiScheme", properties.get(1).getRef());
+ }
+
+ @Test
+ public void testParseFromInputStream_VerifyAllButComponents_MapsAsExpected() throws Exception {
+ // Checks name, config, bolts and streams parsed from YAML_STR (components are not covered).
+ InputStream inputStream = new ByteArrayInputStream(YAML_STR.getBytes());
+
+ EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream);
+
+ assertEquals("yaml-topology", topologyDefinition.getName());
+ assertEquals(1, topologyDefinition.getConfig().size());
+ assertEquals(1, topologyDefinition.getConfig().get("topology.workers"));
+
+ BoltDefinition bolt1 = topologyDefinition.getBolt(BOLT_1);
+ assertNotNull(bolt1);
+ assertEquals(2, bolt1.getParallelism());
+ assertEquals("com.twitter.heron.sample.TestWordCounter", bolt1.getClassName());
+ assertEquals(BOLT_1, bolt1.getId());
+
+ BoltDefinition bolt2 = topologyDefinition.getBolt(BOLT_2);
+ assertNotNull(bolt2);
+ assertEquals(1, bolt2.getParallelism());
+ assertEquals("com.twitter.heron.sample.LogInfoBolt", bolt2.getClassName());
+ assertEquals(BOLT_2, bolt2.getId());
+
+ List<StreamDefinition> streamDefinitions = topologyDefinition.getStreams();
+ // Check the count first so a short list fails as an assertion, not an IndexOutOfBounds.
+ assertEquals(2, streamDefinitions.size());
+ StreamDefinition streamDefinitionOne = streamDefinitions.get(0);
+ GroupingDefinition groupingDefinitionOne = streamDefinitionOne.getGrouping();
+ StreamDefinition streamDefinitionTwo = streamDefinitions.get(1);
+ GroupingDefinition groupingDefinitionTwo = streamDefinitionTwo.getGrouping();
+
+ assertEquals(BOLT_1, streamDefinitionOne.getTo());
+ assertEquals("spout-1", streamDefinitionOne.getFrom());
+ assertEquals(GroupingDefinition.Type.FIELDS, groupingDefinitionOne.getType());
+ assertEquals(1, groupingDefinitionOne.getArgs().size());
+ assertEquals("word", groupingDefinitionOne.getArgs().get(0));
+ assertEquals("connection-1", streamDefinitionOne.getId());
+
+ assertEquals(BOLT_2, streamDefinitionTwo.getTo());
+ assertEquals("bolt-1", streamDefinitionTwo.getFrom());
+ assertEquals(GroupingDefinition.Type.SHUFFLE, groupingDefinitionTwo.getType());
+ assertEquals("connection-2", streamDefinitionTwo.getId());
+ assertNull(groupingDefinitionTwo.getArgs());
+
+ }
+
+ @Test
+ public void testPartFromInputStream_NoConfigSpecified_ConfigMapIsEmpty() throws Exception {
+ // A document without a config: section must still yield a non-null, empty config map.
+ // NOTE(review): "Part" in the method name looks like a typo for "Parse"; name kept as-is.
+ InputStream yamlWithoutConfig = new ByteArrayInputStream(YAML_NO_CONFIG_STR.getBytes());
+ EcoTopologyDefinition parsed = subject.parseFromInputStream(yamlWithoutConfig);
+ assertNotNull(parsed.getConfig());
+ assertEquals(0, parsed.getConfig().size());
+ }
+
+ @Test(expected = Exception.class)
+ public void testParseFromInputStream_StreamIsNull_ExceptionThrown() throws Exception {
+ // Parsing a null stream must throw; the finally block confirms nothing was returned.
+ EcoTopologyDefinition parsed = null;
+ final InputStream missingStream = null;
+ try {
+ parsed = subject.parseFromInputStream(missingStream);
+ } finally {
+ assertNull(parsed);
+ }
+ }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java b/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
new file mode 100644
index 0000000..3291d49
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
@@ -0,0 +1,55 @@
+// Copyright 2018 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.eco.submit;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+ @RunWith(PowerMockRunner.class)
+ @PrepareForTest(StormSubmitter.class)
+ public class EcoSubmitterTest {
+ // Verifies that EcoSubmitter.submitTopology triggers one static StormSubmitter.submitTopology.
+ private EcoSubmitter subject;
+
+ @Before
+ public void setUpForEachTestCase() {
+ subject = new EcoSubmitter();
+ }
+
+ @Test
+ public void submitTopology_AllGood_BehavesAsExpected()
+ throws Exception {
+ Config config = new Config();
+ StormTopology topology = new StormTopology();
+ // Spy on StormSubmitter's static methods and stub submitTopology to do nothing.
+ PowerMockito.spy(StormSubmitter.class);
+ PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",
+ any(String.class), any(Config.class), any(StormTopology.class));
+ // Exercise the submitter; verifyStatic plus the static call below check it ran exactly once.
+ subject.submitTopology("name", config, topology);
+ PowerMockito.verifyStatic(times(1));
+ StormSubmitter.submitTopology(anyString(), any(Config.class), any(StormTopology.class));
+ }
+ }
diff --git a/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java b/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java
index 4c54dc3..f767a0e 100644
--- a/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java
+++ b/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java
@@ -55,7 +55,7 @@
private static final String EXCEPTION_FIRST_TIME = "firstTime";
private static final String EXCEPTION_LOGGING = "logging";
private static final String RECORD_SOURCE = "source";
- private static final String RECORD_CONTEXT = "context";
+ private static final String RECORD_CONTEXT = "ecoExecutionContext";
private volatile int processRecordInvoked = 0;
private volatile int flushInvoked = 0;
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java b/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java
index dc696d1..36ad671 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java
@@ -25,7 +25,7 @@
public class MetricsRecordTest {
private static final int N = 100;
private static final String SOURCE = "source";
- private static final String CONTEXT = "context";
+ private static final String CONTEXT = "ecoExecutionContext";
private final List<MetricsRecord> records = new ArrayList<MetricsRecord>();
@Before
diff --git a/scripts/get_all_heron_paths.sh b/scripts/get_all_heron_paths.sh
index 5ea0e68..ed40c02 100755
--- a/scripts/get_all_heron_paths.sh
+++ b/scripts/get_all_heron_paths.sh
@@ -67,7 +67,7 @@
}
function get_heron_java_paths() {
- local java_paths=$(find {heron,heron/tools,tools,integration_test,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
+ local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
if [ "$(uname -s | tr 'A-Z' 'a-z')" != "darwin" ]; then
java_paths=$(echo "${java_paths}" | fgrep -v "/objc_tools/")
fi