Joshfischer/eco config (#2658)

* initial addition of directory structure

* clean up

* adding base config for build file \
fixing linting issues

* saving progress.  Still need to fix missing class definition on ParseException

* fixing shaded dependency issue

* starting experimentation on how to approach loading config from yaml file

* experimenting with snake yaml to extract topology definition

* adding start of topology and component definitions

* refactor

* successfully populating ecoTopologyDefinition from yaml

* clean up

* clean up

* fixed versioning error

* saving progress, still working through building topology from yaml

* saving progress

* cleaning up styles

* checking default config values

* adding test for customized config map

* transitioning to support heron api

* removing last of streamlet api from eco

* making a pivot to match storm flux behavior better

* saving before chaos

* saving progress, running into issue with verifying the topology

* adding some logging for debugging

* printing topology definition.  Adding tests

* saving with failing test, still working through investigation

* back to working

* saving progress

* finishing eco parser test

* handling exception

* package change

* package change

* adding missed id

* MVP of ECO at this commit

* fixing styles, pulling topology name from topology definition file

* fixing spelling

* clean up

* clean up

* refactoring eco builder package

* starting to remove static calls

* removing static calls

* still working through linting errors

* fixing lint errors

* fixing setter

* adding test

* starting to verify bean references

* verifying components

* fixing tests

* worked past constructor argument issue.  Now need to deal with backtypes from Storm

* saving before restructure

* building kafka topology

* fixing imports for test

* fixing folder structure

* refactor for testability

* removing more statics

* cleaning up eco builders

* confirmation config builder maps as expected

* finishing configBuilder test

* cleaning up BoltBuilder

* verifying component builder

* replacing java logger

* adding tests

* adding verification of building implementations of IRichBolts

* clean up

* adding mocks for testing

* fixing classNotFound errors

* coverage on all of stream builder

* need to refactor ObjectBuilder

* moving static call

* adding verification to test

* clean up

* clean up

* verifying path of building objects with args

* covering other path

* starting on builder utility

* verifying className

* adding test

* adding test

* clean up

* adding test

* clean up

* clean up

* fixing checkstyle errors

* fixing linting issues

* fixing linting issues

* fixing linting issues

* fixing linting issues

* linting issues

* fixing checkstyle

* fixing checkstyle

* saving progress

* adding simple wordcount eco example

* adding examples

* checkstyles
diff --git a/examples/src/java/BUILD b/examples/src/java/BUILD
index 3994603..4a4772f 100644
--- a/examples/src/java/BUILD
+++ b/examples/src/java/BUILD
@@ -37,3 +37,24 @@
     outs = ["heron-streamlet-examples.jar"],
     cmd  = "cp $< $@",
 )
+
+
+java_binary(
+    name='eco-examples-unshaded',
+    srcs = glob(["com/twitter/heron/examples/eco/**/*.java"]),
+    deps = [
+        "//heron/api/src/java:api-java-low-level",
+        "//heron/api/src/java:api-java",
+        "//heron/common/src/java:basics-java",
+         "//heron/eco/src/java:eco-core",
+        "//storm-compatibility/src/java:storm-compatibility-java",
+    ],
+    create_executable = 0,
+)
+
+genrule(
+    name = 'heron-eco-examples',
+    srcs = [":eco-examples-unshaded_deploy.jar"],
+    outs = ["heron-eco-examples.jar"],
+    cmd  = "cp $< $@",
+)
\ No newline at end of file
diff --git a/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java b/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
new file mode 100644
index 0000000..3271178
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java
@@ -0,0 +1,41 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+
+import java.util.logging.Logger;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Simple bolt that does nothing other than LOG.info() every tuple received.
+ *
+ */
+@SuppressWarnings("serial")
+public class LogInfoBolt extends BaseBasicBolt {
+  private static final Logger LOG = Logger.getLogger(LogInfoBolt.class.getName());
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+    LOG.info("{ }" + tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java b/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
new file mode 100644
index 0000000..00b5f64
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java
@@ -0,0 +1,61 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+import static com.twitter.heron.api.utils.Utils.tuple;
+@SuppressWarnings({"serial", "rawtypes"})
+public class TestNameCounter extends BaseBasicBolt {
+
+  private Map<String, Integer> counts;
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    counts = new HashMap<>();
+  }
+
+
+  protected String getTupleValue(Tuple t, int idx) {
+    return (String) t.getValues().get(idx);
+  }
+
+  public void execute(Tuple input, BasicOutputCollector collector) {
+    String word = getTupleValue(input, 0);
+    int count = 0;
+    if (counts.containsKey(word)) {
+      count = counts.get(word);
+    }
+    count++;
+    counts.put(word, count);
+    collector.emit(tuple(word, count));
+  }
+
+  public void cleanup() {
+
+  }
+
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("name", "count"));
+  }
+
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java b/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
new file mode 100644
index 0000000..4e7e8a5
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java
@@ -0,0 +1,81 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+@SuppressWarnings({"serial", "HiddenField"})
+public class TestNameSpout extends BaseRichSpout {
+  private boolean isdistributed;
+  private SpoutOutputCollector collector;
+
+  public TestNameSpout() {
+    this(true);
+  }
+
+  public TestNameSpout(boolean isDistributed) {
+    isdistributed = isDistributed;
+  }
+
+  public void open(Map<String, Object> conf, TopologyContext context,
+                   SpoutOutputCollector collector) {
+    this.collector = collector;
+  }
+
+  public void close() {
+
+  }
+
+  public void nextTuple() {
+    Utils.sleep(100);
+    final String[] words = new String[] {"marge", "homer", "bart", "simpson", "lisa"};
+    final Random rand = new Random();
+    final String word = words[rand.nextInt(words.length)];
+    collector.emit(new Values(word));
+  }
+
+  public void ack(Object msgId) {
+
+  }
+
+  public void fail(Object msgId) {
+
+  }
+
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("name"));
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    if (!isdistributed) {
+      Map<String, Object> ret = new HashMap<String, Object>();
+      ret.put(Config.TOPOLOGY_WORKERS, 1);
+      return ret;
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java b/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
new file mode 100644
index 0000000..f9cb1d6
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java
@@ -0,0 +1,33 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+@SuppressWarnings("serial")
+public class TestPrintBolt extends BaseBasicBolt {
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    System.out.println(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer ofd) {
+  }
+
+}
diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java b/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
new file mode 100644
index 0000000..ef6046c
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java
@@ -0,0 +1,46 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.examples.eco;
+
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+
+@SuppressWarnings({"serial", "HiddenField"})
+public class TestWindowBolt extends BaseWindowedBolt {
+  private OutputCollector collector;
+
+
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context,
+                      OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  @Override
+  public void execute(TupleWindow inputWindow) {
+    collector.emit(new Values(inputWindow.get().size()));
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("count"));
+  }
+}
diff --git a/examples/src/resources/simple_windowing.yaml b/examples/src/resources/simple_windowing.yaml
new file mode 100644
index 0000000..e860b64
--- /dev/null
+++ b/examples/src/resources/simple_windowing.yaml
@@ -0,0 +1,67 @@
+#  Copyright 2017 Twitter. All rights reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+---
+
+name: "sliding-window-topology"
+
+components:
+  - id: "windowLength"
+    className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 5
+  - id: "slidingInterval"
+    className: "org.apache.storm.topology.base.BaseWindowedBolt$Count"
+    constructorArgs:
+      - 3
+
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "com.twitter.heron.examples.eco.TestNameSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "com.twitter.heron.examples.eco.TestWindowBolt"
+    configMethods:
+      - name: "withWindow"
+        args: [ref: "windowLength", ref: "slidingInterval"]
+    parallelism: 1
+  - id: "bolt-2"
+    className: "com.twitter.heron.examples.eco.TestPrintBolt"
+    parallelism: 1
+
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+#    id: "connection-1"
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+  - name: "bolt-1 --> bolt-2" # name isn't used (placeholder for logging, UI, etc.)
+#    id: "connection-1"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git a/examples/src/resources/simple_wordcount.yaml b/examples/src/resources/simple_wordcount.yaml
new file mode 100644
index 0000000..d28eec2
--- /dev/null
+++ b/examples/src/resources/simple_wordcount.yaml
@@ -0,0 +1,60 @@
+#  Copyright 2017 Twitter. All rights reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "simple-wordcount-topology"
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "com.twitter.heron.examples.eco.TestNameSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "com.twitter.heron.examples.eco.TestNameCounter"
+    parallelism: 1
+
+  - id: "bolt-2"
+    className: "com.twitter.heron.examples.eco.LogInfoBolt"
+    parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+    id: "connection-1"
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "bolt-1 --> bolt2"
+    id: "connection-2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
\ No newline at end of file
diff --git a/heron/eco/src/java/BUILD b/heron/eco/src/java/BUILD
new file mode 100644
index 0000000..ec26955
--- /dev/null
+++ b/heron/eco/src/java/BUILD
@@ -0,0 +1,34 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+eco_deps = [
+"@commons_cli_commons_cli//jar",
+"@org_yaml_snakeyaml//jar",
+"//third_party/java:logging",
+"//storm-compatibility/src/java:storm-compatibility-java",
+"//heron/api/src/java:api-java-low-level",
+"//heron/common/src/java:basics-java",
+]
+
+
+java_binary(
+    name="eco-core-shaded",
+    srcs = glob(["com/twitter/heron/eco/**/*.java"]),
+    deps = eco_deps,
+    create_executable = 0,
+
+)
+
+java_library(
+    name = "eco-java",
+    srcs = glob(["com/twitter/heron/eco/**/*.java"]),
+    deps = eco_deps
+)
+
+genrule(
+    name = "eco-core",
+    srcs = [":eco-core-shaded_deploy.jar"],
+    outs = ["eco-core.jar"],
+    cmd  = "cp $< $@",
+)
diff --git a/heron/eco/src/java/com/twitter/heron/eco/Eco.java b/heron/eco/src/java/com/twitter/heron/eco/Eco.java
new file mode 100644
index 0000000..cc30489
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/Eco.java
@@ -0,0 +1,158 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.logging.Logger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.builder.BoltBuilder;
+import com.twitter.heron.eco.builder.BuilderUtility;
+import com.twitter.heron.eco.builder.ComponentBuilder;
+import com.twitter.heron.eco.builder.ConfigBuilder;
+import com.twitter.heron.eco.builder.EcoBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+import com.twitter.heron.eco.builder.SpoutBuilder;
+import com.twitter.heron.eco.builder.StreamBuilder;
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.SpoutDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+import com.twitter.heron.eco.parser.EcoParser;
+import com.twitter.heron.eco.submit.EcoSubmitter;
+
+
+public class Eco {
+
+  private static final Logger LOG = Logger.getLogger(Eco.class.getName());
+
+  private EcoBuilder ecoBuilder;
+  private EcoParser ecoParser;
+  private EcoSubmitter ecoSubmitter;
+
+  public Eco(EcoBuilder ecoBuilder, EcoParser ecoParser, EcoSubmitter ecoSubmitter) {
+    this.ecoBuilder = ecoBuilder;
+    this.ecoParser = ecoParser;
+    this.ecoSubmitter = ecoSubmitter;
+  }
+
+  public void submit(FileInputStream fileInputStream) throws Exception {
+    EcoTopologyDefinition topologyDefinition = ecoParser.parseFromInputStream(fileInputStream);
+
+    String topologyName = topologyDefinition.getName();
+
+    Config topologyConfig = ecoBuilder
+        .buildConfig(topologyDefinition);
+
+    EcoExecutionContext executionContext
+        = new EcoExecutionContext(topologyDefinition, topologyConfig);
+
+    printTopologyInfo(executionContext);
+
+    ObjectBuilder objectBuilder = new ObjectBuilder();
+    objectBuilder.setBuilderUtility(new BuilderUtility());
+    TopologyBuilder builder = ecoBuilder
+        .buildTopologyBuilder(executionContext, objectBuilder);
+
+    ecoSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = constructOptions();
+
+    CommandLineParser parser = new DefaultParser();
+
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      throw new RuntimeException("Error parsing command line options: ", e);
+    }
+
+    FileInputStream fin = new FileInputStream(new File(cmd.getOptionValue("eco-config-file")));
+
+    Eco eco = new Eco(
+        new EcoBuilder(
+            new SpoutBuilder(),
+            new BoltBuilder(),
+            new StreamBuilder(),
+            new ComponentBuilder(),
+            new ConfigBuilder()),
+        new EcoParser(),
+        new EcoSubmitter());
+
+    eco.submit(fin);
+  }
+
+  private static Options constructOptions() {
+    Options options = new Options();
+    Option ecoConfig = Option.builder("eco")
+        .desc("Yaml config file for specifying topology definitions")
+        .longOpt("eco-config-file")
+        .hasArgs()
+        .argName("eco-config-file")
+        .required()
+        .build();
+    options.addOption(ecoConfig);
+    return options;
+  }
+
+  // construct command line help options
+  //TODO: (joshfischer) integrate with existing system somehow
+  private static Options constructHelpOptions() {
+    Options options = new Options();
+    Option help = Option.builder("h")
+        .desc("List all options and their description")
+        .longOpt("help")
+        .build();
+
+    options.addOption(help);
+    return options;
+  }
+
+  static void printTopologyInfo(EcoExecutionContext ctx) {
+    EcoTopologyDefinition t = ctx.getTopologyDefinition();
+
+    LOG.info("---------- TOPOLOGY DETAILS ----------");
+
+    LOG.info(String.format("Topology Name: %s", t.getName()));
+    LOG.info("--------------- SPOUTS ---------------");
+    for (SpoutDefinition s : t.getSpouts()) {
+      LOG.info(String.format("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName()));
+    }
+    LOG.info("---------------- BOLTS ---------------");
+    for (BoltDefinition b : t.getBolts()) {
+      LOG.info(String.format("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName()));
+    }
+
+    LOG.info("--------------- STREAMS ---------------");
+    for (StreamDefinition sd : t.getStreams()) {
+      LOG.info(String.format("%s --%s--> %s",
+          sd.getFrom(),
+          sd.getGrouping().getType(),
+          sd.getTo()));
+    }
+    LOG.info("--------------------------------------");
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
new file mode 100644
index 0000000..6af6ac9
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java
@@ -0,0 +1,35 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+public class BoltBuilder {
+
+  protected void buildBolts(EcoExecutionContext executionContext,
+                            ObjectBuilder objectBuilder)
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException,
+      NoSuchFieldException, InvocationTargetException {
+    EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition();
+
+    for (ObjectDefinition def: topologyDefinition.getBolts()) {
+      Object obj = objectBuilder.buildObject(def, executionContext);
+      executionContext.addBolt(def.getId(), obj);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java b/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java
new file mode 100644
index 0000000..a0ed526
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java
@@ -0,0 +1,116 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.heron.eco.definition.BeanListReference;
+import com.twitter.heron.eco.definition.BeanReference;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.PropertyDefinition;
+
+public class BuilderUtility {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BuilderUtility.class);
+
+  @SuppressWarnings("rawtypes")
+  protected List<Object> resolveReferences(List<Object> args, EcoExecutionContext context) {
+    LOG.debug("Checking arguments for references.");
+    List<Object> cArgs = new ArrayList<>();
+
+    // resolve references
+    for (Object arg : args) {
+      if (arg instanceof BeanReference) {
+        LOG.debug("BeanReference: " + ((BeanReference) arg).getId());
+        cArgs.add(context.getComponent(((BeanReference) arg).getId()));
+      } else if (arg instanceof BeanListReference) {
+        List<Object> components = new ArrayList<>();
+        BeanListReference ref = (BeanListReference) arg;
+        for (String id : ref.getIds()) {
+          components.add(context.getComponent(id));
+        }
+
+        LOG.debug("BeanListReference resolved as {}" + components);
+        cArgs.add(components);
+      } else {
+        LOG.debug("Unknown:" + arg.toString());
+        cArgs.add(arg);
+      }
+    }
+    return cArgs;
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected void applyProperties(ObjectDefinition bean, Object instance,
+                                      EcoExecutionContext context) throws
+      IllegalAccessException, InvocationTargetException, NoSuchFieldException {
+    List<PropertyDefinition> props = bean.getProperties();
+    Class clazz = instance.getClass();
+    if (props != null) {
+      for (PropertyDefinition prop : props) {
+        Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
+        Method setter = findSetter(clazz, prop.getName());
+        if (setter != null) {
+          LOG.info("found setter, attempting with: " + instance.getClass() + "  " + value);
+          // invoke setter
+          setter.invoke(instance, new Object[]{value});
+        } else {
+          // look for a public instance variable
+          LOG.debug("no setter found. Looking for a public instance variable...");
+          Field field = findPublicField(clazz, prop.getName());
+          if (field != null) {
+            field.set(instance, value);
+          }
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected Field findPublicField(Class clazz, String property)
+      throws NoSuchFieldException {
+    Field field = clazz.getField(property);
+    return field;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private Method findSetter(Class clazz, String property) {
+    String setterName = toSetterName(property);
+    Method retval = null;
+    Method[] methods = clazz.getMethods();
+    for (Method method : methods) {
+      if (setterName.equals(method.getName())) {
+        LOG.debug("Found setter method: " + method.getName());
+        retval = method;
+      }
+    }
+    return retval;
+  }
+
+  protected String toSetterName(String name) {
+    return "set" + name.substring(0, 1).toUpperCase() + name.substring(1, name.length());
+  }
+
+  protected Class<?> classForName(String className) throws ClassNotFoundException {
+    return Class.forName(className);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
new file mode 100644
index 0000000..9fe6023
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java
@@ -0,0 +1,36 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+import com.twitter.heron.eco.definition.BeanDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+
+public class ComponentBuilder {
+  /** Builds each component of the topology definition and adds it to the context by its id. */
+  protected void buildComponents(EcoExecutionContext context, ObjectBuilder objectBuilder)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+      NoSuchFieldException, InvocationTargetException {
+    List<BeanDefinition> beans = context.getTopologyDefinition().getComponents();
+    if (beans == null) {
+      return; // null list: nothing to build
+    }
+    for (BeanDefinition definition : beans) {
+      Object built = objectBuilder.buildObject(definition, context);
+      context.addComponent(definition.getId(), built);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
new file mode 100644
index 0000000..a6b758b
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java
@@ -0,0 +1,32 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.util.Map;
+
+import org.apache.storm.Config;
+
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+public class ConfigBuilder {
+  /** Copies the definition's config map into a new Config; a null map yields an empty Config. */
+  protected Config buildConfig(EcoTopologyDefinition topologyDefinition) {
+    Config config = new Config();
+    Map<String, Object> configMap = topologyDefinition.getConfig();
+    if (configMap != null) {
+      config.putAll(configMap);
+    }
+    return config;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
new file mode 100644
index 0000000..c448f72
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java
@@ -0,0 +1,66 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+
+public class EcoBuilder {
+
+  // One collaborator per part of the topology; all of them are supplied by the caller.
+  private SpoutBuilder spoutBuilder;
+  private BoltBuilder boltBuilder;
+  private StreamBuilder streamBuilder;
+  private ComponentBuilder componentBuilder;
+  private ConfigBuilder configBuilder;
+
+  /** Creates an EcoBuilder that delegates to the given builders. */
+  public EcoBuilder(SpoutBuilder spoutBuilder, BoltBuilder boltBuilder,
+                    StreamBuilder streamBuilder, ComponentBuilder componentBuilder,
+                    ConfigBuilder configBuilder) {
+    this.configBuilder = configBuilder;
+    this.componentBuilder = componentBuilder;
+    this.streamBuilder = streamBuilder;
+    this.boltBuilder = boltBuilder;
+    this.spoutBuilder = spoutBuilder;
+  }
+
+  /**
+   * Populates a new TopologyBuilder from the execution context. The delegates run in a fixed
+   * order: components first, then spouts, then bolts and finally streams.
+   */
+  public TopologyBuilder buildTopologyBuilder(EcoExecutionContext executionContext,
+                                              ObjectBuilder objectBuilder)
+      throws InstantiationException, IllegalAccessException, ClassNotFoundException,
+      NoSuchFieldException, InvocationTargetException {
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+    componentBuilder.buildComponents(executionContext, objectBuilder);
+    spoutBuilder.buildSpouts(executionContext, topologyBuilder, objectBuilder);
+    boltBuilder.buildBolts(executionContext, objectBuilder);
+    streamBuilder.buildStreams(executionContext, topologyBuilder, objectBuilder);
+    return topologyBuilder;
+  }
+
+  /** Builds the topology Config by delegating to the ConfigBuilder. */
+  public Config buildConfig(EcoTopologyDefinition topologyDefinition) {
+    return configBuilder.buildConfig(topologyDefinition);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java
new file mode 100644
index 0000000..5f2f441
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java
@@ -0,0 +1,311 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.heron.eco.definition.ConfigurationMethodDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+public class ObjectBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(ObjectBuilder.class);
+
+  private BuilderUtility builderUtility;
+
+  public void setBuilderUtility(BuilderUtility builderUtility) {
+    this.builderUtility = builderUtility; // helper for class lookup, references and properties
+  }
+
+  /**
+   * Instantiates the class named by the definition, using the constructor that fits its
+   * (reference-resolved) constructor args or else the no-arg one, then applies the declared
+   * properties and config methods. Throws IllegalArgumentException if no constructor fits.
+   */
+  @SuppressWarnings("rawtypes")
+  public Object buildObject(ObjectDefinition def, EcoExecutionContext context)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+      InvocationTargetException, NoSuchFieldException {
+    Class clazz = builderUtility.classForName(def.getClassName());
+    Object obj;
+    if (!def.hasConstructorArgs()) {
+      obj = clazz.newInstance();
+    } else {
+      LOG.debug("Found constructor arguments in definition ");
+      List<Object> cArgs = def.getConstructorArgs();
+      if (def.hasReferences()) {
+        LOG.debug("The definition has references");
+        cArgs = builderUtility.resolveReferences(cArgs, context);
+      } else {
+        LOG.debug("The definition does not have references");
+      }
+      LOG.debug("finding compatible constructor for : " + clazz.getName());
+      Constructor con = findCompatibleConstructor(cArgs, clazz);
+      if (con == null) {
+        throw new IllegalArgumentException(String.format(
+            "Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
+            clazz.getName(), cArgs));
+      }
+      LOG.debug("Found something seemingly compatible, attempting invocation...");
+      obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
+    }
+    builderUtility.applyProperties(def, obj, context);
+    invokeConfigMethods(def, obj, context);
+    return obj;
+  }
+
+  /**
+   * Picks the declared constructor of target whose parameters can accept args, or null if
+   * none can. When several qualify, an error is logged and the last one found is returned.
+   */
+  @SuppressWarnings("rawtypes")
+  protected Constructor findCompatibleConstructor(List<Object> args, Class target) {
+    Constructor retval = null;
+    int eligibleCount = 0;
+    LOG.debug("Target class: " + target.getName() + ", constructor args: " + args);
+    for (Constructor con : target.getDeclaredConstructors()) {
+      Class[] paramClasses = con.getParameterTypes();
+      if (paramClasses.length != args.size()) {
+        LOG.debug("Skipping constructor with wrong number of arguments.");
+        continue;
+      }
+      LOG.debug("found constructor with same number of args..");
+      boolean invokable = canInvokeWithArgs(args, paramClasses);
+      if (invokable) {
+        retval = con;
+        eligibleCount++;
+      }
+      LOG.debug("** invokable --> {}", invokable);
+    }
+    if (eligibleCount > 1) {
+      LOG.error("Found multiple invokable constructors for class: "
+          + target + ", given arguments " + args + ". Using the last one found.");
+    }
+    return retval;
+  }
+
+  /**
+   * Tells whether every arg fits the parameter type at its position: assignable, a boxed value
+   * for a primitive, a String for an enum, a List (even empty) for an array. Null args throw.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  protected boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
+    if (parameterTypes.length != args.size()) {
+      LOG.warn("parameter types were the wrong size");
+      return false;
+    }
+    for (int i = 0; i < args.size(); i++) {
+      Object obj = args.get(i);
+      if (obj == null) {
+        throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
+      }
+      Class paramType = parameterTypes[i];
+      Class objectType = obj.getClass();
+      LOG.debug("Comparing parameter class " + paramType + " to object class "
+          + objectType + " to see if assignment is possible.");
+      if (paramType.equals(objectType)) {
+        LOG.debug("Yes, they are the same class.");
+      } else if (paramType.isAssignableFrom(objectType)
+          || (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType))
+          || (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType))) {
+        LOG.debug("Yes, assignment is possible.");
+      } else if (paramType.isEnum() && objectType.equals(String.class)) {
+        LOG.debug("Yes, will convert a String to enum");
+      } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+        LOG.debug("Assignment is possible if we convert a List to an array.");
+        LOG.debug("Array Type: " + paramType.getComponentType() + ", List: " + obj);
+      } else {
+        LOG.debug("returning false");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected boolean isPrimitiveNumber(Class clazz) { // char is primitive yet not a Number
+    return clazz.isPrimitive() && clazz != boolean.class && clazz != char.class;
+  }
+
+  @SuppressWarnings("rawtypes")
+  protected boolean isPrimitiveBoolean(Class clazz) { // true only for boolean.class itself
+    return clazz.isPrimitive() && Boolean.TYPE.equals(clazz);
+  }
+
+  /**
+   * Calls each configuration method declared on the bean against the instance, resolving
+   * referenced args first. Throws IllegalArgumentException if no fitting method exists.
+   */
+  @SuppressWarnings("rawtypes")
+  public void invokeConfigMethods(ObjectDefinition bean,
+                                         Object instance, EcoExecutionContext context)
+      throws InvocationTargetException, IllegalAccessException {
+    List<ConfigurationMethodDefinition> methodDefs = bean.getConfigMethods();
+    if (methodDefs == null || methodDefs.isEmpty()) {
+      return;
+    }
+    Class clazz = instance.getClass();
+    for (ConfigurationMethodDefinition methodDef : methodDefs) {
+      List<Object> args = methodDef.getArgs();
+      if (args == null) {
+        args = new ArrayList<Object>();
+      }
+      if (methodDef.hasReferences()) {
+        args = builderUtility.resolveReferences(args, context);
+      }
+      String methodName = methodDef.getName();
+      LOG.debug("method name: " + methodName);
+      Method method = findCompatibleMethod(args, clazz, methodName);
+      if (method == null) {
+        throw new IllegalArgumentException(String.format(
+            "Unable to find configuration method '%s' in class '%s' with arguments %s.",
+            methodName, clazz.getName(), args));
+      }
+      method.invoke(instance, getArgsWithListCoercian(args, method.getParameterTypes()));
+    }
+  }
+
+  /**
+   * Finds the public method of target named methodName whose parameters can accept args, or
+   * null if there is none. When several qualify, a warning is logged and the last one found
+   * is returned.
+   */
+  @SuppressWarnings("rawtypes")
+  private Method findCompatibleMethod(List<Object> args, Class target, String methodName) {
+    Method retval = null;
+    int eligibleCount = 0;
+    LOG.debug("Target class: " + target.getName() + ",  methodName: "
+        + methodName + ", args: " + args);
+    Method[] methods = target.getMethods();
+    LOG.debug("methods count: " + methods.length);
+    for (Method method : methods) {
+      Class[] paramClasses = method.getParameterTypes();
+      if (paramClasses.length != args.size() || !method.getName().equals(methodName)) {
+        LOG.debug("Skipping method with wrong number of arguments.");
+        continue;
+      }
+      LOG.debug("found method with same number of args..");
+      // A zero-arg method trivially accepts an empty argument list.
+      boolean invokable = args.isEmpty() || canInvokeWithArgs(args, paramClasses);
+      if (invokable) {
+        retval = method;
+        eligibleCount++;
+      }
+      LOG.debug("** invokable --> " + invokable);
+    }
+    if (eligibleCount > 1) {
+      LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}",
+          target, methodName, args);
+    }
+    return retval;
+  }
+
+
+
+  /**
+   * Given a java.util.List of constructor/method arguments and the parameter types of the
+   * target, builds the java.lang.Object array that can be used for the invocation.
+   * Arguments are taken as they are when assignable; a Boolean or Number is converted for a
+   * primitive parameter, a String is turned into the enum constant of that name and a List
+   * is copied into a new array of the parameter's component type (an empty List gives an
+   * empty array). An argument that fits none of these cases is left as null in the result.
+   *
+   * @throws IllegalArgumentException if the number of arguments and parameter types differ
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
+    if (parameterTypes.length != args.size()) {
+      throw new IllegalArgumentException("Contructor parameter count does not "
+          + "egual argument size.");
+    }
+    Object[] constructorParams = new Object[args.size()];
+
+    // loop through the arguments, if we hit a list that has to be converted to an array,
+    // perform the conversion
+    for (int i = 0; i < args.size(); i++) {
+      Object obj = args.get(i);
+      Class paramType = parameterTypes[i];
+      Class objectType = obj.getClass();
+      LOG.debug("Comparing parameter class " + paramType.getName() + " to object class "
+          +  objectType.getName() + " to see if assignment is possible.");
+      if (paramType.equals(objectType)) {
+        LOG.debug("They are the same class.");
+        constructorParams[i] = obj;
+        continue;
+      }
+      if (paramType.isAssignableFrom(objectType)) {
+        LOG.debug("Assignment is possible.");
+        constructorParams[i] = obj;
+        continue;
+      }
+      if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)) {
+        LOG.debug("Its a primitive boolean.");
+        constructorParams[i] = ((Boolean) obj).booleanValue();
+        continue;
+      }
+      if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) {
+        LOG.debug("Its a primitive number.");
+        Number num = (Number) obj;
+        if (paramType == Float.TYPE) {
+          constructorParams[i] = num.floatValue();
+        } else if (paramType == Double.TYPE) {
+          constructorParams[i] = num.doubleValue();
+        } else if (paramType == Long.TYPE) {
+          constructorParams[i] = num.longValue();
+        } else if (paramType == Integer.TYPE) {
+          constructorParams[i] = num.intValue();
+        } else if (paramType == Short.TYPE) {
+          constructorParams[i] = num.shortValue();
+        } else if (paramType == Byte.TYPE) {
+          constructorParams[i] = num.byteValue();
+        } else {
+          constructorParams[i] = obj;
+        }
+        continue;
+      }
+
+      // enum conversion
+      if (paramType.isEnum() && objectType.equals(String.class)) {
+        LOG.debug("Yes, will convert a String to enum");
+        constructorParams[i] = Enum.valueOf(paramType, (String) obj);
+        continue;
+      }
+
+      // List to array conversion
+      if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+        LOG.debug("Conversion appears possible...");
+        List list = (List) obj;
+        // Log the list itself: asking element 0 for its class would throw on an empty list.
+        LOG.debug("Array Type: {}, List: {}", paramType.getComponentType(), list);
+
+        // create an array of the right type
+        Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
+        for (int j = 0; j < list.size(); j++) {
+          Array.set(newArrayObj, j, list.get(j));
+        }
+        constructorParams[i] = newArrayObj;
+        LOG.debug("After conversion: {}", constructorParams[i]);
+      }
+    }
+    return constructorParams;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
new file mode 100644
index 0000000..19dbfc9
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java
@@ -0,0 +1,41 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+
+public class SpoutBuilder {
+
+  /** Builds every spout of the topology definition and registers it on builder and context. */
+  protected void buildSpouts(EcoExecutionContext executionContext,
+                             TopologyBuilder builder,
+                             ObjectBuilder objectBuilder)
+      throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+      NoSuchFieldException, InvocationTargetException {
+    EcoTopologyDefinition definition = executionContext.getTopologyDefinition();
+    for (ObjectDefinition spoutDef : definition.getSpouts()) {
+      Object spout = objectBuilder.buildObject(spoutDef, executionContext);
+      builder.setSpout(spoutDef.getId(), (IRichSpout) spout, spoutDef.getParallelism());
+      executionContext.addSpout(spoutDef.getId(), spout);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
new file mode 100644
index 0000000..e31e3f0
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java
@@ -0,0 +1,129 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IWindowedBolt;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import com.twitter.heron.eco.definition.ComponentStream;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.GroupingDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+
+public class StreamBuilder {
+
+  /**
+   * Declares the target bolt of every stream on the builder (once per bolt id) and applies the
+   * stream's grouping to that bolt's declarer. Throws IllegalArgumentException for a target
+   * that is no bolt and for a FIELDS grouping without arguments.
+   */
+  protected void buildStreams(EcoExecutionContext executionContext, TopologyBuilder builder,
+                              ObjectBuilder objectBuilder)
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException,
+      NoSuchFieldException, InvocationTargetException {
+    EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition();
+    Map<String, ComponentStream> componentStreams = new HashMap<>();
+
+    Map<String, BoltDeclarer> declarers = new HashMap<>();
+    for (StreamDefinition stream : topologyDefinition.getStreams()) {
+      String boltId = stream.getTo();
+      BoltDeclarer declarer = declarers.get(boltId);
+      if (declarer == null) {
+        // First stream into this bolt: register the bolt and remember its declarer.
+        declarer = declareBolt(builder, boltId, executionContext.getBolt(boltId),
+            topologyDefinition);
+        declarers.put(boltId, declarer);
+      }
+
+      GroupingDefinition grouping = stream.getGrouping();
+      // if the streamId is defined, use it for the grouping,
+      // otherwise assume default stream
+      // Todo(joshfischer) Not sure if "default" is still valid
+      String streamId = grouping.getStreamId() == null
+          ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId();
+
+      switch (grouping.getType()) {
+        case SHUFFLE:
+          declarer.shuffleGrouping(stream.getFrom(), streamId);
+          break;
+        case FIELDS:
+          List<String> groupingArgs = grouping.getArgs();
+          if (groupingArgs == null) {
+            throw new IllegalArgumentException("You must supply arguments for Fields grouping");
+          }
+          declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(groupingArgs));
+          break;
+        case ALL:
+          declarer.allGrouping(stream.getFrom(), streamId);
+          break;
+        case GLOBAL:
+          declarer.globalGrouping(stream.getFrom(), streamId);
+          break;
+        case NONE:
+          declarer.noneGrouping(stream.getFrom(), streamId);
+          break;
+        case CUSTOM:
+          declarer.customGrouping(stream.getFrom(), streamId, buildCustomStreamGrouping(
+              stream.getGrouping().getCustomClass(), executionContext, objectBuilder));
+          break;
+        default:
+          throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+      }
+    }
+    executionContext.setStreams(componentStreams);
+  }
+
+  /** Registers boltObj via the setBolt overload of its bolt interface; rejects anything else. */
+  private BoltDeclarer declareBolt(TopologyBuilder builder, String boltId, Object boltObj,
+                                   EcoTopologyDefinition topologyDefinition) {
+    if (boltObj == null) {
+      throw new IllegalArgumentException("No bolt found for stream target: " + boltId);
+    }
+    if (boltObj instanceof IRichBolt) {
+      return builder.setBolt(boltId, (IRichBolt) boltObj,
+          topologyDefinition.parallelismForBolt(boltId));
+    } else if (boltObj instanceof IBasicBolt) {
+      return builder.setBolt(boltId, (IBasicBolt) boltObj,
+          topologyDefinition.parallelismForBolt(boltId));
+    } else if (boltObj instanceof IWindowedBolt) {
+      return builder.setBolt(boltId, (IWindowedBolt) boltObj,
+          topologyDefinition.parallelismForBolt(boltId));
+    }
+    throw new IllegalArgumentException("Class does not appear to be a bolt: "
+        + boltObj.getClass().getName());
+  }
+
+  /** Builds the object described by the definition and returns it as a CustomStreamGrouping. */
+  private CustomStreamGrouping buildCustomStreamGrouping(ObjectDefinition objectDefinition,
+                                                         EcoExecutionContext executionContext,
+                                                         ObjectBuilder objectBuilder)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+      NoSuchFieldException, InvocationTargetException {
+    Object grouping = objectBuilder.buildObject(objectDefinition, executionContext);
+    return (CustomStreamGrouping) grouping;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java
new file mode 100644
index 0000000..806ae14
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java
@@ -0,0 +1,27 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/** An ObjectDefinition that additionally carries an id. */
+public class BeanDefinition extends ObjectDefinition {
+  private String id;
+
+  public String getId() {
+    return this.id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java
new file mode 100644
index 0000000..ff70fde
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java
@@ -0,0 +1,30 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.List;
+
+/** Holder for a list of ids; the list is a public field and also readable through getIds(). */
+public class BeanListReference {
+  public List<String> ids;
+
+  public BeanListReference() { }
+  public BeanListReference(List<String> ids) {
+    this.ids = ids;
+  }
+
+  public List<String> getIds() {
+    return this.ids;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java
new file mode 100644
index 0000000..2314c82
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java
@@ -0,0 +1,30 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/** Mutable holder for a single id. */
+public class BeanReference {
+  private String id;
+  public BeanReference(String id) {
+    this.id = id;
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java
new file mode 100644
index 0000000..203623d
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java
@@ -0,0 +1,17 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+public class BoltDefinition extends ObjectDefinition { // declares no members of its own
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java
new file mode 100644
index 0000000..5b747fd
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java
@@ -0,0 +1,82 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * Mutable value holder for one stream: an id, a "to" component, a "from" component and a
+ * stream name. All four values are plain strings and none of them is validated.
+ */
+public class ComponentStream {
+
+  private String id;
+
+  private String toComponent;
+
+  private String fromComponent;
+
+  private String streamName;
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  /**
+   * Returns the value last passed to {@link #setToComponent(String)}.
+   *
+   * <p>Declared as {@code String} to match the backing field and the other getters; callers
+   * that only need an {@code Object} are unaffected.
+   *
+   * @return the "to" component, or {@code null} when it was never set
+   */
+  public String getToComponent() {
+    return toComponent;
+  }
+
+  public void setToComponent(String toComponent) {
+    this.toComponent = toComponent;
+  }
+
+  public String getFromComponent() {
+    return fromComponent;
+  }
+
+  public void setFromComponent(String fromComponent) {
+    this.fromComponent = fromComponent;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public void setStreamName(String streamName) {
+    this.streamName = streamName;
+  }
+
+  /**
+   * Renders the to/from components and the stream name; the id is not part of the output.
+   */
+  @Override
+  public String toString() {
+    return "ComponentStream{"
+        + "toComponent='" + toComponent + '\''
+        + ", fromComponent='" + fromComponent
+        + '\''
+        + ", streamName='" + streamName + '\''
+        + '}';
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java
new file mode 100644
index 0000000..d63918b
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java
@@ -0,0 +1,92 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes one configuration-method call: a method name plus its argument list.
+ *
+ * <p>When the arguments are set, single-entry {@code ref} / {@code reflist} maps are swapped
+ * for {@link BeanReference} / {@link BeanListReference} placeholders.
+ */
+public class ConfigurationMethodDefinition {
+  private String name;
+  private List<Object> args;
+  private boolean hasReferences = false;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public List<Object> getArgs() {
+    return args;
+  }
+
+  /**
+   * Stores a converted copy of {@code args}.
+   *
+   * <p>An element that is a {@link LinkedHashMap} with exactly one entry keyed {@code "ref"}
+   * becomes a {@link BeanReference}; one keyed {@code "reflist"} becomes a
+   * {@link BeanListReference}. Every other element is copied through unchanged.
+   *
+   * <p>The reference flag is recomputed on every call, so it always describes the list that
+   * is currently stored rather than one set earlier.
+   *
+   * @param args raw argument values; {@code null} clears the stored arguments
+   */
+  @SuppressWarnings("unchecked")
+  public void setArgs(List<Object> args) {
+    if (args == null) {
+      // Nothing to convert; iterating a null list would throw NullPointerException.
+      this.args = null;
+      this.hasReferences = false;
+      return;
+    }
+
+    List<Object> converted = new ArrayList<Object>(args.size());
+    boolean sawReference = false;
+    for (Object obj : args) {
+      Object replacement = obj;
+      if (obj instanceof LinkedHashMap) {
+        Map<?, ?> map = (Map<?, ?>) obj;
+        if (map.size() == 1 && map.containsKey("ref")) {
+          replacement = new BeanReference((String) map.get("ref"));
+          sawReference = true;
+        } else if (map.size() == 1 && map.containsKey("reflist")) {
+          // Unchecked: the list's element type cannot be verified at runtime.
+          replacement = new BeanListReference((List<String>) map.get("reflist"));
+          sawReference = true;
+        }
+      }
+      converted.add(replacement);
+    }
+    this.args = converted;
+    this.hasReferences = sawReference;
+  }
+
+  /**
+   * @return {@code true} when the stored argument list contains at least one reference
+   */
+  public boolean hasReferences() {
+    return this.hasReferences;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java
new file mode 100644
index 0000000..6a1c8da
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java
@@ -0,0 +1,123 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+
+/**
+ * Mutable container pairing an {@link EcoTopologyDefinition} and a {@link Config} with four
+ * id-keyed maps: spouts, bolts, streams and components. All maps start out empty.
+ */
+public class EcoExecutionContext {
+
+  private EcoTopologyDefinition topologyDefinition;
+  private Config config;
+  private Map<String, Object> spouts = new HashMap<>();
+  private Map<String, Object> bolts = new HashMap<>();
+  private Map<String, ComponentStream> streams = new HashMap<>();
+  private Map<String, Object> components = new HashMap<>();
+
+  public EcoExecutionContext(EcoTopologyDefinition topologyDefinition, Config config) {
+    this.topologyDefinition = topologyDefinition;
+    this.config = config;
+  }
+
+  // ---- topology definition and config ----
+
+  public EcoTopologyDefinition getTopologyDefinition() {
+    return this.topologyDefinition;
+  }
+
+  public void setTopologyDefinition(EcoTopologyDefinition topologyDefinition) {
+    this.topologyDefinition = topologyDefinition;
+  }
+
+  public Config getConfig() {
+    return this.config;
+  }
+
+  public void setConfig(Config config) {
+    this.config = config;
+  }
+
+  // ---- spouts ----
+
+  public Map<String, Object> getSpouts() {
+    return this.spouts;
+  }
+
+  public void setSpouts(Map<String, Object> spouts) {
+    this.spouts = spouts;
+  }
+
+  public void addSpout(String key, Object value) {
+    this.spouts.put(key, value);
+  }
+
+  // ---- bolts ----
+
+  public Map<String, Object> getBolts() {
+    return this.bolts;
+  }
+
+  public void setBolts(Map<String, Object> bolts) {
+    this.bolts = bolts;
+  }
+
+  public void addBolt(String key, Object value) {
+    this.bolts.put(key, value);
+  }
+
+  public Object getBolt(String id) {
+    return this.bolts.get(id);
+  }
+
+  /**
+   * Looks {@code id} up in the bolt map, exactly like {@link #getBolt(String)}.
+   */
+  public Object getChild(String id) {
+    return this.bolts.get(id);
+  }
+
+  // ---- streams ----
+
+  public Map<String, ComponentStream> getStreams() {
+    return this.streams;
+  }
+
+  public void setStreams(Map<String, ComponentStream> streams) {
+    this.streams = streams;
+  }
+
+  // ---- components ----
+
+  public Map<String, Object> getComponents() {
+    return this.components;
+  }
+
+  public void setComponents(Map<String, Object> components) {
+    this.components = components;
+  }
+
+  public void addComponent(String key, Object value) {
+    this.components.put(key, value);
+  }
+
+  public Object getComponent(String id) {
+    return this.components.get(id);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
new file mode 100644
index 0000000..b5b8ad9
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java
@@ -0,0 +1,144 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Top-level topology description: a name, a config map, and the spout, bolt, stream and
+ * component definitions. Spouts, bolts and components are indexed by id in insertion order.
+ */
+public class EcoTopologyDefinition {
+
+  private String name;
+  private Map<String, Object> config = new HashMap<>();
+  private Map<String, SpoutDefinition> spouts = new LinkedHashMap<>();
+  private Map<String, BoltDefinition> bolts = new LinkedHashMap<>();
+  private List<StreamDefinition> streams = new ArrayList<>();
+  private Map<String, BeanDefinition> components = new LinkedHashMap<>();
+
+  /**
+   * @return a new list of the spout definitions, in insertion order
+   */
+  public List<SpoutDefinition> getSpouts() {
+    return new ArrayList<>(this.spouts.values());
+  }
+
+  public SpoutDefinition getSpout(String id) {
+    return this.spouts.get(id);
+  }
+
+  /**
+   * Replaces all spout definitions, indexing the given ones by {@code getId()}.
+   */
+  public void setSpouts(List<SpoutDefinition> sources) {
+    this.spouts = new LinkedHashMap<>();
+    for (SpoutDefinition source: sources) {
+      this.spouts.put(source.getId(), source);
+    }
+  }
+
+  /**
+   * @return a new list of the bolt definitions, in insertion order
+   */
+  public List<BoltDefinition> getBolts() {
+    return new ArrayList<>(this.bolts.values());
+  }
+
+  public BoltDefinition getBolt(String id) {
+    return this.bolts.get(id);
+  }
+
+  /**
+   * Replaces all bolt definitions, indexing the given ones by {@code getId()}.
+   */
+  public void setBolts(List<BoltDefinition> children) {
+    this.bolts = new LinkedHashMap<>();
+    for (BoltDefinition child: children) {
+      this.bolts.put(child.getId(), child);
+    }
+  }
+
+  /**
+   * @return a new list of the component definitions, in insertion order
+   */
+  public List<BeanDefinition> getComponents() {
+    return new ArrayList<>(this.components.values());
+  }
+
+  public Object getComponent(String id) {
+    return this.components.get(id);
+  }
+
+  /**
+   * Replaces all component definitions, indexing the given ones by {@code getId()}.
+   *
+   * <p>The map is rebuilt from scratch, matching {@link #setSpouts(List)} and
+   * {@link #setBolts(List)}, so entries from an earlier call or from
+   * {@link #addComponent(String, BeanDefinition)} do not survive a later call.
+   */
+  public void setComponents(List<BeanDefinition> components) {
+    this.components = new LinkedHashMap<>();
+    for (BeanDefinition bean: components) {
+      this.components.put(bean.getId(), bean);
+    }
+  }
+
+  public void addComponent(String key, BeanDefinition value) {
+    this.components.put(key, value);
+  }
+
+  public List<StreamDefinition> getStreams() {
+    return streams;
+  }
+
+  public void setStreams(List<StreamDefinition> streams) {
+    this.streams = streams;
+  }
+
+  public Map<String, Object> getConfig() {
+    return config;
+  }
+
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the parallelism of the bolt registered under {@code to}.
+   *
+   * @param to id of the bolt to look up
+   * @return the bolt's parallelism
+   * @throws IllegalArgumentException if no bolt with that id is defined
+   */
+  public Number parallelismForBolt(String to) {
+    BoltDefinition bolt = this.bolts.get(to);
+    if (bolt == null) {
+      throw new IllegalArgumentException("No bolt defined with id: " + to);
+    }
+    return bolt.getParallelism();
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java
new file mode 100644
index 0000000..1eeb817
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java
@@ -0,0 +1,70 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.List;
+
+/**
+ * Mutable description of a stream grouping: its {@link Type}, a stream id, a list of string
+ * arguments and a custom-class definition. No field is validated.
+ */
+public class GroupingDefinition {
+
+  /** The grouping kinds this definition can carry. */
+  public enum Type {
+    ALL,
+    CUSTOM,
+    SHUFFLE,
+    FIELDS,
+    GLOBAL,
+    NONE
+  }
+
+  private ObjectDefinition customClass;
+  private List<String> args;
+  private String streamId;
+  private Type type;
+
+  public void setType(Type type) {
+    this.type = type;
+  }
+
+  public Type getType() {
+    return this.type;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  public String getStreamId() {
+    return this.streamId;
+  }
+
+  public void setArgs(List<String> args) {
+    this.args = args;
+  }
+
+  public List<String> getArgs() {
+    return this.args;
+  }
+
+  public void setCustomClass(ObjectDefinition customClass) {
+    this.customClass = customClass;
+  }
+
+  public ObjectDefinition getCustomClass() {
+    return this.customClass;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java
new file mode 100644
index 0000000..3a675b7
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java
@@ -0,0 +1,137 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base definition carrying an id, a class name, a parallelism value (default 1), constructor
+ * arguments, properties and configuration methods.
+ *
+ * <p>Single-entry {@code ref} / {@code reflist} maps among the constructor arguments are
+ * swapped for {@link BeanReference} / {@link BeanListReference} placeholders when set.
+ */
+public abstract class ObjectDefinition {
+
+  private String id;
+  private String className;
+  private int parallelism = 1;
+  private List<Object> constructorArgs;
+  private List<PropertyDefinition> properties;
+  private List<ConfigurationMethodDefinition> configMethods;
+  private boolean hasReferences;
+
+  public List<PropertyDefinition> getProperties() {
+    return properties;
+  }
+
+  /**
+   * @return {@code true} when a non-empty constructor-argument list is stored
+   */
+  public boolean hasConstructorArgs() {
+    return this.constructorArgs != null && !this.constructorArgs.isEmpty();
+  }
+
+  public void setProperties(List<PropertyDefinition> properties) {
+    this.properties = properties;
+  }
+
+  /**
+   * @return {@code true} when the stored constructor arguments contain at least one reference
+   */
+  public boolean hasReferences() {
+    return this.hasReferences;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public void setClassName(String className) {
+    this.className = className;
+  }
+
+  public int getParallelism() {
+    return parallelism;
+  }
+
+  public void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  public List<Object> getConstructorArgs() {
+    return constructorArgs;
+  }
+
+  /**
+   * Stores a converted copy of {@code constructorArgs}.
+   *
+   * <p>An element that is a {@link LinkedHashMap} with exactly one entry keyed {@code "ref"}
+   * becomes a {@link BeanReference}; one keyed {@code "reflist"} becomes a
+   * {@link BeanListReference}. Every other element is copied through unchanged.
+   *
+   * <p>The reference flag is recomputed on every call, so it always describes the list that
+   * is currently stored rather than one set earlier.
+   *
+   * @param constructorArgs raw argument values; {@code null} clears the stored arguments
+   */
+  @SuppressWarnings("unchecked")
+  public void setConstructorArgs(List<Object> constructorArgs) {
+    if (constructorArgs == null) {
+      // Nothing to convert; iterating a null list would throw NullPointerException.
+      this.constructorArgs = null;
+      this.hasReferences = false;
+      return;
+    }
+
+    List<Object> converted = new ArrayList<Object>(constructorArgs.size());
+    boolean sawReference = false;
+    for (Object obj : constructorArgs) {
+      Object replacement = obj;
+      if (obj instanceof LinkedHashMap) {
+        Map<?, ?> map = (Map<?, ?>) obj;
+        if (map.size() == 1 && map.containsKey("ref")) {
+          replacement = new BeanReference((String) map.get("ref"));
+          sawReference = true;
+        } else if (map.size() == 1 && map.containsKey("reflist")) {
+          // Unchecked: the list's element type cannot be verified at runtime.
+          replacement = new BeanListReference((List<String>) map.get("reflist"));
+          sawReference = true;
+        }
+      }
+      converted.add(replacement);
+    }
+    this.constructorArgs = converted;
+    this.hasReferences = sawReference;
+  }
+
+  public List<ConfigurationMethodDefinition> getConfigMethods() {
+    return configMethods;
+  }
+
+  public void setConfigMethods(List<ConfigurationMethodDefinition> configMethods) {
+    this.configMethods = configMethods;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java
new file mode 100644
index 0000000..e1cc902
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java
@@ -0,0 +1,77 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * A named property that carries either a literal value or a reference, never both.
+ */
+public class PropertyDefinition {
+
+  private static final String VALUE_AND_REF_MESSAGE =
+      "A property can only have a value OR a reference, not both.";
+
+  private String name;
+  private Object value;
+  private String ref;
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public Object getValue() {
+    return this.value;
+  }
+
+  /**
+   * Stores the literal value.
+   *
+   * @throws IllegalStateException if a reference has already been set
+   */
+  public void setValue(Object value) {
+    failIfAlreadySet(this.ref);
+    this.value = value;
+  }
+
+  public String getRef() {
+    return this.ref;
+  }
+
+  /**
+   * Stores the reference.
+   *
+   * @throws IllegalStateException if a value has already been set
+   */
+  public void setRef(String ref) {
+    failIfAlreadySet(this.value);
+    this.ref = ref;
+  }
+
+  /**
+   * @return {@code true} when a reference is stored
+   */
+  public boolean isReference() {
+    return this.ref != null;
+  }
+
+  /** Rejects the pending assignment when the opposite field already holds something. */
+  private static void failIfAlreadySet(Object opposite) {
+    if (opposite != null) {
+      throw new IllegalStateException(VALUE_AND_REF_MESSAGE);
+    }
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java
new file mode 100644
index 0000000..cb4f0a1
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java
@@ -0,0 +1,17 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+public class SpoutDefinition extends ObjectDefinition { // adds no members of its own
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java
new file mode 100644
index 0000000..03081c3
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java
@@ -0,0 +1,67 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.definition;
+
+/**
+ * Mutable description of a stream: an id, a name, "from" and "to" strings and a
+ * {@link GroupingDefinition}. No field is validated.
+ */
+public class StreamDefinition {
+
+  private GroupingDefinition grouping;
+  private String from;
+  private String to;
+  private String name;
+  private String id;
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setTo(String to) {
+    this.to = to;
+  }
+
+  public String getTo() {
+    return this.to;
+  }
+
+  public void setFrom(String from) {
+    this.from = from;
+  }
+
+  public String getFrom() {
+    return this.from;
+  }
+
+  public void setGrouping(GroupingDefinition groupingDefinition) {
+    this.grouping = groupingDefinition;
+  }
+
+  public GroupingDefinition getGrouping() {
+    return this.grouping;
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java b/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java
new file mode 100644
index 0000000..5aacfb3
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java
@@ -0,0 +1,71 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.parser;
+
+import java.io.InputStream;
+
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.SpoutDefinition;
+
+/**
+ * Loads an {@link EcoTopologyDefinition} from a YAML stream using SnakeYAML.
+ */
+public class EcoParser {
+
+  /**
+   * Parses the given stream into a topology definition.
+   *
+   * <p>SECURITY NOTE(review): the stream is handed to {@code Yaml.load} with a type-binding
+   * {@code Constructor}. Depending on the SnakeYAML version in use this may honour explicit
+   * type tags in the document, so the input should be treated as trusted - confirm before
+   * feeding it user-supplied YAML.
+   *
+   * @param inputStream YAML source; must not be {@code null}
+   * @return the loaded topology definition
+   * @throws Exception if {@code inputStream} is {@code null}
+   */
+  public EcoTopologyDefinition parseFromInputStream(InputStream inputStream)
+      throws Exception {
+    // Validate first so no Yaml instance is built for a call that is going to fail anyway.
+    if (inputStream == null) {
+      throw new Exception("Unable to load eco input stream");
+    }
+    return loadTopologyFromYaml(topologyYaml(), inputStream);
+  }
+
+  private EcoTopologyDefinition loadTopologyFromYaml(Yaml yaml, InputStream inputStream) {
+    return (EcoTopologyDefinition) yaml.load(inputStream);
+  }
+
+  /**
+   * Builds a Yaml instance rooted at {@link EcoTopologyDefinition} whose {@code spouts} and
+   * {@code bolts} list properties are bound to their definition classes.
+   */
+  private static Yaml topologyYaml() {
+    Constructor topologyConstructor = new Constructor(EcoTopologyDefinition.class);
+
+    TypeDescription topologyDescription = new TypeDescription(EcoTopologyDefinition.class);
+
+    topologyDescription.putListPropertyType("spouts", SpoutDefinition.class);
+    topologyDescription.putListPropertyType("bolts", BoltDefinition.class);
+    topologyConstructor.addTypeDescription(topologyDescription);
+
+    return new Yaml(topologyConstructor);
+  }
+}
diff --git a/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java b/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
new file mode 100644
index 0000000..032a73e
--- /dev/null
+++ b/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java
@@ -0,0 +1,28 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.submit;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+
+public class EcoSubmitter {
+  // Instance-level wrapper: forwards all three arguments unchanged to the static StormSubmitter.
+  public void submitTopology(String topologyName, Config topologyConfig, StormTopology topology)
+      throws AlreadyAliveException, InvalidTopologyException {
+    StormSubmitter.submitTopology(topologyName, topologyConfig, topology);
+  }
+}
diff --git a/heron/eco/tests/java/BUILD b/heron/eco/tests/java/BUILD
new file mode 100644
index 0000000..ef66b76
--- /dev/null
+++ b/heron/eco/tests/java/BUILD
@@ -0,0 +1,39 @@
+test_deps_files = [
+    "//third_party/java:powermock",
+    "//third_party/java:mockito",
+    "//third_party/java:junit4",
+]
+
+heron_local_deps = [
+    "//heron/eco/src/java:eco-java",
+    "//heron/api/src/java:api-java-low-level",
+    "//storm-compatibility/src/java:storm-compatibility-java",
+]
+
+eco_test_deps = heron_local_deps + test_deps_files
+
+# Test target name -> directory of its source file, relative to this package.
+eco_test_dirs = {
+    "EcoBuilderTest": "com/twitter/heron/eco/builder",
+    "EcoParserTest": "com/twitter/heron/eco/parser",
+    "ConfigBuilderTest": "com/twitter/heron/eco/builder",
+    "BoltBuilderTest": "com/twitter/heron/eco/builder",
+    "ComponentBuilderTest": "com/twitter/heron/eco/builder",
+    "SpoutBuilderTest": "com/twitter/heron/eco/builder",
+    "StreamBuilderTest": "com/twitter/heron/eco/builder",
+    "EcoTest": "com/twitter/heron/eco",
+    "ObjectBuilderTest": "com/twitter/heron/eco/builder",
+    "BuilderUtilityTest": "com/twitter/heron/eco/builder",
+    "EcoSubmitterTest": "com/twitter/heron/eco/submit",
+}
+
+# One small java_test per entry; each target compiles exactly one test source.
+[
+    java_test(
+        name = test_name,
+        srcs = glob(["%s/%s.java" % (test_dir, test_name)]),
+        deps = eco_test_deps,
+        size = "small",
+    )
+    for test_name, test_dir in eco_test_dirs.items()
+]
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java b/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java
new file mode 100644
index 0000000..ccf97f4
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java
@@ -0,0 +1,91 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco;
+
+import java.io.FileInputStream;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+
+import com.twitter.heron.eco.builder.EcoBuilder;
+import com.twitter.heron.eco.builder.ObjectBuilder;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.parser.EcoParser;
+import com.twitter.heron.eco.submit.EcoSubmitter;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EcoTest {
+  // Mocked collaborators; Mockito injects them into the Eco instance under test.
+  @Mock
+  private EcoBuilder mockEcoBuilder;
+  @Mock
+  private EcoParser mockEcoParser;
+  @Mock
+  private TopologyBuilder mockTopologyBuilder;
+  @Mock
+  private EcoSubmitter mockEcoSubmitter;
+  @InjectMocks
+  private Eco subject;
+
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockEcoBuilder,
+        mockEcoParser,
+        mockTopologyBuilder,
+        mockEcoSubmitter);
+  }
+
+  @Test
+  public void testSubmit_AllGood_BehavesAsExpected() throws Exception {
+    FileInputStream mockStream = PowerMockito.mock(FileInputStream.class);
+
+    final String topologyName = "the name";
+    EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition();
+    topologyDefinition.setName(topologyName);
+    Config config = new Config();
+    // Stub the parser and builder so submit() runs entirely against mocks.
+    when(mockEcoParser.parseFromInputStream(eq(mockStream))).thenReturn(topologyDefinition);
+    when(mockEcoBuilder.buildConfig(eq(topologyDefinition))).thenReturn(config);
+    when(mockEcoBuilder.buildTopologyBuilder(any(EcoExecutionContext.class),
+        any(ObjectBuilder.class))).thenReturn(mockTopologyBuilder);
+
+    subject.submit(mockStream);
+    // Every collaborator must have been called; the @After hook rejects anything extra.
+    verify(mockEcoParser).parseFromInputStream(same(mockStream));
+    verify(mockEcoBuilder).buildConfig(same(topologyDefinition));
+    verify(mockEcoBuilder).buildTopologyBuilder(any(EcoExecutionContext.class),
+        any(ObjectBuilder.class));
+    verify(mockTopologyBuilder).createTopology();
+    verify(mockEcoSubmitter).submitTopology(any(String.class), any(Config.class),
+        any(StormTopology.class));
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java
new file mode 100644
index 0000000..5ae0c9d
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java
@@ -0,0 +1,89 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link BoltBuilder}, driven through a mocked context and object builder. */
+@RunWith(MockitoJUnitRunner.class)
+public class BoltBuilderTest {
+
+  @Mock
+  private EcoExecutionContext mockContext;
+  @Mock
+  private ObjectBuilder mockObjectBuilder;
+
+  private BoltBuilder subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new BoltBuilder();
+  }
+
+  /** Fails a test if either mock received a call the test body did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockContext, mockObjectBuilder);
+  }
+
+  /** Two bolt definitions are each built once and registered in the context under their id. */
+  @Test
+  public void testBuildBolts_AllGood_BehavesAsExpected()
+      throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final String firstId = "id";
+    final String secondId = "id1";
+    BoltDefinition firstBolt = new BoltDefinition();
+    firstBolt.setId(firstId);
+    BoltDefinition secondBolt = new BoltDefinition();
+    secondBolt.setId(secondId);
+    List<BoltDefinition> bolts = new ArrayList<>();
+    bolts.add(firstBolt);
+    bolts.add(secondBolt);
+    EcoTopologyDefinition topology = new EcoTopologyDefinition();
+    topology.setBolts(bolts);
+
+    when(mockObjectBuilder.buildObject(eq(firstBolt), eq(mockContext))).thenReturn(new Object());
+    when(mockObjectBuilder.buildObject(eq(secondBolt), eq(mockContext))).thenReturn(new Object());
+    when(mockContext.getTopologyDefinition()).thenReturn(topology);
+
+    subject.buildBolts(mockContext, mockObjectBuilder);
+
+    verify(mockContext).getTopologyDefinition();
+    verify(mockObjectBuilder).buildObject(same(firstBolt), same(mockContext));
+    verify(mockObjectBuilder).buildObject(same(secondBolt), same(mockContext));
+    verify(mockContext).addBolt(eq(firstId), anyObject());
+    verify(mockContext).addBolt(eq(secondId), anyObject());
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java
new file mode 100644
index 0000000..a92bc63
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java
@@ -0,0 +1,139 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.testing.TestWordSpout;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.BeanReference;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.PropertyDefinition;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+import static org.junit.Assert.assertThat;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link BuilderUtility}. */
+@RunWith(MockitoJUnitRunner.class)
+public class BuilderUtilityTest {
+
+  @Mock
+  private ObjectDefinition mockObjectDefinition;
+  @Mock
+  private Object mockObject;
+  @Mock
+  private EcoExecutionContext mockContext;
+
+  private BuilderUtility subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new BuilderUtility();
+  }
+
+  /** Fails a test if any mock received a call the test body did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockObject, mockObjectDefinition, mockContext);
+  }
+
+  @Test
+  public void toSetterName_ReturnsCorrectName() {
+    final String name = "name";
+    final String expectedName = "setName";
+    String setterName = subject.toSetterName(name);
+
+    assertThat(setterName, is(equalTo(expectedName)));
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void classForName_ReturnsCorrectClass() throws ClassNotFoundException {
+    final String className = TestWordSpout.class.getName();
+
+    Class clazz = subject.classForName(className);
+
+    assertThat(clazz, notNullValue());
+    // Actual value first, expected second, so a failure reports the two the right way round.
+    assertThat(clazz.getName(), is(equalTo(className)));
+  }
+
+  /** Property "id" applied to a {@link BeanReference}: its ref is resolved via the context. */
+  @Test
+  public void applyProperties_SetterFound_BehavesAsExpected()
+      throws IllegalAccessException, NoSuchFieldException, InvocationTargetException {
+    final String id = "id";
+    final String ref = "ref";
+    final String fakeComponent = "component";
+    BeanReference beanReference = new BeanReference(id);
+    List<PropertyDefinition> propertyDefinitions = new ArrayList<>();
+    PropertyDefinition propertyDefinition = new PropertyDefinition();
+    propertyDefinition.setRef(ref);
+    propertyDefinition.setName(id);
+    propertyDefinitions.add(propertyDefinition);
+
+    when(mockObjectDefinition.getProperties()).thenReturn(propertyDefinitions);
+    when(mockContext.getComponent(eq(ref))).thenReturn(fakeComponent);
+
+    subject.applyProperties(mockObjectDefinition, beanReference, mockContext);
+
+    verify(mockContext).getComponent(same(ref));
+    verify(mockObjectDefinition).getProperties();
+  }
+
+  /** Property "publicStr" applied to {@link MockComponent}: ref is resolved via the context. */
+  @Test
+  public void applyProperties_NoSetterFound_BehavesAsExpected()
+      throws IllegalAccessException, NoSuchFieldException, InvocationTargetException {
+    final String ref = "ref";
+    final String fakeComponent = "component";
+    MockComponent mockComponent = new MockComponent();
+    List<PropertyDefinition> propertyDefinitions = new ArrayList<>();
+    PropertyDefinition propertyDefinition = new PropertyDefinition();
+    propertyDefinition.setRef(ref);
+    propertyDefinition.setName("publicStr");
+    propertyDefinitions.add(propertyDefinition);
+
+    when(mockObjectDefinition.getProperties()).thenReturn(propertyDefinitions);
+    when(mockContext.getComponent(eq(ref))).thenReturn(fakeComponent);
+
+    subject.applyProperties(mockObjectDefinition, mockComponent, mockContext);
+
+    verify(mockContext).getComponent(same(ref));
+    verify(mockObjectDefinition).getProperties();
+  }
+
+  /** Target with only a public field and no setter; static as it needs no outer instance. */
+  public static class MockComponent {
+    public String publicStr;
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java
new file mode 100644
index 0000000..d3d5102
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java
@@ -0,0 +1,89 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.BeanDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link ComponentBuilder} using a mocked context and object builder. */
+@RunWith(MockitoJUnitRunner.class)
+public class ComponentBuilderTest {
+
+  @Mock
+  private EcoExecutionContext mockContext;
+  @Mock
+  private ObjectBuilder mockObjectBuilder;
+
+  private ComponentBuilder subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new ComponentBuilder();
+  }
+
+  /** Fails a test if either mock received a call the test body did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockContext, mockObjectBuilder);
+  }
+
+  /** Each bean definition is built once and added to the context under its own id. */
+  @Test
+  public void testBuildComponents_AllGood_BehavesAsExpected()
+      throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final String firstId = "bean";
+    final String secondId = "bean1";
+    BeanDefinition firstBean = new BeanDefinition();
+    firstBean.setId(firstId);
+    BeanDefinition secondBean = new BeanDefinition();
+    secondBean.setId(secondId);
+    List<BeanDefinition> beans = new ArrayList<>();
+    beans.add(firstBean);
+    beans.add(secondBean);
+    EcoTopologyDefinition topology = new EcoTopologyDefinition();
+    topology.setComponents(beans);
+
+    when(mockObjectBuilder.buildObject(eq(firstBean), eq(mockContext))).thenReturn(new Object());
+    when(mockObjectBuilder.buildObject(eq(secondBean), eq(mockContext))).thenReturn(new Object());
+    when(mockContext.getTopologyDefinition()).thenReturn(topology);
+
+    subject.buildComponents(mockContext, mockObjectBuilder);
+
+    verify(mockContext).getTopologyDefinition();
+    verify(mockObjectBuilder).buildObject(same(firstBean), same(mockContext));
+    verify(mockObjectBuilder).buildObject(same(secondBean), same(mockContext));
+    verify(mockContext).addComponent(eq(firstId), anyObject());
+    verify(mockContext).addComponent(eq(secondId), anyObject());
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java
new file mode 100644
index 0000000..e65845c
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java
@@ -0,0 +1,63 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** Unit tests for {@link ConfigBuilder}, exercised directly without mocks. */
+public class ConfigBuilderTest {
+
+  private ConfigBuilder subject;
+
+  /** Gives every test its own freshly constructed builder. */
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new ConfigBuilder();
+  }
+
+  /** A definition with no config set yields a {@link Config} containing no entries. */
+  @Test
+  public void testBuildConfig_ConfigIsNotDefined_ReturnsEmptyConfig() {
+    Config config = subject.buildConfig(new EcoTopologyDefinition());
+
+    // Actual value first, expected second, so a failure reports the two the right way round.
+    assertThat(config.size(), is(equalTo(0)));
+  }
+
+  /** Entries put in the definition's config map are readable from the built config. */
+  @Test
+  public void testBuildConfig_ConfigIsDefined_ReturnsCorrectValues() {
+    EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition();
+    Map<String, Object> topologyDefinitionConfig = new HashMap<>();
+    topologyDefinitionConfig.put(Config.STORM_ZOOKEEPER_SERVERS, 2);
+    topologyDefinitionConfig.put(Config.TOPOLOGY_WORKERS, 4);
+    ecoTopologyDefinition.setConfig(topologyDefinitionConfig);
+
+    Config config = subject.buildConfig(ecoTopologyDefinition);
+
+    assertThat(config.get(Config.STORM_ZOOKEEPER_SERVERS), is(equalTo(2)));
+    assertThat(config.get(Config.TOPOLOGY_WORKERS), is(equalTo(4)));
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
new file mode 100644
index 0000000..1bf87b2
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java
@@ -0,0 +1,127 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link EcoBuilder} with all five delegate builders mocked and injected. */
+@RunWith(MockitoJUnitRunner.class)
+public class EcoBuilderTest {
+
+  @Mock
+  private SpoutBuilder mockSpoutBuilder;
+  @Mock
+  private BoltBuilder mockBoltBuilder;
+  @Mock
+  private StreamBuilder mockStreamBuilder;
+  @Mock
+  private ComponentBuilder mockComponentBuilder;
+  @Mock
+  private ConfigBuilder mockConfigBuilder;
+  @InjectMocks
+  private EcoBuilder subject;
+
+  private Map<String, Object> configMap;
+  private EcoTopologyDefinition ecoTopologyDefinition;
+
+  /** Starts each test from a topology definition backed by an empty, mutable config map. */
+  @Before
+  public void setUpForEachTestCase() {
+    configMap = new HashMap<>();
+    ecoTopologyDefinition = new EcoTopologyDefinition();
+    ecoTopologyDefinition.setConfig(configMap);
+  }
+
+  /** Fails a test if any delegate builder received a call the test body did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    verifyNoMoreInteractions(mockSpoutBuilder, mockBoltBuilder, mockStreamBuilder,
+        mockComponentBuilder, mockConfigBuilder);
+  }
+
+  /** buildConfig hands back the very instance produced by the mocked config builder. */
+  @Test
+  public void testBuild_EmptyConfigMap_ReturnsDefaultConfigs() {
+    Config config = new Config();
+    when(mockConfigBuilder.buildConfig(eq(ecoTopologyDefinition))).thenReturn(config);
+
+    Config returnedConfig = subject.buildConfig(ecoTopologyDefinition);
+
+    verify(mockConfigBuilder).buildConfig(same(ecoTopologyDefinition));
+
+    assertThat(returnedConfig.get(Config.TOPOLOGY_DEBUG), is(nullValue()));
+    // Actual first, expected second: the same order the custom-config test below uses.
+    assertThat(returnedConfig, sameInstance(config));
+  }
+
+  /** Even with entries in the definition's map, buildConfig returns the delegate's instance. */
+  @Test
+  public void testBuild_CustomConfigMap_ReturnsCorrectConfigs() {
+    configMap.put(Config.TOPOLOGY_DEBUG, false);
+    configMap.put(Config.TOPOLOGY_ENVIRONMENT, "dev");
+    configMap.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 3);
+
+    Config config = new Config();
+
+    when(mockConfigBuilder.buildConfig(eq(ecoTopologyDefinition))).thenReturn(config);
+
+    assertThat(subject.buildConfig(ecoTopologyDefinition), sameInstance(config));
+
+    verify(mockConfigBuilder).buildConfig(same(ecoTopologyDefinition));
+  }
+
+  /** buildTopologyBuilder calls the spout, bolt, stream and component builders once each. */
+  @Test
+  public void testBuildTopologyBuilder_BuildsAsExpected()
+      throws IllegalAccessException, ClassNotFoundException,
+      InstantiationException, NoSuchFieldException, InvocationTargetException {
+    Config config = new Config();
+    EcoExecutionContext context = new EcoExecutionContext(ecoTopologyDefinition, config);
+    ObjectBuilder objectBuilder = new ObjectBuilder();
+    subject.buildTopologyBuilder(context, objectBuilder);
+
+    verify(mockSpoutBuilder).buildSpouts(same(context),
+        any(TopologyBuilder.class), same(objectBuilder));
+    verify(mockBoltBuilder).buildBolts(same(context), same(objectBuilder));
+    verify(mockStreamBuilder).buildStreams(same(context), any(TopologyBuilder.class),
+        same(objectBuilder));
+    verify(mockComponentBuilder).buildComponents(same(context), same(objectBuilder));
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java
new file mode 100644
index 0000000..d1ee878
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java
@@ -0,0 +1,159 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.TestWordSpout;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.ConfigurationMethodDefinition;
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/** Unit tests for {@link ObjectBuilder} with a mocked, injected {@link BuilderUtility}. */
+@RunWith(MockitoJUnitRunner.class)
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ObjectBuilderTest {
+
+  @Mock
+  private ObjectDefinition mockObjectDefinition;
+  @Mock
+  private EcoExecutionContext mockContext;
+  @Mock
+  private BuilderUtility mockBuilderUtility;
+  @Mock
+  private ConfigurationMethodDefinition mockMethodDefinition;
+  @InjectMocks
+  private ObjectBuilder subject;
+
+  /** Fails a test if any mock received a call the test body did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(
+        mockContext, mockObjectDefinition, mockBuilderUtility, mockMethodDefinition);
+  }
+
+  /** Constructor args plus one config method: the result is a populated {@link FixedTuple}. */
+  @Test
+  public void buildObject_WithArgsBeanReferenceAndOther_BehavesAsExpected()
+      throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final String componentId = "bean1";
+    final String methodName = "toString";
+    final Class targetClass = FixedTuple.class;
+    final String className = targetClass.getName();
+    List<Object> innerValue = new ArrayList<>();
+    List<Object> tupleValues = new ArrayList<>();
+    tupleValues.add(innerValue);
+    List<Object> constructorArgs = new ArrayList<>();
+    constructorArgs.add(tupleValues);
+    List<ConfigurationMethodDefinition> methodDefinitions = new ArrayList<>();
+    methodDefinitions.add(mockMethodDefinition);
+
+    when(mockObjectDefinition.getClassName()).thenReturn(className);
+    when(mockObjectDefinition.hasConstructorArgs()).thenReturn(true);
+    when(mockObjectDefinition.getConstructorArgs()).thenReturn(constructorArgs);
+    when(mockObjectDefinition.hasReferences()).thenReturn(true);
+    when(mockObjectDefinition.getConfigMethods()).thenReturn(methodDefinitions);
+    // NOTE(review): getComponent is never verified below; this stub may be unused - confirm.
+    when(mockContext.getComponent(eq(componentId))).thenReturn(new Object());
+    // Use the real resolveReferences for the constructor args rather than a canned value.
+    when(mockBuilderUtility.resolveReferences(eq(constructorArgs), eq(mockContext)))
+        .thenCallRealMethod();
+    when(mockBuilderUtility.classForName(eq(className))).thenReturn(targetClass);
+    when(mockMethodDefinition.hasReferences()).thenReturn(true);
+    when(mockMethodDefinition.getArgs()).thenReturn(null);
+    when(mockMethodDefinition.getName()).thenReturn(methodName);
+
+    Object built = subject.buildObject(mockObjectDefinition, mockContext);
+
+    verify(mockObjectDefinition).getClassName();
+    verify(mockObjectDefinition).hasConstructorArgs();
+    verify(mockObjectDefinition).getConstructorArgs();
+    verify(mockObjectDefinition).hasReferences();
+    verify(mockObjectDefinition).getConfigMethods();
+    verify(mockBuilderUtility).classForName(same(className));
+    verify(mockBuilderUtility).resolveReferences(same(constructorArgs), same(mockContext));
+    verify(mockBuilderUtility, times(2)).resolveReferences(anyListOf(Object.class),
+        same(mockContext));
+    verify(mockBuilderUtility).applyProperties(eq(mockObjectDefinition), any(Object.class),
+        same(mockContext));
+    verify(mockMethodDefinition).hasReferences();
+    verify(mockMethodDefinition).getArgs();
+    verify(mockMethodDefinition).getName();
+
+    assertThat(built, is(instanceOf(FixedTuple.class)));
+    FixedTuple fixedTuple = (FixedTuple) built;
+    assertThat(fixedTuple.values, is(equalTo(tupleValues)));
+    assertThat(fixedTuple.values.get(0), is(equalTo(innerValue)));
+  }
+
+  /** Without constructor args, no reference resolution is requested from the utility. */
+  @Test
+  public void buildObject_NoArgs_BehavesAsExpected()
+      throws ClassNotFoundException, InvocationTargetException,
+      NoSuchFieldException, InstantiationException, IllegalAccessException {
+    final String methodName = "close";
+    final Class spoutClass = TestWordSpout.class;
+    final String className = spoutClass.getName();
+    List<ConfigurationMethodDefinition> methodDefinitions = new ArrayList<>();
+    methodDefinitions.add(mockMethodDefinition);
+
+    when(mockObjectDefinition.getClassName()).thenReturn(className);
+    when(mockObjectDefinition.hasConstructorArgs()).thenReturn(false);
+    when(mockObjectDefinition.getConfigMethods()).thenReturn(methodDefinitions);
+    when(mockBuilderUtility.classForName(eq(className))).thenReturn(spoutClass);
+    when(mockMethodDefinition.hasReferences()).thenReturn(false);
+    when(mockMethodDefinition.getName()).thenReturn(methodName);
+
+    subject.buildObject(mockObjectDefinition, mockContext);
+
+    verify(mockObjectDefinition).getClassName();
+    verify(mockObjectDefinition).hasConstructorArgs();
+    verify(mockObjectDefinition).getConfigMethods();
+    verify(mockBuilderUtility).classForName(same(className));
+    verify(mockBuilderUtility).applyProperties(same(mockObjectDefinition),
+        anyObject(), same(mockContext));
+    verify(mockMethodDefinition).hasReferences();
+    verify(mockMethodDefinition).getName();
+    // getArgs is still read even though hasReferences() is stubbed to return false.
+    verify(mockMethodDefinition).getArgs();
+  }
+}
+
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
new file mode 100644
index 0000000..12abe93
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java
@@ -0,0 +1,159 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.SpoutDefinition;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test for {@link SpoutBuilder}; all three collaborators are Mockito mocks. */
+@RunWith(MockitoJUnitRunner.class)
+public class SpoutBuilderTest {
+
+  @Mock
+  private EcoExecutionContext mockContext;
+  @Mock
+  private TopologyBuilder mockTopologyBuilder;
+  @Mock
+  private ObjectBuilder mockObjectBuilder;
+
+  private SpoutBuilder subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new SpoutBuilder();
+  }
+
+  /** Fails a test if any mock received a call the test body did not verify. */
+  @After
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockContext, mockTopologyBuilder, mockObjectBuilder);
+  }
+
+  /** Each spout definition is built, set on the topology builder and added to the context. */
+  @Test
+  public void testBuildSpouts_AllGood_BehavesAsExpected()
+      throws ClassNotFoundException, InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final String firstId = "id";
+    final String secondId = "id1";
+    final int firstParallelism = 2;
+    final int secondParallelism = 3;
+    SpoutDefinition firstSpout = new SpoutDefinition();
+    firstSpout.setId(firstId);
+    firstSpout.setParallelism(firstParallelism);
+    SpoutDefinition secondSpout = new SpoutDefinition();
+    secondSpout.setId(secondId);
+    secondSpout.setParallelism(secondParallelism);
+    List<SpoutDefinition> spouts = new ArrayList<>();
+    spouts.add(firstSpout);
+    spouts.add(secondSpout);
+    EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition();
+    topologyDefinition.setSpouts(spouts);
+    MockSpout firstBuilt = new MockSpout();
+    MockSpout secondBuilt = new MockSpout();
+
+    when(mockObjectBuilder.buildObject(eq(firstSpout), eq(mockContext))).thenReturn(firstBuilt);
+    when(mockObjectBuilder.buildObject(eq(secondSpout), eq(mockContext))).thenReturn(secondBuilt);
+    when(mockContext.getTopologyDefinition()).thenReturn(topologyDefinition);
+
+    subject.buildSpouts(mockContext, mockTopologyBuilder, mockObjectBuilder);
+
+    verify(mockContext).getTopologyDefinition();
+    verify(mockObjectBuilder).buildObject(same(firstSpout), same(mockContext));
+    verify(mockObjectBuilder).buildObject(same(secondSpout), same(mockContext));
+    verify(mockTopologyBuilder).setSpout(eq(firstId), eq(firstBuilt), eq(firstParallelism));
+    verify(mockTopologyBuilder).setSpout(eq(secondId), eq(secondBuilt), eq(secondParallelism));
+    verify(mockContext).addSpout(eq(firstId), anyObject());
+    verify(mockContext).addSpout(eq(secondId), anyObject());
+  }
+
+  /**
+   * Do-nothing spout used as the object builder's return value. Static because it never uses
+   * the enclosing test instance, which an inner class would otherwise retain a reference to.
+   */
+  @SuppressWarnings("serial")
+  private static class MockSpout implements IRichSpout {
+
+    @Override
+    public void open(Map<String, Object> conf,
+                     TopologyContext context, SpoutOutputCollector collector) {
+      // no-op
+    }
+
+    @Override
+    public void close() {
+      // no-op
+    }
+
+    @Override
+    public void activate() {
+      // no-op
+    }
+
+    @Override
+    public void deactivate() {
+      // no-op
+    }
+
+    @Override
+    public void nextTuple() {
+      // no-op
+    }
+
+    @Override
+    public void ack(Object msgId) {
+      // no-op
+    }
+
+    @Override
+    public void fail(Object msgId) {
+      // no-op
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      // no-op
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;
+    }
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
new file mode 100644
index 0000000..9c9b868
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java
@@ -0,0 +1,370 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.builder;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IWindowedBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.twitter.heron.eco.definition.EcoExecutionContext;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.GroupingDefinition;
+import com.twitter.heron.eco.definition.ObjectDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StreamBuilderTest {
+
+  @Mock
+  private EcoTopologyDefinition mockDefinition;
+  @Mock
+  private EcoExecutionContext mockContext;
+  @Mock
+  private TopologyBuilder mockTopologyBuilder;
+  @Mock
+  private ObjectBuilder mockObjectBuilder;
+  @Mock
+  private BoltDeclarer mockBoltDeclarer;
+
+  private StreamBuilder subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    subject = new StreamBuilder();  // fresh builder under test for every test case
+  }
+
+  @After  // after each test: any interaction on these mocks that was not verified fails it
+  public void ensureNoUnexpectedMockInteractions() {
+    Mockito.verifyNoMoreInteractions(mockDefinition,
+        mockContext,
+        mockTopologyBuilder,
+        mockObjectBuilder,
+        mockBoltDeclarer);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void buildStreams_SpoutToIRichBolt_ShuffleGrouping() throws ClassNotFoundException,
+      InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final int iRichBoltParallelism = 1;
+    final String to = "to";
+    final String from = "from";
+    final String streamId  = "id";
+    StreamDefinition streamDefinition  = new StreamDefinition();
+    streamDefinition.setFrom(from);
+    streamDefinition.setTo(to);
+    streamDefinition.setId(streamId);
+    List<StreamDefinition> streams = new ArrayList<>();
+    streams.add(streamDefinition);
+    GroupingDefinition groupingDefinition = new GroupingDefinition();
+    groupingDefinition.setType(GroupingDefinition.Type.SHUFFLE);
+    groupingDefinition.setStreamId(streamId);
+    streamDefinition.setGrouping(groupingDefinition);
+    MockIRichBolt mockIRichBolt = new MockIRichBolt();
+    // Arrange: one SHUFFLE stream "from" -> "to" whose target bolt is an IRichBolt stub.
+    when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+    when(mockContext.getBolt(eq(to))).thenReturn(mockIRichBolt);
+    when(mockDefinition.getStreams()).thenReturn(streams);
+    when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism);
+    when(mockTopologyBuilder.setBolt(eq(to),
+        eq(mockIRichBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer);
+    // Act.
+    subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+    // Assert: the bolt is registered and shuffleGrouping is declared for (from, streamId).
+    verify(mockContext).getTopologyDefinition();
+    verify(mockContext).getBolt(eq(to));
+    verify(mockDefinition).parallelismForBolt(eq(to));
+    verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIRichBolt), eq(iRichBoltParallelism));
+    verify(mockBoltDeclarer).shuffleGrouping(eq(from), eq(streamId));
+    verify(mockContext).setStreams(anyMap());
+    verify(mockDefinition).getStreams();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void buildStreams_SpoutToIBasicBolt_FieldsGroupingWithArgs() throws
+      ClassNotFoundException,
+      InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final int iRichBoltParallelism = 1;
+    final String to = "to";
+    final String from = "from";
+    final String streamId  = "id";
+    StreamDefinition streamDefinition  = new StreamDefinition();
+    streamDefinition.setFrom(from);
+    streamDefinition.setTo(to);
+    streamDefinition.setId(streamId);
+    List<StreamDefinition> streams = new ArrayList<>();
+    streams.add(streamDefinition);
+    GroupingDefinition groupingDefinition = new GroupingDefinition();
+    groupingDefinition.setType(GroupingDefinition.Type.FIELDS);
+    List<String> args = new ArrayList<>();
+    args.add("arg1");
+    groupingDefinition.setArgs(args);
+    groupingDefinition.setStreamId(streamId);
+    streamDefinition.setGrouping(groupingDefinition);
+    MockIBasicBolt mockIBasicBolt = new MockIBasicBolt();
+    // Arrange. NOTE(review): iRichBoltParallelism is reused although the bolt is an IBasicBolt.
+    when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+    when(mockContext.getBolt(eq(to))).thenReturn(mockIBasicBolt);
+    when(mockDefinition.getStreams()).thenReturn(streams);
+    when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism);
+    when(mockTopologyBuilder.setBolt(eq(to),
+        eq(mockIBasicBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer);
+    // Act.
+    subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+    // Assert: fieldsGrouping is declared for (from, streamId); the Fields value is not inspected.
+    verify(mockContext).getTopologyDefinition();
+    verify(mockContext).getBolt(eq(to));
+    verify(mockDefinition).parallelismForBolt(eq(to));
+    verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIBasicBolt), eq(iRichBoltParallelism));
+    verify(mockBoltDeclarer).fieldsGrouping(eq(from), eq(streamId), any(Fields.class));
+    verify(mockContext).setStreams(anyMap());
+    verify(mockDefinition).getStreams();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  @SuppressWarnings("unchecked")
+  public void buildStreams_SpoutToIBasicBolt_FieldsGroupingWithoutArgs_ExceptionThrown() throws
+      ClassNotFoundException,
+      InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final int iRichBoltParallelism = 1;
+    final String to = "to";
+    final String from = "from";
+    final String streamId  = "id";
+    StreamDefinition streamDefinition  = new StreamDefinition();
+    streamDefinition.setFrom(from);
+    streamDefinition.setTo(to);
+    streamDefinition.setId(streamId);
+    List<StreamDefinition> streams = new ArrayList<>();
+    streams.add(streamDefinition);
+    GroupingDefinition groupingDefinition = new GroupingDefinition();
+    groupingDefinition.setType(GroupingDefinition.Type.FIELDS);
+    // Deliberately no setArgs(...): FIELDS grouping without args is the failure under test.
+    groupingDefinition.setStreamId(streamId);
+    streamDefinition.setGrouping(groupingDefinition);
+    MockIBasicBolt mockIBasicBolt = new MockIBasicBolt();
+    try {
+      when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+      when(mockContext.getBolt(eq(to))).thenReturn(mockIBasicBolt);
+      when(mockDefinition.getStreams()).thenReturn(streams);
+      when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism);
+      when(mockTopologyBuilder.setBolt(eq(to),
+          eq(mockIBasicBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer);
+      // Expected to throw IllegalArgumentException (see the @Test annotation).
+      subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+    } finally {
+      // Runs even when the call above throws: verifies the interactions that preceded it.
+      verify(mockContext).getTopologyDefinition();
+      verify(mockContext).getBolt(eq(to));
+      verify(mockDefinition).parallelismForBolt(eq(to));
+      verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIBasicBolt), eq(iRichBoltParallelism));
+      verify(mockDefinition).getStreams();
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void buildStreams_SpoutToIWindowedBolt_CustomGrouping() throws ClassNotFoundException,
+      InvocationTargetException, NoSuchFieldException,
+      InstantiationException, IllegalAccessException {
+    final int iRichBoltParallelism = 1;
+    final String to = "to";
+    final String from = "from";
+    final String streamId  = "id";
+    StreamDefinition streamDefinition  = new StreamDefinition();
+    streamDefinition.setFrom(from);
+    streamDefinition.setTo(to);
+    streamDefinition.setId(streamId);
+    List<StreamDefinition> streams = new ArrayList<>();
+    streams.add(streamDefinition);
+    GroupingDefinition groupingDefinition = new GroupingDefinition();
+    groupingDefinition.setType(GroupingDefinition.Type.CUSTOM);
+    MockCustomObjectDefinition mockCustomObjectDefinition = new MockCustomObjectDefinition();
+    // This definition is what the ObjectBuilder mock is stubbed to turn into the grouping below.
+    groupingDefinition.setCustomClass(mockCustomObjectDefinition);
+    List<String> args = new ArrayList<>();
+    args.add("arg1");
+    groupingDefinition.setArgs(args);
+    groupingDefinition.setStreamId(streamId);
+    streamDefinition.setGrouping(groupingDefinition);
+    MockIWindowedBolt mockIWindowedBolt = new MockIWindowedBolt();
+    MockCustomStreamGrouping mockCustomStreamGrouping = new MockCustomStreamGrouping();
+    // Arrange. NOTE(review): iRichBoltParallelism is reused although the bolt is an IWindowedBolt.
+    when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition);
+    when(mockContext.getBolt(eq(to))).thenReturn(mockIWindowedBolt);
+    when(mockDefinition.getStreams()).thenReturn(streams);
+    when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism);
+    when(mockTopologyBuilder.setBolt(eq(to),
+        eq(mockIWindowedBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer);
+    when(mockObjectBuilder.buildObject(eq(mockCustomObjectDefinition),
+        eq(mockContext))).thenReturn(mockCustomStreamGrouping);
+    // Act.
+    subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder);
+    // Assert: the built grouping object is handed to customGrouping for (from, streamId).
+    verify(mockContext).getTopologyDefinition();
+    verify(mockContext).getBolt(eq(to));
+    verify(mockDefinition).parallelismForBolt(eq(to));
+    verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIWindowedBolt), eq(iRichBoltParallelism));
+    verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId), eq(mockCustomStreamGrouping));
+    verify(mockContext).setStreams(anyMap());
+    verify(mockDefinition).getStreams();
+    verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition), same(mockContext));
+  }
+
+  private static class MockCustomObjectDefinition extends ObjectDefinition {
+    // Empty subclass used as the custom-class definition; static: needs no outer instance.
+  }
+
+  @SuppressWarnings("serial")
+  private static class MockCustomStreamGrouping implements CustomStreamGrouping {
+    // No-op grouping stub; static so it does not capture the enclosing test instance.
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
+                        List<Integer> targetTasks) {
+      // no-op
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+      return null;  // stub value
+    }
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked", "serial"})
+  private static class MockIRichBolt implements IRichBolt {
+    // No-op IRichBolt stub; static so it does not capture the enclosing test instance.
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+      // no-op
+    }
+
+    @Override
+    public void execute(Tuple input) {
+      // no-op: incoming tuples are ignored
+    }
+
+    @Override
+    public void cleanup() {
+      // no-op
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      // no-op: this stub declares no output fields
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;  // stub value
+    }
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked", "serial"})
+  private static class MockIWindowedBolt implements IWindowedBolt {  // no-op stub
+    @Override
+    public void prepare(Map<String, Object> topoConf,
+                        TopologyContext context, OutputCollector collector) {
+      // no-op; the class is static so it does not capture the enclosing test instance
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+      // no-op: incoming windows are ignored
+    }
+
+    @Override
+    public void cleanup() {
+      // no-op
+    }
+
+    @Override
+    public TimestampExtractor getTimestampExtractor() {
+      return null;  // stub value
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      // no-op: this stub declares no output fields
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;  // stub value
+    }
+  }
+
+
+  @SuppressWarnings({"rawtypes", "unchecked", "serial"})
+  public class MockIBasicBolt implements IBasicBolt {  // no-op stub used as the target bolt
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+      // no-op; NOTE(review): class is public while the other stubs are private - confirm intent
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+      // no-op: incoming tuples are ignored
+    }
+
+    @Override
+    public void cleanup() {
+      // no-op
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      // no-op: this stub declares no output fields
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;  // stub value
+    }
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java b/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
new file mode 100644
index 0000000..24d226f
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java
@@ -0,0 +1,407 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.parser;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.heron.eco.definition.BeanDefinition;
+import com.twitter.heron.eco.definition.BeanReference;
+import com.twitter.heron.eco.definition.BoltDefinition;
+import com.twitter.heron.eco.definition.EcoTopologyDefinition;
+import com.twitter.heron.eco.definition.GroupingDefinition;
+import com.twitter.heron.eco.definition.PropertyDefinition;
+import com.twitter.heron.eco.definition.StreamDefinition;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+/**
+ * Unit tests for {@link EcoParser}
+ */
+public class EcoParserTest {
+
+
+  private static final String BOLT_1 = "bolt-1";
+  private static final String BOLT_2 = "bolt-2";
+  private static final String YAML_NO_CONFIG_STR = "# Licensed to the Apache Software Foundation"
+      + " (ASF) under one\n"
+      + "# or more contributor license agreements.  See the NOTICE file\n"
+      + "# distributed with this work for additional information\n"
+      + "# regarding copyright ownership.  The ASF licenses this file\n"
+      + "# to you under the Apache License, Version 2.0 (the\n"
+      + "# \"License\"); you may not use this file except in compliance\n"
+      + "# with the License.  You may obtain a copy of the License at\n"
+      + "#\n"
+      + "# http://www.apache.org/licenses/LICENSE-2.0\n"
+      + "#\n"
+      + "# Unless required by applicable law or agreed to in writing, software\n"
+      + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+      + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+      + "# See the License for the specific language governing permissions and\n"
+      + "# limitations under the License.\n"
+      + "\n"
+      + "---\n"
+      + "\n"
+      + "# topology definition\n"
+      + "# name to be used when submitting\n"
+      + "name: \"yaml-topology\"\n"
+      + "\n"
+      + "# topology configuration\n"
+      + "# this will be passed to the submitter as a map of config options\n"
+      + "#\n"
+      + "# spout definitions\n"
+      + "spouts:\n"
+      + "  - id: \"spout-1\"\n"
+      + "    className: \"com.twitter.heron.sample.TestWordSpout\"\n"
+      + "    parallelism: 1\n"
+      + "\n"
+      + "# bolt definitions\n"
+      + "bolts:\n"
+      + "  - id: \"bolt-1\"\n"
+      + "    className: \"com.twitter.heron.sample.TestWordCounter\"\n"
+      + "    parallelism: 2\n"
+      + "\n"
+      + "  - id: \"bolt-2\"\n"
+      + "    className: \"com.twitter.heron.sample.LogInfoBolt\"\n"
+      + "    parallelism: 1\n"
+      + "\n"
+      + "#stream definitions\n"
+      + "# stream definitions define connections between spouts and bolts.\n"
+      + "# note that such connections can be cyclical\n"
+      + "streams:\n"
+      + "  - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n"
+      + "    id: \"connection-1\"\n"
+      + "    from: \"spout-1\"\n"
+      + "    to: \"bolt-1\"\n"
+      + "    grouping:\n"
+      + "      type: FIELDS\n"
+      + "      args: [\"word\"]\n"
+      + "\n"
+      + "  - name: \"bolt-1 --> bolt2\"\n"
+      + "    id: \"connection-2\"\n"
+      + "    from: \"bolt-1\"\n"
+      + "    to: \"bolt-2\"\n"
+      + "    grouping:\n"
+      + "      type: SHUFFLE";
+  private static final String YAML_STR = "# Licensed to the Apache Software Foundation"
+      + " (ASF) under one\n"
+      + "# or more contributor license agreements.  See the NOTICE file\n"
+      + "# distributed with this work for additional information\n"
+      + "# regarding copyright ownership.  The ASF licenses this file\n"
+      + "# to you under the Apache License, Version 2.0 (the\n"
+      + "# \"License\"); you may not use this file except in compliance\n"
+      + "# with the License.  You may obtain a copy of the License at\n"
+      + "#\n"
+      + "# http://www.apache.org/licenses/LICENSE-2.0\n"
+      + "#\n"
+      + "# Unless required by applicable law or agreed to in writing, software\n"
+      + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+      + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+      + "# See the License for the specific language governing permissions and\n"
+      + "# limitations under the License.\n"
+      + "\n"
+      + "---\n"
+      + "\n"
+      + "# topology definition\n"
+      + "# name to be used when submitting\n"
+      + "name: \"yaml-topology\"\n"
+      + "\n"
+      + "# topology configuration\n"
+      + "# this will be passed to the submitter as a map of config options\n"
+      + "#\n"
+      + "config:\n"
+      + "  topology.workers: 1\n"
+      + "\n"
+      + "# spout definitions\n"
+      + "spouts:\n"
+      + "  - id: \"spout-1\"\n"
+      + "    className: \"com.twitter.heron.sample.TestWordSpout\"\n"
+      + "    parallelism: 1\n"
+      + "\n"
+      + "# bolt definitions\n"
+      + "bolts:\n"
+      + "  - id: \"bolt-1\"\n"
+      + "    className: \"com.twitter.heron.sample.TestWordCounter\"\n"
+      + "    parallelism: 2\n"
+      + "\n"
+      + "  - id: \"bolt-2\"\n"
+      + "    className: \"com.twitter.heron.sample.LogInfoBolt\"\n"
+      + "    parallelism: 1\n"
+      + "\n"
+      + "#stream definitions\n"
+      + "# stream definitions define connections between spouts and bolts.\n"
+      + "# note that such connections can be cyclical\n"
+      + "streams:\n"
+      + "  - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n"
+      + "    id: \"connection-1\"\n"
+      + "    from: \"spout-1\"\n"
+      + "    to: \"bolt-1\"\n"
+      + "    grouping:\n"
+      + "      type: FIELDS\n"
+      + "      args: [\"word\"]\n"
+      + "\n"
+      + "  - name: \"bolt-1 --> bolt2\"\n"
+      + "    id: \"connection-2\"\n"
+      + "    from: \"bolt-1\"\n"
+      + "    to: \"bolt-2\"\n"
+      + "    grouping:\n"
+      + "      type: SHUFFLE";
+
+  private static final String YAML_STR_1 = "# Test ability to wire together shell spouts/bolts\n"
+      + "---\n"
+      + "\n"
+      + "name: \"kafka-topology\"\n"
+      + "\n"
+      + "# Components\n"
+      + "# Components are analagous to Spring beans. They are meant to be used as constructor,\n"
+      + "# property(setter), and builder arguments.\n"
+      + "#\n"
+      + "# for the time being, components must be declared in the order they are referenced\n"
+      + "components:\n"
+      + "  - id: \"stringScheme\"\n"
+      + "    className: \"org.apache.storm.kafka.StringScheme\"\n"
+      + "\n"
+      + "  - id: \"stringMultiScheme\"\n"
+      + "    className: \"org.apache.storm.spout.SchemeAsMultiScheme\"\n"
+      + "    constructorArgs:\n"
+      + "      - ref: \"stringScheme\"\n"
+      + "\n"
+      + "  - id: \"zkHosts\"\n"
+      + "    className: \"org.apache.storm.kafka.ZkHosts\"\n"
+      + "    constructorArgs:\n"
+      + "      - \"localhost:2181\"\n"
+      + "\n"
+      + "# Alternative kafka config\n"
+      + "#  - id: \"kafkaConfig\"\n"
+      + "#    className: \"org.apache.storm.kafka.KafkaConfig\"\n"
+      + "#    constructorArgs:\n"
+      + "#      # brokerHosts\n"
+      + "#      - ref: \"zkHosts\"\n"
+      + "#      # topic\n"
+      + "#      - \"myKafkaTopic\"\n"
+      + "#      # clientId (optional)\n"
+      + "#      - \"myKafkaClientId\"\n"
+      + "\n"
+      + "  - id: \"spoutConfig\"\n"
+      + "    className: \"org.apache.storm.kafka.SpoutConfig\"\n"
+      + "    constructorArgs:\n"
+      + "      # brokerHosts\n"
+      + "      - ref: \"zkHosts\"\n"
+      + "      # topic\n"
+      + "      - \"myKafkaTopic\"\n"
+      + "      # zkRoot\n"
+      + "      - \"/kafkaSpout\"\n"
+      + "      # id\n"
+      + "      - \"myId\"\n"
+      + "    properties:\n"
+      + "      - name: \"ignoreZkOffsets\"\n"
+      + "        value: true\n"
+      + "      - name: \"scheme\"\n"
+      + "        ref: \"stringMultiScheme\"\n"
+      + "\n"
+      + "\n"
+      + "\n"
+      + "# topology configuration\n"
+      + "# this will be passed to the submitter as a map of config options\n"
+      + "#\n"
+      + "config:\n"
+      + "  topology.workers: 1\n"
+      + "  # ...\n"
+      + "\n"
+      + "# spout definitions\n"
+      + "spouts:\n"
+      + "  - id: \"kafka-spout\"\n"
+      + "    className: \"org.apache.storm.kafka.KafkaSpout\"\n"
+      + "    constructorArgs:\n"
+      + "      - ref: \"spoutConfig\"\n"
+      + "\n"
+      + "# bolt definitions\n"
+      + "bolts:\n"
+      + "  - id: \"splitsentence\"\n"
+      + "    className: \"org.apache.storm.flux.wrappers.bolts.FluxShellBolt\"\n"
+      + "    constructorArgs:\n"
+      + "      # command line\n"
+      + "      - [\"python\", \"splitsentence.py\"]\n"
+      + "      # output fields\n"
+      + "      - [\"word\"]\n"
+      + "    parallelism: 1\n"
+      + "    # ...\n"
+      + "\n"
+      + "  - id: \"log\"\n"
+      + "    className: \"org.apache.storm.flux.wrappers.bolts.LogInfoBolt\"\n"
+      + "    parallelism: 1\n"
+      + "    # ...\n"
+      + "\n"
+      + "  - id: \"count\"\n"
+      + "    className: \"org.apache.storm.testing.TestWordCounter\"\n"
+      + "    parallelism: 1\n"
+      + "    # ...\n"
+      + "\n"
+      + "#stream definitions\n"
+      + "# stream definitions define connections between spouts and bolts.\n"
+      + "# note that such connections can be cyclical\n"
+      + "# custom stream groupings are also supported\n"
+      + "\n"
+      + "streams:\n"
+      + "  - name: \"kafka --> split\" # name isn't used (placeholder for logging, UI, etc.)\n"
+      + "    id: \"stream1\"\n"
+      + "    from: \"kafka-spout\"\n"
+      + "    to: \"splitsentence\"\n"
+      + "    grouping:\n"
+      + "      type: SHUFFLE\n"
+      + "\n"
+      + "  - name: \"split --> count\"\n"
+      + "    id: \"stream2\"\n"
+      + "    from: \"splitsentence\"\n"
+      + "    to: \"count\"\n"
+      + "    grouping:\n"
+      + "      type: FIELDS\n"
+      + "      args: [\"word\"]\n"
+      + "\n"
+      + "  - name: \"count --> log\"\n"
+      + "    id: \"stream3\"\n"
+      + "    from: \"count\"\n"
+      + "    to: \"log\"\n"
+      + "    grouping:\n"
+      + "      type: SHUFFLE";
+  private EcoParser subject;
+
+  @Before
+  public void setUpBeforeEachTestCase() {
+    subject = new EcoParser();  // fresh parser under test for every test case
+  }
+
+
+  @Test
+  public void testParseFromInputStream_VerifyComponents_MapsAsExpected() throws Exception {
+    // Parses the kafka-topology fixture (YAML_STR_1) and checks its "components" entries.
+    InputStream inputStream = new ByteArrayInputStream(YAML_STR_1.getBytes());
+
+    EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream);
+    List<BeanDefinition> components = topologyDefinition.getComponents();
+
+    assertEquals("kafka-topology", topologyDefinition.getName());
+    assertEquals(4, components.size());
+    // Component 0: stringScheme (id and class name only).
+    BeanDefinition stringSchemeComponent = components.get(0);
+    assertEquals("stringScheme", stringSchemeComponent.getId());
+    assertEquals("org.apache.storm.kafka.StringScheme", stringSchemeComponent.getClassName());
+
+    // Component 1: stringMultiScheme, whose single constructor arg is a ref to stringScheme.
+    BeanDefinition stringMultiSchemeComponent = components.get(1);
+    assertEquals("stringMultiScheme", stringMultiSchemeComponent.getId());
+    assertEquals("org.apache.storm.spout.SchemeAsMultiScheme",
+        stringMultiSchemeComponent.getClassName());
+    assertEquals(1, stringMultiSchemeComponent.getConstructorArgs().size());
+    BeanReference multiStringReference =
+        (BeanReference) stringMultiSchemeComponent.getConstructorArgs().get(0);
+    assertEquals("stringScheme", multiStringReference.getId());
+    // Component 2: zkHosts with one literal constructor arg.
+    BeanDefinition zkHostsComponent = components.get(2);
+    assertEquals("zkHosts", zkHostsComponent.getId());
+    assertEquals("org.apache.storm.kafka.ZkHosts", zkHostsComponent.getClassName());
+    assertEquals(1, zkHostsComponent.getConstructorArgs().size());
+    assertEquals("localhost:2181", zkHostsComponent.getConstructorArgs().get(0));
+    // Component 3: spoutConfig. NOTE(review): the 4th constructor arg ("myId") is not asserted.
+    BeanDefinition spoutConfigComponent = components.get(3);
+    List<Object> spoutConstructArgs = spoutConfigComponent.getConstructorArgs();
+    assertEquals("spoutConfig", spoutConfigComponent.getId());
+    assertEquals("org.apache.storm.kafka.SpoutConfig", spoutConfigComponent.getClassName());
+    BeanReference spoutBrokerHostComponent = (BeanReference) spoutConstructArgs.get(0);
+    assertEquals("zkHosts", spoutBrokerHostComponent.getId());
+    assertEquals("myKafkaTopic", spoutConstructArgs.get(1));
+    assertEquals("/kafkaSpout", spoutConstructArgs.get(2));
+    List<PropertyDefinition> properties = spoutConfigComponent.getProperties();
+    assertEquals("ignoreZkOffsets", properties.get(0).getName());
+    assertEquals(true, properties.get(0).getValue());
+    assertEquals("scheme", properties.get(1).getName());
+    assertEquals(true, properties.get(1).isReference());
+    assertEquals("stringMultiScheme", properties.get(1).getRef());
+  }
+
+  @Test
+  public void testParseFromInputStream_VerifyAllButComponents_MapsAsExpected() throws Exception {
+    // Parses the yaml-topology fixture (YAML_STR) and checks name, config, bolts and streams.
+    InputStream inputStream = new ByteArrayInputStream(YAML_STR.getBytes());
+
+    EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream);
+
+    assertEquals("yaml-topology", topologyDefinition.getName());
+    assertEquals(1, topologyDefinition.getConfig().size());
+    assertEquals(1, topologyDefinition.getConfig().get("topology.workers"));
+    // Bolts are looked up by id.
+    BoltDefinition bolt1 = topologyDefinition.getBolt(BOLT_1);
+    assertNotNull(bolt1);
+    assertEquals(2, bolt1.getParallelism());
+    assertEquals("com.twitter.heron.sample.TestWordCounter", bolt1.getClassName());
+    assertEquals(BOLT_1, bolt1.getId());
+
+    // NOTE(review): bolt2 is dereferenced without an assertNotNull, unlike bolt1.
+    BoltDefinition bolt2 = topologyDefinition.getBolt(BOLT_2);
+    assertEquals(1, bolt2.getParallelism());
+    assertEquals("com.twitter.heron.sample.LogInfoBolt", bolt2.getClassName());
+    assertEquals(BOLT_2, bolt2.getId());
+    // NOTE(review): get(0)/get(1) run before the size assertion further down.
+    List<StreamDefinition> streamDefinitions = topologyDefinition.getStreams();
+    StreamDefinition streamDefinitionOne = streamDefinitions.get(0);
+    GroupingDefinition groupingDefinitionOne = streamDefinitionOne.getGrouping();
+    StreamDefinition streamDefinitionTwo = streamDefinitions.get(1);
+    GroupingDefinition groupingDefinitionTwo = streamDefinitionTwo.getGrouping();
+
+    assertEquals(2, streamDefinitions.size());
+    // Stream 1: spout-1 -> bolt-1, FIELDS grouping on "word".
+    assertEquals(BOLT_1, streamDefinitionOne.getTo());
+    assertEquals("spout-1", streamDefinitionOne.getFrom());
+    assertEquals(GroupingDefinition.Type.FIELDS, groupingDefinitionOne.getType());
+    assertEquals(1, groupingDefinitionOne.getArgs().size());
+    assertEquals("word", groupingDefinitionOne.getArgs().get(0));
+    assertEquals("connection-1", streamDefinitionOne.getId());
+    // Stream 2: bolt-1 -> bolt-2, SHUFFLE grouping; the fixture gives it no args.
+    assertEquals(BOLT_2, streamDefinitionTwo.getTo());
+    assertEquals("bolt-1", streamDefinitionTwo.getFrom());
+    assertEquals(GroupingDefinition.Type.SHUFFLE, groupingDefinitionTwo.getType());
+    assertEquals("connection-2", streamDefinitionTwo.getId());
+    assertNull(groupingDefinitionTwo.getArgs());
+
+  }
+
+  @Test
+  public void testPartFromInputStream_NoConfigSpecified_ConfigMapIsEmpty() throws Exception {
+    InputStream inputStream = new ByteArrayInputStream(YAML_NO_CONFIG_STR.getBytes());
+    // The fixture has no "config:" section. NOTE(review): "Part" in the name looks like a typo.
+    EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream);
+    // The config map must still be non-null, and empty.
+    assertNotNull(topologyDefinition.getConfig());
+    assertEquals(0, topologyDefinition.getConfig().size());
+  }
+
+  @Test(expected = Exception.class)
+  public void testParseFromInputStream_StreamIsNull_ExceptionThrown() throws Exception {
+    InputStream inputStream = null;
+    EcoTopologyDefinition ecoTopologyDefinition = null;
+    // A null stream must make parsing throw; the finally block checks nothing was assigned.
+    try {
+      ecoTopologyDefinition = subject.parseFromInputStream(inputStream);
+    } finally {
+      assertNull(ecoTopologyDefinition);
+    }
+  }
+}
diff --git a/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java b/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
new file mode 100644
index 0000000..3291d49
--- /dev/null
+++ b/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java
@@ -0,0 +1,55 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.eco.submit;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+/** Checks that EcoSubmitter.submitTopology invokes StormSubmitter.submitTopology once. */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormSubmitter.class)
+public class EcoSubmitterTest {
+  private EcoSubmitter subject;
+
+  @Before
+  public void setUpForEachTestCase() {
+    this.subject = new EcoSubmitter();
+  }
+
+  @Test
+  public void submitTopology_AllGood_BehavesAsExpected() throws Exception {
+    final Config stormConfig = new Config();
+    final StormTopology stormTopology = new StormTopology();
+    // Spy on the static class and stub submitTopology to do nothing for any arguments.
+    PowerMockito.spy(StormSubmitter.class);
+    PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",
+        any(String.class), any(Config.class), any(StormTopology.class));
+
+    subject.submitTopology("name", stormConfig, stormTopology);
+    // verifyStatic sets the expectation; the static call after it names what is verified.
+    PowerMockito.verifyStatic(times(1));
+    StormSubmitter.submitTopology(anyString(), any(Config.class), any(StormTopology.class));
+  }
+}
diff --git a/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java b/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java
index 4c54dc3..f767a0e 100644
--- a/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java
+++ b/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java
@@ -55,7 +55,7 @@
   private static final String EXCEPTION_FIRST_TIME = "firstTime";
   private static final String EXCEPTION_LOGGING = "logging";
   private static final String RECORD_SOURCE = "source";
-  private static final String RECORD_CONTEXT = "context";
+  private static final String RECORD_CONTEXT = "ecoExecutionContext";
 
   private volatile int processRecordInvoked = 0;
   private volatile int flushInvoked = 0;
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java b/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java
index dc696d1..36ad671 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java
@@ -25,7 +25,7 @@
 public class MetricsRecordTest {
   private static final int N = 100;
   private static final String SOURCE = "source";
-  private static final String CONTEXT = "context";
+  private static final String CONTEXT = "ecoExecutionContext";
   private final List<MetricsRecord> records = new ArrayList<MetricsRecord>();
 
   @Before
diff --git a/scripts/get_all_heron_paths.sh b/scripts/get_all_heron_paths.sh
index 5ea0e68..ed40c02 100755
--- a/scripts/get_all_heron_paths.sh
+++ b/scripts/get_all_heron_paths.sh
@@ -67,7 +67,7 @@
 }
 
 function get_heron_java_paths() {
-  local java_paths=$(find {heron,heron/tools,tools,integration_test,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" |  sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
+  local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" |  sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
   if [ "$(uname -s | tr 'A-Z' 'a-z')" != "darwin" ]; then
     java_paths=$(echo "${java_paths}" | fgrep -v "/objc_tools/")
   fi