Merge remote-tracking branch 'upstream/master' into testing-after-cleaning
diff --git a/.gitignore b/.gitignore
index c453af1..8b27df2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,7 +17,6 @@
 # Bazel generated files #
 #########################
 /bazel-*
-bazel
 scripts/compile/env_exec.sh
 config/heron-config.h
 tools/cpp/osx_gcc_wrapper.sh
@@ -31,7 +30,6 @@
 
 # Compiled source #
 ###################
-/build
 /release
 *.com
 *.class
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..0fd5399
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,4 @@
+[submodule "website/public"]
+	path = website/public
+	url = git@github.com:twitter/heron.git
+	branch = gh-pages
diff --git a/build/EMPTY b/build/EMPTY
deleted file mode 100644
index e69de29..0000000
--- a/build/EMPTY
+++ /dev/null
diff --git a/heron/common/src/java/com/twitter/heron/common/basics/FileUtils.java b/heron/common/src/java/com/twitter/heron/common/basics/FileUtils.java
index 46a79c3..d0f5088 100644
--- a/heron/common/src/java/com/twitter/heron/common/basics/FileUtils.java
+++ b/heron/common/src/java/com/twitter/heron/common/basics/FileUtils.java
@@ -66,9 +66,9 @@
     return true;
   }
 
-  public static boolean writeToFile(String filename, byte[] contents) {
+  public static boolean writeToFile(String filename, byte[] contents, boolean overwrite) {
     File f = new File(filename);
-    if (f.exists()) {
+    if (!overwrite && f.exists()) {
       LOG.severe("File exists. Topology exists: " + filename);
       return false;
     }
diff --git a/heron/common/tests/java/com/twitter/heron/common/basics/FileUtilsTest.java b/heron/common/tests/java/com/twitter/heron/common/basics/FileUtilsTest.java
index eebc3aa..8abc489 100644
--- a/heron/common/tests/java/com/twitter/heron/common/basics/FileUtilsTest.java
+++ b/heron/common/tests/java/com/twitter/heron/common/basics/FileUtilsTest.java
@@ -70,11 +70,14 @@
   @Test
   public void testWriteToFile() throws Exception {
     String currentWorkingDir = Paths.get("").toAbsolutePath().normalize().toString();
-    Assert.assertFalse(FileUtils.writeToFile(currentWorkingDir, null));
+    Assert.assertFalse(FileUtils.writeToFile(currentWorkingDir, null, false));
 
     PowerMockito.mockStatic(Files.class);
     String randomString = UUID.randomUUID().toString();
-    Assert.assertTrue(FileUtils.writeToFile(randomString, null));
+    Assert.assertTrue(FileUtils.writeToFile(randomString, null, false));
+
+    // We can overwrite it
+    Assert.assertTrue(FileUtils.writeToFile(randomString, null, true));
   }
 
   /**
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerMain.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerMain.java
index 097684a..249d87f 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerMain.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerMain.java
@@ -28,28 +28,20 @@
 
 import com.twitter.heron.common.basics.SysUtils;
 import com.twitter.heron.common.utils.logging.LoggingHelper;
-import com.twitter.heron.proto.scheduler.Scheduler;
 import com.twitter.heron.proto.system.ExecutionEnvironment;
-import com.twitter.heron.scheduler.client.HttpServiceSchedulerClient;
 import com.twitter.heron.scheduler.client.ISchedulerClient;
-import com.twitter.heron.scheduler.client.LibrarySchedulerClient;
+import com.twitter.heron.scheduler.client.SchedulerClientFactory;
 import com.twitter.heron.spi.common.ClusterConfig;
 import com.twitter.heron.spi.common.ClusterDefaults;
 import com.twitter.heron.spi.common.Command;
 import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Context;
 import com.twitter.heron.spi.common.Keys;
-import com.twitter.heron.spi.scheduler.IScheduler;
 import com.twitter.heron.spi.statemgr.IStateManager;
 import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
 import com.twitter.heron.spi.utils.ReflectionUtils;
-import com.twitter.heron.spi.utils.Runtime;
 
-public final class RuntimeManagerMain {
-  private RuntimeManagerMain() {
-
-  }
-
+public class RuntimeManagerMain {
   private static final Logger LOG = Logger.getLogger(RuntimeManagerMain.class.getName());
 
   // Print usage options
@@ -233,6 +225,7 @@
         .put(Keys.cluster(), cluster)
         .put(Keys.role(), role)
         .put(Keys.environ(), environ)
+        .put(Keys.verbose(), verbose)
         .put(Keys.topologyContainerId(), containerId);
 
     Config.Builder topologyConfig = Config.newBuilder()
@@ -253,10 +246,52 @@
     LOG.fine("Static config loaded successfully ");
     LOG.fine(config.toString());
 
+    // Create a new instance of RuntimeManagerMain
+    RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command);
+    boolean isSuccessful = runtimeManagerMain.manageTopology();
+
+    // Log the result and exit
+    if (!isSuccessful) {
+      throw new RuntimeException(String.format("Failed to %s topology %s", command, topologyName));
+    } else {
+      LOG.log(Level.FINE, "Topology {0} {1} successfully", new Object[]{topologyName, command});
+    }
+  }
+
+  // holds all the config read
+  private final Config config;
+
+  // command to manage a topology
+  private final Command command;
+
+  public RuntimeManagerMain(
+      Config config,
+      Command command) {
+    // initialize the options
+    this.config = config;
+    this.command = command;
+  }
+
+  /**
+   * Manager a topology
+   * 1. Instantiate necessary resources
+   * 2. Valid whether the runtime management is legal
+   * 3. Complete the runtime management for a specific command
+   *
+   * @return true if run runtimeManager successfully
+   */
+  public boolean manageTopology() {
+    String topologyName = Context.topologyName(config);
     // 1. Do prepare work
     // create an instance of state manager
     String statemgrClass = Context.stateManagerClass(config);
-    IStateManager statemgr = ReflectionUtils.newInstance(statemgrClass);
+    IStateManager statemgr;
+    try {
+      statemgr = ReflectionUtils.newInstance(statemgrClass);
+    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+      LOG.log(Level.SEVERE, "Failed to instantiate instances", e);
+      return false;
+    }
 
     boolean isSuccessful = false;
 
@@ -268,14 +303,27 @@
       // TODO(mfu): timeout should read from config
       SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
 
-      boolean isValid = validateRuntimeManage(config, adaptor, topologyName);
+      boolean isValid = validateRuntimeManage(adaptor, topologyName);
 
       // 2. Try to manage topology if valid
       if (isValid) {
         // invoke the appropriate command to manage the topology
         LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command});
 
-        isSuccessful = manageTopology(config, command, adaptor);
+        // build the runtime config
+        Config runtime = Config.newBuilder()
+            .put(Keys.topologyName(), Context.topologyName(config))
+            .put(Keys.schedulerStateManagerAdaptor(), adaptor)
+            .build();
+
+        // Create a ISchedulerClient basing on the config
+        ISchedulerClient schedulerClient = getSchedulerClient(runtime);
+        if (schedulerClient == null) {
+          LOG.severe("Failed to initialize scheduler client");
+          return false;
+        }
+
+        isSuccessful = callRuntimeManagerRunner(runtime, schedulerClient);
       }
     } finally {
       // 3. Do post work basing on the result
@@ -285,17 +333,10 @@
       SysUtils.closeIgnoringExceptions(statemgr);
     }
 
-    // Log the result and exit
-    if (!isSuccessful) {
-      throw new RuntimeException(String.format("Failed to %s topology %s", command, topologyName));
-    } else {
-      LOG.log(Level.FINE, "Topology {0} {1} successfully", new Object[]{topologyName, command});
-    }
+    return isSuccessful;
   }
 
-
-  public static boolean validateRuntimeManage(
-      Config config,
+  protected boolean validateRuntimeManage(
       SchedulerStateManagerAdaptor adaptor,
       String topologyName) {
     // Check whether the topology has already been running
@@ -319,64 +360,18 @@
     return true;
   }
 
-  public static boolean manageTopology(
-      Config config, Command command,
-      SchedulerStateManagerAdaptor adaptor)
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException, IOException {
-    // build the runtime config
-    Config runtime = Config.newBuilder()
-        .put(Keys.topologyName(), Context.topologyName(config))
-        .put(Keys.schedulerStateManagerAdaptor(), adaptor)
-        .build();
-
-    // Create a ISchedulerClient basing on the config
-    ISchedulerClient schedulerClient = getSchedulerClient(config, runtime);
-    if (schedulerClient == null) {
-      throw new IllegalArgumentException("Failed to initialize scheduler client");
-    }
-
+  protected boolean callRuntimeManagerRunner(Config runtime, ISchedulerClient schedulerClient) {
     // create an instance of the runner class
     RuntimeManagerRunner runtimeManagerRunner =
         new RuntimeManagerRunner(config, runtime, command, schedulerClient);
 
-
     // invoke the appropriate handlers based on command
     boolean ret = runtimeManagerRunner.call();
 
     return ret;
   }
 
-  // TODO(mfu): Make it into Factory pattern if needed
-  public static ISchedulerClient getSchedulerClient(Config config, Config runtime)
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    LOG.fine("Creating scheduler client");
-    ISchedulerClient schedulerClient;
-
-    if (Context.schedulerService(config)) {
-      // get the instance of the state manager
-      SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);
-
-      Scheduler.SchedulerLocation schedulerLocation =
-          statemgr.getSchedulerLocation(Runtime.topologyName(runtime));
-
-      if (schedulerLocation == null) {
-        LOG.log(Level.SEVERE, "Failed to get scheduler location");
-        return null;
-      }
-
-      LOG.log(Level.FINE, "Scheduler is listening on location: {0} ", schedulerLocation.toString());
-
-      schedulerClient =
-          new HttpServiceSchedulerClient(config, runtime, schedulerLocation.getHttpEndpoint());
-    } else {
-      // create an instance of scheduler
-      final String schedulerClass = Context.schedulerClass(config);
-      final IScheduler scheduler = ReflectionUtils.newInstance(schedulerClass);
-      LOG.fine("Invoke scheduler as a library");
-
-      schedulerClient = new LibrarySchedulerClient(config, runtime, scheduler);
-    }
-
-    return schedulerClient;
+  protected ISchedulerClient getSchedulerClient(Config runtime) {
+    return new SchedulerClientFactory(config, runtime).getSchedulerClient();
   }
 }
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SchedulerMain.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SchedulerMain.java
index 1fa8b47..3a02e73 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SchedulerMain.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SchedulerMain.java
@@ -62,7 +62,7 @@
   public SchedulerMain(
       Config config,
       TopologyAPI.Topology topology,
-      int schedulerServerPort) throws IOException {
+      int schedulerServerPort) {
     // initialize the options
     this.config = config;
     this.topology = topology;
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SubmitterMain.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SubmitterMain.java
index 5a30871..bd8701a 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SubmitterMain.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/SubmitterMain.java
@@ -46,13 +46,9 @@
 /**
  * Calls Uploader to upload topology package, and Launcher to launch Scheduler.
  */
-public final class SubmitterMain {
+public class SubmitterMain {
   private static final Logger LOG = Logger.getLogger(SubmitterMain.class.getName());
 
-  private SubmitterMain() {
-
-  }
-
   /**
    * Load the topology config
    *
@@ -315,25 +311,73 @@
     LOG.fine("Static config loaded successfully ");
     LOG.fine(config.toString());
 
+    SubmitterMain submitterMain = new SubmitterMain(config, topology);
+    boolean isSuccessful = submitterMain.submitTopology();
+
+    // Log the result and exit
+    if (!isSuccessful) {
+      throw new RuntimeException(String.format("Failed to submit topology %s", topology.getName()));
+    } else {
+      LOG.log(Level.FINE, "Topology {0} submitted successfully", topology.getName());
+    }
+  }
+
+  // holds all the config read
+  private final Config config;
+
+  // topology definition
+  private final TopologyAPI.Topology topology;
+
+  public SubmitterMain(
+      Config config,
+      TopologyAPI.Topology topology) {
+    // initialize the options
+    this.config = config;
+    this.topology = topology;
+  }
+
+  /**
+   * Submit a topology
+   * 1. Instantiate necessary resources
+   * 2. Valid whether it is legal to submit a topology
+   * 3. Call LauncherRunner
+   *
+   * @return true if the topology is submitted successfully
+   */
+  public boolean submitTopology() {
     // 1. Do prepare work
     // create an instance of state manager
     String statemgrClass = Context.stateManagerClass(config);
-    IStateManager statemgr = ReflectionUtils.newInstance(statemgrClass);
+    IStateManager statemgr;
 
     // Create an instance of the launcher class
     String launcherClass = Context.launcherClass(config);
-    ILauncher launcher = ReflectionUtils.newInstance(launcherClass);
+    ILauncher launcher;
 
     // Create an instance of the packing class
     String packingClass = Context.packingClass(config);
-    IPacking packing = ReflectionUtils.newInstance(packingClass);
+    IPacking packing;
 
     // create an instance of the uploader class
     String uploaderClass = Context.uploaderClass(config);
-    IUploader uploader = ReflectionUtils.newInstance(uploaderClass);
+    IUploader uploader;
 
-    // Local variable for convenient access
-    String topologyName = topology.getName();
+    try {
+      // create an instance of state manager
+      statemgr = ReflectionUtils.newInstance(statemgrClass);
+
+      // create an instance of launcher
+      launcher = ReflectionUtils.newInstance(launcherClass);
+
+      // create an instance of the packing class
+      packing = ReflectionUtils.newInstance(packingClass);
+
+      // create an instance of uploader
+      uploader = ReflectionUtils.newInstance(uploaderClass);
+    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+      LOG.log(Level.SEVERE, "Failed to instantiate instances", e);
+      return false;
+    }
 
     boolean isSuccessful = false;
     URI packageURI = null;
@@ -345,20 +389,32 @@
       // TODO(mfu): timeout should read from config
       SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
 
-      boolean isValid = validateSubmit(adaptor, topologyName);
+      boolean isValid = validateSubmit(adaptor, topology.getName());
 
       // 2. Try to submit topology if valid
       if (isValid) {
         // invoke method to submit the topology
-        LOG.log(Level.FINE, "Topology {0} to be submitted", topologyName);
+        LOG.log(Level.FINE, "Topology {0} to be submitted", topology.getName());
 
         // Firstly, try to upload necessary packages
-        packageURI = uploadPackage(config, uploader);
+        packageURI = uploadPackage(uploader);
         if (packageURI == null) {
           LOG.severe("Failed to upload package.");
+          return false;
         } else {
           // Secondly, try to submit a topology
-          isSuccessful = submitTopology(config, topology, adaptor, launcher, packing, packageURI);
+          // build the runtime config
+          Config runtime = Config.newBuilder()
+              .put(Keys.topologyId(), topology.getId())
+              .put(Keys.topologyName(), topology.getName())
+              .put(Keys.topologyDefinition(), topology)
+              .put(Keys.schedulerStateManagerAdaptor(), adaptor)
+              .put(Keys.topologyPackageUri(), packageURI)
+              .put(Keys.launcherClassInstance(), launcher)
+              .put(Keys.packingClassInstance(), packing)
+              .build();
+
+          isSuccessful = callLauncherRunner(runtime);
         }
       }
     } finally {
@@ -377,15 +433,10 @@
       SysUtils.closeIgnoringExceptions(statemgr);
     }
 
-    // Log the result and exit
-    if (!isSuccessful) {
-      throw new RuntimeException(String.format("Failed to submit topology %s", topologyName));
-    } else {
-      LOG.log(Level.FINE, "Topology {0} submitted successfully", topologyName);
-    }
+    return isSuccessful;
   }
 
-  public static boolean validateSubmit(SchedulerStateManagerAdaptor adaptor, String topologyName) {
+  protected boolean validateSubmit(SchedulerStateManagerAdaptor adaptor, String topologyName) {
     // Check whether the topology has already been running
     Boolean isTopologyRunning = adaptor.isTopologyRunning(topologyName);
 
@@ -397,7 +448,7 @@
     return true;
   }
 
-  public static URI uploadPackage(Config config, IUploader uploader) {
+  protected URI uploadPackage(IUploader uploader) {
     // initialize the uploader
     uploader.initialize(config);
 
@@ -407,20 +458,7 @@
     return uploaderRet;
   }
 
-  public static boolean submitTopology(Config config, TopologyAPI.Topology topology,
-                                       SchedulerStateManagerAdaptor adaptor, ILauncher launcher,
-                                       IPacking packing, URI packageURI) {
-    // build the runtime config
-    Config runtime = Config.newBuilder()
-        .put(Keys.topologyId(), topology.getId())
-        .put(Keys.topologyName(), topology.getName())
-        .put(Keys.topologyDefinition(), topology)
-        .put(Keys.schedulerStateManagerAdaptor(), adaptor)
-        .put(Keys.topologyPackageUri(), packageURI)
-        .put(Keys.launcherClassInstance(), launcher)
-        .put(Keys.packingClassInstance(), packing)
-        .build();
-
+  protected boolean callLauncherRunner(Config runtime) {
     // using launch runner, launch the topology
     LaunchRunner launchRunner = new LaunchRunner(config, runtime);
     boolean result = launchRunner.call();
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/client/SchedulerClientFactory.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/client/SchedulerClientFactory.java
new file mode 100644
index 0000000..f0b82f5
--- /dev/null
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/client/SchedulerClientFactory.java
@@ -0,0 +1,83 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.scheduler.client;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.proto.scheduler.Scheduler;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.scheduler.IScheduler;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.spi.utils.ReflectionUtils;
+import com.twitter.heron.spi.utils.Runtime;
+
+public class SchedulerClientFactory {
+  private static final Logger LOG = Logger.getLogger(SchedulerClientFactory.class.getName());
+
+  private Config config;
+
+  private Config runtime;
+
+  public SchedulerClientFactory(Config config, Config runtime) {
+    this.config = config;
+    this.runtime = runtime;
+  }
+
+  /**
+   * Implementation of getSchedulerClient - Used to create objects
+   * Currently it creates either HttpServiceSchedulerClient or LibrarySchedulerClient
+   *
+   * @return getSchedulerClient created. return null if failed to create ISchedulerClient instance
+   */
+  public ISchedulerClient getSchedulerClient() {
+    LOG.fine("Creating scheduler client");
+    ISchedulerClient schedulerClient;
+
+    if (Context.schedulerService(config)) {
+      // get the instance of the state manager
+      SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);
+
+      Scheduler.SchedulerLocation schedulerLocation =
+          statemgr.getSchedulerLocation(Runtime.topologyName(runtime));
+
+      if (schedulerLocation == null) {
+        LOG.log(Level.SEVERE, "Failed to get scheduler location");
+        return null;
+      }
+
+      LOG.log(Level.FINE, "Scheduler is listening on location: {0} ", schedulerLocation.toString());
+
+      schedulerClient =
+          new HttpServiceSchedulerClient(config, runtime, schedulerLocation.getHttpEndpoint());
+    } else {
+      // create an instance of scheduler
+      final String schedulerClass = Context.schedulerClass(config);
+      final IScheduler scheduler;
+      try {
+        scheduler = ReflectionUtils.newInstance(schedulerClass);
+      } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+        LOG.log(Level.SEVERE, "Failed to reflect new instance", e);
+        return null;
+      }
+      LOG.fine("Invoke scheduler as a library");
+
+      schedulerClient = new LibrarySchedulerClient(config, runtime, scheduler);
+    }
+
+    return schedulerClient;
+  }
+}
diff --git a/heron/scheduler-core/tests/java/BUILD b/heron/scheduler-core/tests/java/BUILD
index d77e645..5c0a0f0 100644
--- a/heron/scheduler-core/tests/java/BUILD
+++ b/heron/scheduler-core/tests/java/BUILD
@@ -46,6 +46,27 @@
 )
 
 java_test(
+  name="runtime-manager-main_unittest",
+  srcs=glob(
+    ["**/RuntimeManagerMainTest.java"] +
+    ["**/util/TopologyUtilityTest.java"]
+  ),
+  deps=scheduler_deps_files  +
+    ["//heron/statemgrs/src/java:null-statemgr-java"],
+  size="small",
+)
+
+java_test(
+  name="submitter-main_unittest",
+  srcs=glob(
+    ["**/SubmitterMainTest.java"] +
+    ["**/util/TopologyUtilityTest.java"]
+  ),
+  deps=scheduler_deps_files,
+  size="small",
+)
+
+java_test(
   name="scheduler-main_unittest",
   srcs=glob(
     ["**/SchedulerMainTest.java"] +
@@ -85,3 +106,13 @@
   deps=scheduler_deps_files,
   size="small",
 )
+
+java_test(
+  name="scheduler_client_factory_unittest",
+  srcs=glob(
+    ["**/SchedulerClientFactoryTest.java"]
+  ),
+  deps=scheduler_deps_files +
+    ["//heron/schedulers/src/java:null-scheduler-java"],
+  size="small",
+)
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/RuntimeManagerMainTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/RuntimeManagerMainTest.java
new file mode 100644
index 0000000..dd436bd
--- /dev/null
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/RuntimeManagerMainTest.java
@@ -0,0 +1,128 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.scheduler;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.twitter.heron.proto.system.ExecutionEnvironment;
+import com.twitter.heron.scheduler.client.ISchedulerClient;
+import com.twitter.heron.spi.common.Command;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.ConfigKeys;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.statemgr.NullStateManager;
+
+public class RuntimeManagerMainTest {
+  private static final String TOPOLOGY_NAME = "topologyName";
+  private static final String TOPOLOGY_ID = "topologyId";
+  private static final Command MOCK_COMMAND = Command.KILL;
+
+  @Test
+  public void testValidateRuntimeManage() throws Exception {
+    final String CLUSTER = "cluster";
+    final String ROLE = "role";
+    final String ENVIRON = "env";
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.getStringValue(ConfigKeys.get("CLUSTER"))).thenReturn(CLUSTER);
+    Mockito.when(config.getStringValue(ConfigKeys.get("ROLE"))).thenReturn(ROLE);
+    Mockito.when(config.getStringValue(ConfigKeys.get("ENVIRON"))).thenReturn(ENVIRON);
+
+    SchedulerStateManagerAdaptor adaptor = Mockito.mock(SchedulerStateManagerAdaptor.class);
+    RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, MOCK_COMMAND);
+
+    // Topology is not running
+    Mockito.when(adaptor.isTopologyRunning(Mockito.eq(TOPOLOGY_NAME))).thenReturn(false);
+    Assert.assertFalse(runtimeManagerMain.validateRuntimeManage(adaptor, TOPOLOGY_NAME));
+    Mockito.when(adaptor.isTopologyRunning(Mockito.eq(TOPOLOGY_NAME))).thenReturn(null);
+    Assert.assertFalse(runtimeManagerMain.validateRuntimeManage(adaptor, TOPOLOGY_NAME));
+
+    // Topology is running
+    Mockito.when(adaptor.isTopologyRunning(Mockito.eq(TOPOLOGY_NAME))).thenReturn(true);
+    ExecutionEnvironment.ExecutionState.Builder stateBuilder =
+        ExecutionEnvironment.ExecutionState.newBuilder().
+            setTopologyName(TOPOLOGY_NAME).
+            setTopologyId(TOPOLOGY_ID).
+            setCluster(CLUSTER).
+            setEnviron(ENVIRON);
+
+    // cluster/role/environ not matched
+    final String WRONG_ROLE = "wrong";
+    ExecutionEnvironment.ExecutionState wrongState = stateBuilder.setRole(WRONG_ROLE).build();
+    Mockito.when(adaptor.getExecutionState(Mockito.eq(TOPOLOGY_NAME))).thenReturn(null);
+    Assert.assertFalse(runtimeManagerMain.validateRuntimeManage(adaptor, TOPOLOGY_NAME));
+    Mockito.when(adaptor.getExecutionState(Mockito.eq(TOPOLOGY_NAME))).thenReturn(wrongState);
+    Assert.assertFalse(runtimeManagerMain.validateRuntimeManage(adaptor, TOPOLOGY_NAME));
+
+    // Matched
+    ExecutionEnvironment.ExecutionState correctState = stateBuilder.setRole(ROLE).build();
+    Mockito.when(adaptor.getExecutionState(Mockito.eq(TOPOLOGY_NAME))).thenReturn(correctState);
+    Assert.assertTrue(runtimeManagerMain.validateRuntimeManage(adaptor, TOPOLOGY_NAME));
+  }
+
+  /**
+   * Test manageTopology()
+   */
+  @Test
+  public void testManageTopology() throws Exception {
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.getStringValue(ConfigKeys.get("TOPOLOGY_NAME"))).thenReturn(TOPOLOGY_NAME);
+
+    RuntimeManagerMain runtimeManagerMain =
+        Mockito.spy(new RuntimeManagerMain(config, MOCK_COMMAND));
+
+    // Failed to create state manager instance
+    final String CLASS_NOT_EXIST = "class_not_exist";
+    Mockito.when(config.getStringValue(ConfigKeys.get("STATE_MANAGER_CLASS"))).
+        thenReturn(CLASS_NOT_EXIST);
+    Assert.assertFalse(runtimeManagerMain.manageTopology());
+
+    // Valid state manager class
+    Mockito.when(config.getStringValue(ConfigKeys.get("STATE_MANAGER_CLASS"))).
+        thenReturn(NullStateManager.class.getName());
+
+    // Failed to valid
+    Mockito.doReturn(false).when(runtimeManagerMain).
+        validateRuntimeManage(Mockito.any(SchedulerStateManagerAdaptor.class),
+            Mockito.eq(TOPOLOGY_NAME));
+    Assert.assertFalse(runtimeManagerMain.manageTopology());
+
+    // Legal request
+    Mockito.doReturn(true).when(runtimeManagerMain).
+        validateRuntimeManage(Mockito.any(SchedulerStateManagerAdaptor.class),
+            Mockito.eq(TOPOLOGY_NAME));
+
+    // Failed to get ISchedulerClient
+    Mockito.doReturn(null).when(runtimeManagerMain).
+        getSchedulerClient(Mockito.any(Config.class));
+    Assert.assertFalse(runtimeManagerMain.manageTopology());
+
+    // Successfully get ISchedulerClient
+    ISchedulerClient client = Mockito.mock(ISchedulerClient.class);
+    Mockito.doReturn(client).when(runtimeManagerMain).
+        getSchedulerClient(Mockito.any(Config.class));
+
+    // Failed to callRuntimeManagerRunner
+    Mockito.doReturn(false).when(runtimeManagerMain).
+        callRuntimeManagerRunner(Mockito.any(Config.class), Mockito.eq(client));
+    Assert.assertFalse(runtimeManagerMain.manageTopology());
+
+    // Happy path
+    Mockito.doReturn(true).when(runtimeManagerMain).
+        callRuntimeManagerRunner(Mockito.any(Config.class), Mockito.eq(client));
+    Assert.assertTrue(runtimeManagerMain.manageTopology());
+  }
+}
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/SubmitterMainTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/SubmitterMainTest.java
new file mode 100644
index 0000000..80385a9
--- /dev/null
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/SubmitterMainTest.java
@@ -0,0 +1,161 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.scheduler;
+
+import java.net.URI;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.ConfigKeys;
+import com.twitter.heron.spi.packing.IPacking;
+import com.twitter.heron.spi.scheduler.ILauncher;
+import com.twitter.heron.spi.statemgr.IStateManager;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.spi.uploader.IUploader;
+import com.twitter.heron.spi.utils.ReflectionUtils;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ReflectionUtils.class)
+public class SubmitterMainTest {
+  private static final String TOPOLOGY_NAME = "topologyName";
+
+  private static final String STATE_MANAGER_CLASS = "STATE_MANAGER_CLASS";
+  private static final String LAUNCHER_CLASS = "LAUNCHER_CLASS";
+  private static final String PACKING_CLASS = "PACKING_CLASS";
+  private static final String UPLOADER_CLASS = "UPLOADER_CLASS";
+
+  @Test
+  public void testValidateSubmit() throws Exception {
+    Config config = Mockito.mock(Config.class);
+
+    SchedulerStateManagerAdaptor adaptor = Mockito.mock(SchedulerStateManagerAdaptor.class);
+    TopologyAPI.Topology topology = TopologyAPI.Topology.getDefaultInstance();
+    SubmitterMain submitterMain = new SubmitterMain(config, topology);
+
+    // Topology is running
+    Mockito.when(adaptor.isTopologyRunning(Mockito.eq(TOPOLOGY_NAME))).thenReturn(true);
+    Assert.assertFalse(submitterMain.validateSubmit(adaptor, TOPOLOGY_NAME));
+
+    // Topology is not running
+    Mockito.when(adaptor.isTopologyRunning(Mockito.eq(TOPOLOGY_NAME))).thenReturn(null);
+    Assert.assertTrue(submitterMain.validateSubmit(adaptor, TOPOLOGY_NAME));
+    Mockito.when(adaptor.isTopologyRunning(Mockito.eq(TOPOLOGY_NAME))).thenReturn(false);
+    Assert.assertTrue(submitterMain.validateSubmit(adaptor, TOPOLOGY_NAME));
+  }
+
+  /**
+   * Unit test submitTopology method
+   * @throws Exception
+   */
+  @Test
+  public void testSubmitTopology() throws Exception {
+    // Mock objects to be verified
+    IStateManager statemgr = Mockito.mock(IStateManager.class);
+    ILauncher launcher = Mockito.mock(ILauncher.class);
+    IPacking packing = Mockito.mock(IPacking.class);
+    IUploader uploader = Mockito.mock(IUploader.class);
+
+    // Mock ReflectionUtils stuff
+    PowerMockito.spy(ReflectionUtils.class);
+    PowerMockito.doReturn(statemgr).
+        when(ReflectionUtils.class, "newInstance", STATE_MANAGER_CLASS);
+    PowerMockito.doReturn(launcher).
+        when(ReflectionUtils.class, "newInstance", LAUNCHER_CLASS);
+    PowerMockito.doReturn(packing).
+        when(ReflectionUtils.class, "newInstance", PACKING_CLASS);
+    PowerMockito.doReturn(uploader).
+        when(ReflectionUtils.class, "newInstance", UPLOADER_CLASS);
+
+    Config config = Mockito.mock(Config.class);
+    Mockito.
+        when(config.getStringValue(ConfigKeys.get(STATE_MANAGER_CLASS))).
+        thenReturn(STATE_MANAGER_CLASS);
+    Mockito.
+        when(config.getStringValue(ConfigKeys.get(LAUNCHER_CLASS))).
+        thenReturn(LAUNCHER_CLASS);
+    Mockito.
+        when(config.getStringValue(ConfigKeys.get(PACKING_CLASS))).
+        thenReturn(PACKING_CLASS);
+    Mockito.
+        when(config.getStringValue(ConfigKeys.get(UPLOADER_CLASS))).
+        thenReturn(UPLOADER_CLASS);
+
+    // Instances to test
+    TopologyAPI.Topology topology = TopologyAPI.Topology.getDefaultInstance();
+    SubmitterMain submitterMain = Mockito.spy(new SubmitterMain(config, topology));
+
+    // Failed to instantiate
+    final String CLASS_NOT_EXIST = "class_not_exist";
+    Mockito.
+        when(config.getStringValue(ConfigKeys.get(UPLOADER_CLASS))).
+        thenReturn(CLASS_NOT_EXIST);
+    Assert.assertFalse(submitterMain.submitTopology());
+    Mockito.verify(uploader, Mockito.never()).close();
+    Mockito.verify(packing, Mockito.never()).close();
+    Mockito.verify(launcher, Mockito.never()).close();
+    Mockito.verify(statemgr, Mockito.never()).close();
+
+    // OK to instantiate all resources
+    Mockito.
+        when(config.getStringValue(ConfigKeys.get(UPLOADER_CLASS))).
+        thenReturn(UPLOADER_CLASS);
+
+    // Failed to validate the submission
+    Mockito.doReturn(false).when(submitterMain).
+        validateSubmit(Mockito.any(SchedulerStateManagerAdaptor.class), Mockito.anyString());
+    Assert.assertFalse(submitterMain.submitTopology());
+    // Resources should be closed even the submission failed
+    Mockito.verify(uploader, Mockito.atLeastOnce()).close();
+    Mockito.verify(packing, Mockito.atLeastOnce()).close();
+    Mockito.verify(launcher, Mockito.atLeastOnce()).close();
+    Mockito.verify(statemgr, Mockito.atLeastOnce()).close();
+
+    // validated the submission
+    Mockito.doReturn(true).when(submitterMain).
+        validateSubmit(Mockito.any(SchedulerStateManagerAdaptor.class), Mockito.anyString());
+
+    // Failed to upload package, return null
+    Mockito.doReturn(null).when(submitterMain).
+        uploadPackage(Mockito.eq(uploader));
+    Assert.assertFalse(submitterMain.submitTopology());
+    // Should not invoke undo
+    Mockito.verify(uploader, Mockito.never()).undo();
+
+    // OK to upload package
+    final URI packageURI = new URI("mock://uri:924/x#ke");
+    Mockito.doReturn(packageURI).when(submitterMain).
+        uploadPackage(Mockito.eq(uploader));
+
+    // Failed to callLauncherRunner
+    Mockito.doReturn(false).when(submitterMain).
+        callLauncherRunner(Mockito.any(Config.class));
+    Assert.assertFalse(submitterMain.submitTopology());
+    // Should invoke undo
+    Mockito.verify(uploader).undo();
+
+    // Happy path
+    Mockito.doReturn(true).when(submitterMain).
+        callLauncherRunner(Mockito.any(Config.class));
+    Assert.assertTrue(submitterMain.submitTopology());
+  }
+}
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/client/SchedulerClientFactoryTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/client/SchedulerClientFactoryTest.java
new file mode 100644
index 0000000..9f31c32
--- /dev/null
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/client/SchedulerClientFactoryTest.java
@@ -0,0 +1,76 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.scheduler.client;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.twitter.heron.proto.scheduler.Scheduler;
+import com.twitter.heron.scheduler.NullScheduler;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.ConfigKeys;
+import com.twitter.heron.spi.common.Keys;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+
+public class SchedulerClientFactoryTest {
+  private static final String TOPOLOGY_NAME = "shiwei_0924_jiayou";
+
+  @Test
+  public void testGetServiceSchedulerClient() throws Exception {
+    // Instantiate mock objects
+    Config config = Mockito.mock(Config.class);
+    Config runtime = Mockito.mock(Config.class);
+    SchedulerStateManagerAdaptor statemgr = Mockito.mock(SchedulerStateManagerAdaptor.class);
+
+    // Get a ServiceSchedulerClient
+    Mockito.when(config.getBooleanValue(ConfigKeys.get("SCHEDULER_IS_SERVICE"))).thenReturn(true);
+
+    // Mock the runtime object
+    Mockito.when(runtime.get(Keys.schedulerStateManagerAdaptor())).thenReturn(statemgr);
+    Mockito.when(runtime.getStringValue(Keys.topologyName())).thenReturn(TOPOLOGY_NAME);
+
+    // Failed to getSchedulerLocation
+    Mockito.when(statemgr.getSchedulerLocation(Mockito.eq(TOPOLOGY_NAME))).thenReturn(null);
+    Assert.assertNull(new SchedulerClientFactory(config, runtime).getSchedulerClient());
+    Mockito.verify(statemgr).getSchedulerLocation(Mockito.eq(TOPOLOGY_NAME));
+
+    // Get a schedulerLocation successfully
+    Mockito.when(statemgr.getSchedulerLocation(Mockito.eq(TOPOLOGY_NAME))).
+        thenReturn(Scheduler.SchedulerLocation.getDefaultInstance());
+    Assert.assertNotNull(new SchedulerClientFactory(config, runtime).getSchedulerClient());
+  }
+
+  @Test
+  public void testGetLibrarySchedulerClient() throws Exception {
+    // Instantiate mock objects
+    Config config = Mockito.mock(Config.class);
+    Config runtime = Mockito.mock(Config.class);
+
+    // Return a NullScheduler
+    Mockito.when(config.getStringValue(ConfigKeys.get("SCHEDULER_CLASS"))).
+        thenReturn(NullScheduler.class.getName());
+
+    // Get a LibrarySchedulerClient
+    Mockito.when(config.getBooleanValue(ConfigKeys.get("SCHEDULER_IS_SERVICE"))).thenReturn(false);
+    Assert.assertNotNull(new SchedulerClientFactory(config, runtime).getSchedulerClient());
+
+    // Return some scheduler class not exist
+    final String SCHEDULER_CLASS_NOT_EXIST = "class_not_exist";
+    Mockito.when(config.getStringValue(ConfigKeys.get("SCHEDULER_CLASS"))).
+        thenReturn(SCHEDULER_CLASS_NOT_EXIST);
+    Assert.assertNull(new SchedulerClientFactory(config, runtime).getSchedulerClient());
+  }
+}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java
index 475bdec..8df263e 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalLauncher.java
@@ -192,7 +192,7 @@
     int ret = ShellUtils.runSyncProcess(LocalContext.verbose(config), LocalContext.verbose(config),
         cmd, new StringBuilder(), new StringBuilder(), parentDirectory);
 
-    return ret == 0 ? true : false;
+    return ret == 0;
   }
 
   /**
@@ -208,7 +208,7 @@
     int ret = ShellUtils.runSyncProcess(LocalContext.verbose(config), LocalContext.verbose(config),
         cmd, new StringBuilder(), new StringBuilder(), new File(targetFolder));
 
-    return ret == 0 ? true : false;
+    return ret == 0;
   }
 
   ///////////////////////////////////////////////////////////////////////////////
diff --git a/heron/statemgrs/src/java/BUILD b/heron/statemgrs/src/java/BUILD
index 374ffc1..7013070 100644
--- a/heron/statemgrs/src/java/BUILD
+++ b/heron/statemgrs/src/java/BUILD
@@ -31,6 +31,25 @@
 )
 
 java_library(
+    name = "null-statemgr-java",
+    srcs = glob(["**/NullStateManager.java"]),
+    deps = common_deps_files,
+)
+
+java_binary(
+    name = "null-statemgr-unshaded",
+    srcs = glob(["**/NullStateManager.java"]),
+    deps = common_deps_files,
+)
+
+genrule(
+    name = "heron-null-statemgr",
+    srcs = [":null-statemgr-unshaded_deploy.jar"],
+    outs = ["heron-null-statemgr.jar"],
+    cmd  = "cp $< $@",
+)
+
+java_library(
     name = "localfs-statemgr-java",
     srcs = glob(["**/FileSystemStateManager.java"]) + glob(["**/localfs/**/*.java"]),
     resources = glob(["**/localfs/**/*.yaml"]),
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
index 626e7e8..5899ea1 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
@@ -72,7 +72,7 @@
     boolean executionStateDir = FileUtils.isDirectoryExists(getExecutionStateDir())
         || FileUtils.createDirectory(getExecutionStateDir());
 
-    boolean schedulerLocationDir =  FileUtils.isDirectoryExists(getSchedulerLocationDir())
+    boolean schedulerLocationDir = FileUtils.isDirectoryExists(getSchedulerLocationDir())
         || FileUtils.createDirectory(getSchedulerLocationDir());
 
     if (topologyDir && tmasterLocationDir && physicalPlanDir && executionStateDir
@@ -84,9 +84,9 @@
   }
 
   // Make utils class protected for easy unit testing
-  protected ListenableFuture<Boolean> setData(String path, byte[] data) {
+  protected ListenableFuture<Boolean> setData(String path, byte[] data, boolean overwrite) {
     final SettableFuture<Boolean> future = SettableFuture.create();
-    boolean ret = FileUtils.writeToFile(path, data);
+    boolean ret = FileUtils.writeToFile(path, data, overwrite);
     future.set(ret);
 
     return future;
@@ -122,30 +122,36 @@
   @Override
   public ListenableFuture<Boolean> setExecutionState(
       ExecutionEnvironment.ExecutionState executionState, String topologyName) {
-    return setData(getExecutionStatePath(topologyName), executionState.toByteArray());
+    return setData(getExecutionStatePath(topologyName), executionState.toByteArray(), false);
   }
 
   @Override
   public ListenableFuture<Boolean> setTMasterLocation(
       TopologyMaster.TMasterLocation location, String topologyName) {
-    return setData(getTMasterLocationPath(topologyName), location.toByteArray());
+    // Note: Unlike Zk statemgr, we overwrite the location even if there is already one.
+    // This is because when running in local mode we control when a tmaster dies and
+    // comes up deterministically.
+    return setData(getTMasterLocationPath(topologyName), location.toByteArray(), true);
   }
 
   @Override
   public ListenableFuture<Boolean> setTopology(TopologyAPI.Topology topology, String topologyName) {
-    return setData(getTopologyPath(topologyName), topology.toByteArray());
+    return setData(getTopologyPath(topologyName), topology.toByteArray(), false);
   }
 
   @Override
   public ListenableFuture<Boolean> setPhysicalPlan(
       PhysicalPlans.PhysicalPlan physicalPlan, String topologyName) {
-    return setData(getPhysicalPlanPath(topologyName), physicalPlan.toByteArray());
+    return setData(getPhysicalPlanPath(topologyName), physicalPlan.toByteArray(), false);
   }
 
   @Override
   public ListenableFuture<Boolean> setSchedulerLocation(
       Scheduler.SchedulerLocation location, String topologyName) {
-    return setData(getSchedulerLocationPath(topologyName), location.toByteArray());
+    // Note: Unlike Zk statemgr, we overwrite the location even if there is already one.
+    // This is because when running in local mode we control when a scheduler dies and
+    // comes up deterministically.
+    return setData(getSchedulerLocationPath(topologyName), location.toByteArray(), true);
   }
 
   @Override
diff --git a/heron/statemgrs/tests/java/BUILD b/heron/statemgrs/tests/java/BUILD
index d4e17c1..360b11d 100644
--- a/heron/statemgrs/tests/java/BUILD
+++ b/heron/statemgrs/tests/java/BUILD
@@ -5,6 +5,7 @@
     "//3rdparty/java:powermock",
     "@bazel_tools//third_party:mockito",
     "@bazel_tools//third_party:junit4",
+    "@guava//jar",
 ]
 
 spi_deps_files = [
diff --git a/heron/statemgrs/tests/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManagerTest.java b/heron/statemgrs/tests/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManagerTest.java
index bdfdd74..39a69a1 100644
--- a/heron/statemgrs/tests/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManagerTest.java
+++ b/heron/statemgrs/tests/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManagerTest.java
@@ -14,6 +14,8 @@
 
 package com.twitter.heron.statemgr.localfs;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -99,6 +101,23 @@
   }
 
   /**
+   * Method: setSchedulerLocation(Scheduler.SchedulerLocation location, String topologyName)
+   */
+  @Test
+  public void testSetSchedulerLocation() throws Exception {
+    LocalFileSystemStateManager manager =
+        Mockito.spy(new LocalFileSystemStateManager());
+    manager.initialize(getConfig());
+
+    Mockito.doReturn(Mockito.mock(ListenableFuture.class)).when(manager).
+        setData(Mockito.anyString(), Mockito.any(byte[].class), Mockito.anyBoolean());
+
+    manager.setSchedulerLocation(Scheduler.SchedulerLocation.getDefaultInstance(), "");
+    Mockito.verify(manager).
+        setData(Mockito.anyString(), Mockito.any(byte[].class), Mockito.eq(true));
+  }
+
+  /**
    * Method: setExecutionState(ExecutionEnvironment.ExecutionState executionState)
    */
   @Test
@@ -117,14 +136,15 @@
         ExecutionEnvironment.ExecutionState.getDefaultInstance();
 
     PowerMockito.doReturn(true).
-        when(FileUtils.class, "writeToFile", Matchers.anyString(), Matchers.any(byte[].class));
+        when(FileUtils.class, "writeToFile", Mockito.anyString(), Mockito.any(byte[].class),
+            Mockito.anyBoolean());
 
     Assert.assertTrue(manager.setExecutionState(defaultState, "").get());
 
     PowerMockito.verifyStatic();
     FileUtils.writeToFile(Matchers.eq(String.format("%s/%s/%s",
             ROOT_ADDR, "executionstate", defaultState.getTopologyName())),
-        Matchers.eq(defaultState.toByteArray()));
+        Matchers.eq(defaultState.toByteArray()), Mockito.eq(false));
   }
 
   /**
@@ -144,7 +164,8 @@
     Assert.assertTrue(manager.initTree());
 
     PowerMockito.doReturn(true).
-        when(FileUtils.class, "writeToFile", Matchers.anyString(), Matchers.any(byte[].class));
+        when(FileUtils.class, "writeToFile", Mockito.anyString(), Mockito.any(byte[].class),
+            Mockito.anyBoolean());
 
     Assert.assertTrue(manager.setTopology(topology, TOPOLOGY_NAME).get());
 
@@ -152,7 +173,7 @@
     FileUtils.writeToFile(
         Matchers.eq(String.format("%s/%s/%s",
             ROOT_ADDR, "topologies", TOPOLOGY_NAME)),
-        Matchers.eq(topology.toByteArray()));
+        Matchers.eq(topology.toByteArray()), Mockito.eq(false));
   }
 
   /**
diff --git a/heron/ui/src/python/args.py b/heron/ui/src/python/args.py
index c3ce191..4b055f7 100644
--- a/heron/ui/src/python/args.py
+++ b/heron/ui/src/python/args.py
@@ -64,7 +64,7 @@
 
 def create_parsers():
   parser = argparse.ArgumentParser(
-      epilog = 'For detailed documentation, go to http://go/heron',
+      epilog = 'For detailed documentation, go to http://heronstreaming.io',
       usage = "%(prog)s [options] [help]",
       add_help = False)
 
diff --git a/scripts/travis/build.sh b/scripts/travis/build.sh
index 52986ed..948298b 100755
--- a/scripts/travis/build.sh
+++ b/scripts/travis/build.sh
@@ -4,16 +4,40 @@
 # of the below commands fail so we need to chain them in this script.
 #
 
-set -ex
+set -e
 
 # verify that jars have not been added to the repo
 JARS=`find . -name "*.jar"`
 if [ "$JARS" ]; then
-  echo "ERROR: The following jars were found in the repo, which is no permitted. Instead add the jar to WORKSPACE as a maven_jar."
+  echo "ERROR: The following jars were found in the repo, which is not permitted. Instead add the jar to WORKSPACE as a maven_jar."
   echo $JARS
   exit 1
 fi
 
+# verify that eggs have not been added to the repo
+# ./3rdparty/pex/wheel-0.23.0-py2.7.egg should be the only one
+set +e
+EGGS=`find . -name "*.egg" | grep -v "3rdparty/pex/wheel"`
+set -e
+if [ "$EGGS" ]; then
+  echo 'ERROR: The following eggs were found in the repo, which is not permitted. Python dependencies should be added using the "reqs" attribute:'
+  echo $EGGS
+  exit 1
+fi
+
+# verify that wheels have not been added to the repo
+# ./3rdparty/pex/setuptools-18.0.1-py2.py3-none-any.whl should be the only one
+set +e
+WHEELS=`find . -name "*.whl" | grep -v "3rdparty/pex/setuptools"`
+set -e
+if [ "$WHEELS" ]; then
+  echo 'ERROR: The following wheels were found in the repo, which is not permitted. Python dependencies should be added using the "reqs" attribute:'
+  echo $WHEELS
+  exit 1
+fi
+
+set +x
+
 # Run this manually, since if it fails when run as -workspace_status_command we don't get good output
 ./scripts/release/status.sh
 
@@ -25,7 +49,10 @@
 bazel --bazelrc=tools/travis-ci/bazel.rc build heron/...
 
 # run heron unit tests
-bazel --bazelrc=tools/travis-ci/bazel.rc test  heron/...
+bazel --bazelrc=tools/travis-ci/bazel.rc test --test_tag_filters=-flaky heron/...
+
+# flaky tests are often due to test port race conditions, which should be fixed. For now, run them serially
+bazel --bazelrc=tools/travis-ci/bazel.rc test --test_tag_filters=flaky --jobs=0 heron/...
 
 # build packages
 bazel --bazelrc=tools/travis-ci/bazel.rc build scripts/packages:tarpkgs
diff --git a/website/Makefile b/website/Makefile
index 8245448..7a0cb67 100755
--- a/website/Makefile
+++ b/website/Makefile
@@ -9,7 +9,7 @@
 	@npm run build
 
 pages:
-	@hugo
+	@hugo --serve
 
 linkchecker:
 	@bash linkchecker.sh
diff --git a/website/README.md b/website/README.md
index 5ff9e5d..88b62d7 100644
--- a/website/README.md
+++ b/website/README.md
@@ -8,8 +8,7 @@
 
 ## Documentation Setup
 
-Running the Heron documentation locally requires that you have the following
-installed:
+Running the Heron documentation locally requires that you have the following installed:
 
 * [Make](https://www.gnu.org/software/make/)
 * [Node.js](https://nodejs.org/en/)
@@ -17,18 +16,18 @@
 
 ### OS X Setup
 
-To install Node.js and npm on Mac OS X, make sure that you have
-[Homebrew](http://brew.sh/) installed and run:
+To install Node.js and npm on Mac OS X, make sure that you have [Homebrew](http://brew.sh/)
+installed and run:
 
 ```bash
 $ brew install nvm
 $ nvm install node
+$ curl -L https://www.npmjs.com/install.sh | sh
 ```
 
 Once this has completed:
 
 ```bash
-# Within the /heron directory
 $ cd website
 $ make setup
 ```
@@ -36,7 +35,7 @@
 This will install Hugo, Gulp, and all of the necessary Gulp plugins and build
 the static assets for the site.
 
-### Other Operating Systems
+### Other Operating Systems Setup
 
 Although the documentation is currently set up to be built and run on OS X, it's
 also possible to do so on other systems. In addition to Node.js and npm you will
@@ -48,39 +47,53 @@
 3. Run `npm run build` (this will build all of the necessary static assets, i.e.
    CSS, Javascript, etc.)
 
-Now you can run the docs locally. For more info, see the section directly below.
-
 ## Building the Docs Locally
 
-To build the docs:
+To build the docs locally:
 
 ```bash
 $ make site
 ```
 
-This will generate a full build of the docs in the `public` folder. To serve
-the docs locally, see the section directly below.
+This will generate a full build of the docs in the `public` folder, checking all links. If broken
+links are found, see `linkchecker-errors.csv`.
 
-## Running the Docs Locally
+## Running the Site Locally
+
+To serve the site locally:
 
 ```bash
 $ make serve
 ```
 
-This will run the docs locally on `localhost` port 1313. Navigate to
-`localhost:1313/heron` in your browser to see the served docs. Or open the
+This will run the docs locally on `localhost:1313`. Navigate to
+[localhost:1313/heron](http://localhost:1313/heron) to see the served docs. Or open the
 browser from the command line:
 
 ```bash
 $ open http://localhost:1313/heron
 ```
 
-To make site, including linkchecker.  If broken links found by linkchecker, see linkchecker-errors.csv
+## Publishing the Site
+
+The content on the [twitter.github.io/heron](http://twitter.github.io/heron) website is what is
+committed on the [gh-pages branch](https://github.com/twitter/heron/tree/gh-pages) of the heron repo.
+To simplify publishing docs generated from `master` onto the `gh-pages` branch, the output directory
+of the site build process (i.e., `website/public`) is a submodule that points to the `gh-pages` branch
+of the heron repo. As a result you will notice that when you cd into gh-pages and run `git status`
+or `git remote -v`, it appears as another heron repo based of the `gh-pages` branch.
 
 ```bash
-$ make site
+$ git status
+On branch master
+Your branch is up-to-date with 'origin/master'.
+$ cd website/public
+$ git status
+On branch gh-pages
+Your branch is up-to-date with 'origin/gh-pages'.
 ```
 
+To publish the site docs:
 
-
-
+1. Make the site as described in the above section. Verify all links are valid.
+2. Change to the `website/public` directory, commit and push to the `gh-pages` branch.
diff --git a/website/content/docs/contributors/codebase.md b/website/content/docs/contributors/codebase.md
index 653c570..d082066 100644
--- a/website/content/docs/contributors/codebase.md
+++ b/website/content/docs/contributors/codebase.md
@@ -21,7 +21,7 @@
 API](../../concepts/topologies), and [Heron Instance](../../concepts/architecture#heron-instance).
 It is currently the only language in which topologies can be written. Instructions can be found 
 in [Building Topologies](../developers/topologies.html), while API documentation for the Java
-API can be found [here](../api/topology/index.html). Please note that Heron topologies do not 
+API can be found [here](/api/topology/index.html). Please note that Heron topologies do not 
 require Java 8 and can be written in Java 7 or later.
 
 * **Python 2** (specifically 2.7) is used primarily for Heron's [CLI
@@ -41,8 +41,8 @@
 [`heron/proto`]({{% githubMaster %}}/heron/proto).
 
 * **Cluster coordination** &mdash; Heron relies heavily on ZooKeeper for cluster
-coordination for distributed deployment, be it for [Mesos/Aurora](../../operators/deployment/aurora),
-[Mesos alone](../../operators/deployment/mesos), or for a [custom
+coordination for distributed deployment, be it for [Mesos/Aurora](../../operators/deployment/schedulers/aurora),
+[Mesos alone](../../operators/deployment/schedulers/mesos), or for a [custom
 scheduler](../custom-scheduler) that you build. More information on ZooKeeper
 components in the codebase can be found in the [State
 Management](#state-management) section below.
@@ -56,18 +56,18 @@
 ## Cluster Scheduling
 
 Heron supports three cluster schedulers out of the box:
-[Mesos](../../operators/deployment/mesos),
-[Aurora](../../operators/deployment/aurora), and a [local
-scheduler](../../operators/deployment/local). The Java code for each of those
+[Mesos](../../operators/deployment/schedulers/mesos),
+[Aurora](../../operators/deployment/schedulers/aurora), and a [local
+scheduler](../../operators/deployment/schedulers/local). The Java code for each of those
 schedulers, as well as for the underlying scheduler API, can be found in
 [`heron/schedulers`]({{% githubMaster %}}/heron/schedulers).
 
 Info on custom schedulers can be found in [Implementing a Custom
 Scheduler](../custom-scheduler); info on the currently available schedulers
 can be found in [Deploying Heron on
-Aurora](../../operators/deployment/aurora), [Deploying Heron on
-Mesos](../../operators/deployment/mesos), and [Local
-Deployment](../../operators/deployment/local).
+Aurora](../../operators/deployment/schedulers/aurora), [Deploying Heron on
+Mesos](../../operators/deployment/schedulers/mesos), and [Local
+Deployment](../../operators/deployment/schedulers/local).
 
 ## State Management
 
@@ -117,7 +117,7 @@
 
 Documentation for writing topologies can be found in [Building
 Topologies](../developers/topologies.html), while API documentation can be found
-[here](../api/topology/index.html).
+[here](/api/topology/index.html).
 
 ### Local Mode
 
@@ -146,8 +146,8 @@
 
 Sample configurations for different Heron schedulers 
 
-* [Local scheduler](../../operators/deployment/local) config can be found in [`heron/config/src/yaml/conf/local`]({{% githubMaster %}}/heron/config/src/yaml/conf/local),
-* [Aurora scheduler](../../operators/deployment/aurora) config can be found [`heron/config/src/yaml/conf/aurora`]({{% githubMaster %}}/heron/config/src/yaml/conf/aurora).
+* [Local scheduler](../../operators/deployment/schedulers/local) config can be found in [`heron/config/src/yaml/conf/local`]({{% githubMaster %}}/heron/config/src/yaml/conf/local),
+* [Aurora scheduler](../../operators/deployment/schedulers/aurora) config can be found [`heron/config/src/yaml/conf/aurora`]({{% githubMaster %}}/heron/config/src/yaml/conf/aurora).
 
 ### Heron Tracker
 
diff --git a/website/content/docs/contributors/custom-metrics-sink.md b/website/content/docs/contributors/custom-metrics-sink.md
index a64353b..07ea52a 100644
--- a/website/content/docs/contributors/custom-metrics-sink.md
+++ b/website/content/docs/contributors/custom-metrics-sink.md
@@ -7,7 +7,7 @@
 metrics from all instances in the topology. You can define how the MM processes
 metrics by implementing a **metrics sink**, which specifies how the MM handles
 incoming
-[`MetricsRecord`](../api/metrics/com/twitter/heron/metricsmgr/api/metrics/MetricsRecord.html)
+[`MetricsRecord`](/api/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecord.html)
 objects.
 
 Java is currently the only supported language for custom metrics sinks. This may
@@ -19,15 +19,15 @@
 for a specific topology. The code for these sinks may prove helpful for
 implementing your own.
 
-* [`GraphiteSink`](../api/metrics/com/twitter/heron/metricsmgr/sink/GraphiteSink.html)
+* [`GraphiteSink`](/api/metrics/com/twitter/heron/metricsmgr/sink/GraphiteSink.html)
   &mdash; Sends each `MetricsRecord` object to a
   [Graphite](http://graphite.wikidot.com/) instance according to a Graphite
   prefix.
-* [`ScribeSink`](../api/metrics/com/twitter/heron/metricsmgr/sink/ScribeSink.html)
+* [`ScribeSink`](/api/metrics/com/twitter/heron/metricsmgr/sink/ScribeSink.html)
   &mdash; Sends each `MetricsRecord` object to a
   [Scribe](https://github.com/facebookarchive/scribe) instance according to a
   Scribe category and namespace.
-* [`FileSink`](../api/metrics/com/twitter/heron/metricsmgr/sink/FileSink.html)
+* [`FileSink`](/api/metrics/com/twitter/heron/metricsmgr/sink/FileSink.html)
   &mdash; Writes each `MetricsRecord` object to a JSON file at a specified path.
 
 More on using those sinks in a Heron cluster can be found in [Metrics
@@ -66,7 +66,7 @@
   initialization behavior of the sink. The `conf` map is the configuration that
   is passed to the sink by the `.yaml` configuration file at
   `heron/config/metrics_sink.yaml`; the
-  [`SinkContext`](../api/metrics/com/twitter/heron/metricsmgr/api/sink/SinkContext.html)
+  [`SinkContext`](/api/com/twitter/heron/spi/metricsmgr/sink/SinkContext.html)
   object enables you to access values from the sink's runtime context
   (the ID of the metrics manager, the ID of the sink, and the name of the
   topology).
diff --git a/website/content/docs/contributors/custom-scheduler.md b/website/content/docs/contributors/custom-scheduler.md
index 994e144..c3bc0c5 100644
--- a/website/content/docs/contributors/custom-scheduler.md
+++ b/website/content/docs/contributors/custom-scheduler.md
@@ -5,14 +5,14 @@
 To run a Heron cluster, you'll need to set up a scheduler that is responsible
 for cluster management. Heron supports three schedulers out of the box:
 
-* [Mesos](../../operators/deployment/mesos)
-* [Aurora](../../operators/deployment/aurora)
-* [Local scheduler](../../operators/deployment/local)
+* [Mesos](../../operators/deployment/schedulers/mesos)
+* [Aurora](../../operators/deployment/schedulers/aurora)
+* [Local scheduler](../../operators/deployment/schedulers/local)
 
 If you'd like to run Heron on a not-yet-supported system, such as
 [YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)
 or [Amazon ECS](https://aws.amazon.com/ecs/), you can create your own scheduler
-using Heron's [scheduler API](../api/scheduler/index.html), as detailed in the
+using Heron's [scheduler API](/api/scheduler/index.html), as detailed in the
 sections below.
 
 Java is currently the only supported language for custom schedulers. This may
@@ -47,11 +47,11 @@
 
 Interface | Role | Examples
 :-------- |:---- |:--------
-[`IConfigLoader`](../api/scheduler/com/twitter/heron/scheduler/api/IConfigLoader.html) | Parsing and loading of configuration for the scheduler | [Aurora](../api/scheduler/com/twitter/heron/scheduler/aurora/AuroraConfigLoader.html), [Mesos](../api/scheduler/com/twitter/heron/scheduler/mesos/MesosConfigLoader.html), [local](../api/scheduler/com/twitter/heron/scheduler/local/LocalConfigLoader.html)
-[`ILauncher`](../api/scheduler/com/twitter/heron/scheduler/api/ILauncher.html) | Defines how the scheduler is launched | [Aurora](../api/scheduler/com/twitter/heron/scheduler/aurora/AuroraLauncher.html), [Mesos](../api/scheduler/com/twitter/heron/scheduler/mesos/MesosLauncher.html), [local](../api/scheduler/com/twitter/heron/scheduler/local/LocalLauncher.html)
-[`IRuntimeManager`](../api/scheduler/com/twitter/heron/scheduler/api/IRuntimeManager.html) | Handles runtime tasks such as activating topologies, killing topologies, etc. | [Aurora](../api/scheduler/com/twitter/heron/scheduler/aurora/AuroraTopologyRuntimeManager.html), [Mesos](../api/scheduler/com/twitter/heron/scheduler/mesos/MesosTopologyRuntimeManager.html), [local](../api/scheduler/com/twitter/heron/scheduler/local/LocalTopologyRuntimeManager.html)
-[`IScheduler`](../api/scheduler/com/twitter/heron/scheduler/api/IScheduler.html) | Defines the scheduler object used to construct topologies | [Mesos](../api/scheduler/com/twitter/heron/scheduler/mesos/MesosScheduler.html), [local](../api/scheduler/com/twitter/heron/scheduler/local/LocalScheduler.html)
-[`IUploader`](../api/scheduler/com/twitter/heron/scheduler/api/IUploader.html) | Uploads the topology to a shared location that must be accessible to the runtime environment of the topology | [Aurora](), [Mesos](), [local](../api/scheduler/com/twitter/heron/scheduler/local/LocalUploader.html)
+[`IConfigLoader`](/api/com/twitter/heron/spi/scheduler/IConfigLoader.html) | Parsing and loading of configuration for the scheduler | [Aurora](/api/scheduler/com/twitter/heron/scheduler/aurora/AuroraConfigLoader.html), [Mesos](/api/scheduler/com/twitter/heron/scheduler/mesos/MesosConfigLoader.html), [local](/api/scheduler/com/twitter/heron/scheduler/local/LocalConfigLoader.html)
+[`ILauncher`](/api/com/twitter/heron/spi/scheduler/ILauncher.html) | Defines how the scheduler is launched | [Aurora](/api/scheduler/com/twitter/heron/scheduler/aurora/AuroraLauncher.html), [Mesos](/api/scheduler/com/twitter/heron/scheduler/mesos/MesosLauncher.html), [local](/api/scheduler/com/twitter/heron/scheduler/local/LocalLauncher.html)
+[`IRuntimeManager`](/api/com/twitter/heron/spi/scheduler/IRuntimeManager.html) | Handles runtime tasks such as activating topologies, killing topologies, etc. | [Aurora](/api/scheduler/com/twitter/heron/scheduler/aurora/AuroraTopologyRuntimeManager.html), [Mesos](/api/scheduler/com/twitter/heron/scheduler/mesos/MesosTopologyRuntimeManager.html), [local](/api/scheduler/com/twitter/heron/scheduler/local/LocalTopologyRuntimeManager.html)
+[`IScheduler`](/api/com/twitter/heron/spi/scheduler/IScheduler.html) | Defines the scheduler object used to construct topologies | [Mesos](/api/scheduler/com/twitter/heron/scheduler/mesos/MesosScheduler.html), [local](/api/scheduler/com/twitter/heron/scheduler/local/LocalScheduler.html)
+[`IUploader`](/api/com/twitter/heron/spi/scheduler/IUploader.html) | Uploads the topology to a shared location that must be accessible to the runtime environment of the topology | [Aurora](), [Mesos](), [local](/api/scheduler/com/twitter/heron/scheduler/local/LocalUploader.html)
 
 Your implementation of those interfaces will need to be on Heron's
 [classpath](https://docs.oracle.com/javase/tutorial/essential/environment/paths.html)
@@ -60,14 +60,14 @@
 ## Loading Configuration
 
 You can set up a configuration loader for a custom scheduler by implementing the
-[`IConfig`](../api/scheduler/com/twitter/heron/scheduler/api/IConfig.html)
+[`IConfig`](/api/com/twitter/heron/spi/scheduler/IConfig.html)
 interface. You can use this interface to load configuration from any source
 you'd like, e.g. YAML files, JSON files, or a web service.
 
 If you'd like to load configuration from files using the same syntax as Heron's
 default configuration files for the Aurora, Mesos, and local schedulers (in
 `heron/cli2/src/python`), you can implement the
-[`DefaultConfigLoader`](../api/scheduler/com/twitter/heron/scheduler/util/DefaultConfigLoader.html)
+[`DefaultConfigLoader`](/api/scheduler/com/twitter/heron/scheduler/util/DefaultConfigLoader.html)
 interface.
 
 ## Configurable Parameters
@@ -75,7 +75,7 @@
 At the very least, your configuration loader will need to be able to load the
 class names (as strings) for your implementations of the components listed
 above, as you can see from the interface definition for
-[`IConfigLoader`](../api/scheduler/com/twitter/heron/scheduler/api/IConfigLoader.html).
+[`IConfigLoader`](/api/com/twitter/heron/spi/scheduler/IConfigLoader.html).
 
 ## Trying Out Your Scheduler
 
diff --git a/website/content/docs/developers/data-model.md b/website/content/docs/developers/data-model.md
index 566a33a..454e5ed 100644
--- a/website/content/docs/developers/data-model.md
+++ b/website/content/docs/developers/data-model.md
@@ -7,20 +7,20 @@
 processed by [bolts](bolts.html) consists of tuples.
 
 Heron has a special
-[`Tuple`](../api/topology/com/twitter/heron/api/tuple/Tuple.html) interface for
+[`Tuple`](/api/topology/com/twitter/heron/api/tuple/Tuple.html) interface for
 working with tuples. Heron `Tuple`s can hold values of any type; values are
 accessible either by providing an index or a field name.
 
 ## Using Tuples
 
 Heron's `Tuple` interface contains the methods listed in the [Javadoc
-definition](../api/topology/com/twitter/heron/api/tuple/Tuple.html).
+definition](/api/topology/com/twitter/heron/api/tuple/Tuple.html).
 
 ### Accessing Primitive Types By Index
 
 Heron `Tuple`s support a wide variety of primitive Java types, including
 strings, Booleans, byte arrays, and more.
-[`getString`](../api/topology/com/twitter/heron/api/tuple/Tuple.html#getString-int-)
+[`getString`](/api/topology/com/twitter/heron/api/tuple/Tuple.html#getString-int-)
 method, for example, takes an integer index and returns either a string or
 `null` if no string value is present at that index. Analogous methods can be
 found in the Javadoc.
@@ -29,7 +29,7 @@
 
 In addition to being accessible via index, values stored in Heron tuples are
 accessible via field name as well. The
-[`getStringByField`](../api/topology/com/twitter/heron/api/tuple/Tuple.html#getStringByField-java.lang.String-)
+[`getStringByField`](/api/topology/com/twitter/heron/api/tuple/Tuple.html#getStringByField-java.lang.String-)
 method, for example, takes a field name string and returns either a string or
 `null` if no string value is present for that field name. Analogous methods can
 be found in the Javadoc.
@@ -41,12 +41,12 @@
 an index or a field name. The following methods return either an `Object` or
 `null` if no object is present:
 
-* [`getValue`](../api/topology/com/twitter/heron/api/tuple/Tuple.html#getValue-int-)
-* [`getValueByField`](../api/topology/com/twitter/heron/api/tuple/Tuple.html#getValueByField-java.lang.String-)
+* [`getValue`](/api/topology/com/twitter/heron/api/tuple/Tuple.html#getValue-int-)
+* [`getValueByField`](/api/topology/com/twitter/heron/api/tuple/Tuple.html#getValueByField-java.lang.String-)
 
 You can also retrieve all objects contained in a Heron `Tuple` as a Java
 [List](https://docs.oracle.com/javase/8/docs/api/java/util/List.html) using the
-[`getValues`](../api/topology/com/twitter/heron/api/tuple/Tuple.html#getValues--)
+[`getValues`](/api/topology/com/twitter/heron/api/tuple/Tuple.html#getValues--)
 method.
 
 ### User-defined Types
@@ -81,14 +81,14 @@
 There are additional methods available for determining the size of Heron
 `Tuple`s, extracting contextual information, and more. For a full listing of
 methods, see the
-[Javadoc](../api/topology/com/twitter/heron/api/tuple/Tuple.html).
+[Javadoc](/api/topology/com/twitter/heron/api/tuple/Tuple.html).
 
 ## Fields
 
 From the methods in the list above you can see that you can retrieve single
 values from a Heron tuple on the basis of their index. You can also retrieve
 multiple values using a
-[`Fields`](../api/topology/com/twitter/heron/api/tuple/Fields.html) object,
+[`Fields`](/api/topology/com/twitter/heron/api/tuple/Fields.html) object,
 which can be initialized either using varargs or a list of strings:
 
 ```java
diff --git a/website/content/docs/developers/serialization.md b/website/content/docs/developers/serialization.md
index a05ecaa..c13762e 100644
--- a/website/content/docs/developers/serialization.md
+++ b/website/content/docs/developers/serialization.md
@@ -3,7 +3,7 @@
 ---
 
 The tuple is Heron's core data type. Heron's native
-[`Tuple`](.io/topology-api/com/twitter/heron/api/tuple/Tuple) interface supports
+[`Tuple`](/api/com/twitter/heron/api/tuple/Tuple.html) interface supports
 a broad range of [basic data types](../data-model/#using-tuples), such as
 strings, integers, and booleans, out of the box, but tuples can contain values
 of any type. You can use data types beyond the core types by providing a custom
diff --git a/website/content/docs/getting-started.md b/website/content/docs/getting-started.md
index c2a99ce..0efc9f5 100644
--- a/website/content/docs/getting-started.md
+++ b/website/content/docs/getting-started.md
@@ -19,21 +19,25 @@
 * heron-client-install-\<version\>-darwin.sh
 * heron-tools-install-\<version\>-darwin.sh
 
-where \<version\> is the desired heron version.
+where \<version\> is the desired heron version. For example, \<version\>=0.14.0
 
-Run the download self installing binary for heron client as follows
+Run the download self installing binary for heron client using ```--user``` as follows
 ```bash
-$ chmod +x heron-client-install-0.13.2-darwin.sh
-$ ./heron-client-install-0.13.2-darwin.sh --user
+$ chmod +x heron-client-install-<version>-darwin.sh
+$ ./heron-client-install-<version>-darwin.sh --user
 Uncompressing......
 Heron is now installed!
 Make sure you have "/Users/$USER/bin" in your path.
 ```
-
-Run the download self installing binary for heron tools as follows
+To add ```/Users/$USER/bin``` to your path, run:
 ```bash
-$ chmod +x heron-tools-install-0.13.2-darwin.sh
-$ ./heron-tools-install-0.13.2-darwin.sh --user
+$ export PATH="$PATH:$HOME/bin"
+```
+
+Run the download self installing binary for heron tools using ```--user``` as follows
+```bash
+$ chmod +x heron-tools-install-<version>-darwin.sh
+$ ./heron-tools-install-<version>-darwin.sh --user
 Uncompressing......
 Heron Tools is now installed!
 Make sure you have "/Users/$USER/bin" in your path.
@@ -41,7 +45,7 @@
 
 ### Step 2 - Launch an example topology
 
-Launch an example [topology](../concepts/topologies) on **local cluster** using submit:
+Example topologies are installed with ```--user``` flag in ```~/.heron/examples```.  Launch an example [topology](../concepts/topologies) on **local cluster** using submit:
 
 ```bash
 $ heron submit local ~/.heron/examples/heron-examples.jar com.twitter.heron.examples.ExclamationTopology ExclamationTopology
@@ -94,7 +98,7 @@
 
 To invoke the help for submitting a topology:
 ```bash
-$ heron help submit 
+$ heron help submit
 usage: heron submit [options] cluster/[role]/[environ] topology-file-name topology-class-name [topology-args]
 
 Required arguments:
@@ -103,9 +107,11 @@
   topology-class-name   Topology class name
 
 Optional arguments:
-  --config-path (a string; path to cluster config; default: "/Users/USERNAME/.heron/conf/<cluster>")
-  --config-property (a string; a config property; default: [])
+  --config-path (a string; path to cluster config; default: "/Users/$USER/.heron/conf")
+  --config-property (key=value; a config key and its value; default: [])
   --deploy-deactivated (a boolean; default: "false")
+  -D DEFINE             Define a system property to pass to java -D when
+                        running main.
   --verbose (a boolean; default: "false")
 ```
 
diff --git a/website/content/docs/operators/deployment/index.md b/website/content/docs/operators/deployment/index.md
index ac5e7a8..8bddd72 100644
--- a/website/content/docs/operators/deployment/index.md
+++ b/website/content/docs/operators/deployment/index.md
@@ -3,9 +3,9 @@
 Heron is designed to be run in clustered, scheduler-driven environments. It
 currently supports three scheduler options out of the box:
 
-* [Aurora](aurora)
-* [Mesos](mesos)
-* [Local scheduler](local)
+* [Aurora](schedulers/aurora)
+* [Mesos](schedulers/mesos)
+* [Local scheduler](schedulers/local)
 
 To implement a new scheduler, see
 [Implementing a Custom Scheduler](../../contributors/custom-scheduler).
diff --git a/website/content/docs/operators/deployment/schedulers/aurora.md b/website/content/docs/operators/deployment/schedulers/aurora.md
index aff18d2..15b0577 100644
--- a/website/content/docs/operators/deployment/schedulers/aurora.md
+++ b/website/content/docs/operators/deployment/schedulers/aurora.md
@@ -10,14 +10,14 @@
 
 Aurora doesn't have a Heron scheduler *per se*. Instead, when a topology is
 submitted to Heron, `heron-cli` interacts with Aurora to automatically stand up
-all the [components](../../../concepts/architecture) necessary to [manage
-topologies](../../heron-cli).
+all the [components](../../../../concepts/architecture) necessary to [manage
+topologies](../../../heron-cli).
 
 ## ZooKeeper
 
 To run Heron on Aurora, you'll need to set up a ZooKeeper cluster and configure
 Heron to communicate with it. Instructions can be found in [Setting up
-ZooKeeper](../zookeeper).
+ZooKeeper](../../statemanagers/zookeeper).
 
 ## Hosting Binaries
 
@@ -26,7 +26,7 @@
 it's accessible to Aurora (for example in [Amazon
 S3](https://aws.amazon.com/s3/) or using a local blob storage solution). You can
 build those binaries using the instructions in [Creating a New Heron
-Release](../../../developers/compiling#building-a-full-release-package).
+Release](../../../../developers/compiling#building-a-full-release-package).
 
 Once your Heron binaries are hosted somewhere that is accessible to Aurora, you
 should run tests to ensure that Aurora can successfully fetch them.
@@ -43,14 +43,14 @@
 (i.e. can submit topologies, activate and deactivate them, etc.).
 
 The most important thing at this stage is to ensure that `heron-cli` is synced
-across all machines that will be [working with topologies](../../heron-cli).
+across all machines that will be [working with topologies](../../../heron-cli).
 Once that has been ensured, you can use Aurora as a scheduler by specifying the
 proper configuration and configuration loader when managing topologies.
 
 ### Specifying a Configuration
 
 You'll need to specify a scheduler configuration at all stages of a topology's
-[lifecycle](../../../concepts/topologies#topology-lifecycle) by using the
+[lifecycle](../../../../concepts/topologies#topology-lifecycle) by using the
 `--config-file` flag to point at a configuration file. There is a default Aurora
 configuration located in the Heron repository at
 `heron/cli/src/python/aurora_scheduler.conf`. You can use this file as is,
diff --git a/website/content/docs/operators/deployment/schedulers/local.md b/website/content/docs/operators/deployment/schedulers/local.md
index c4ac493..a0349d1 100644
--- a/website/content/docs/operators/deployment/schedulers/local.md
+++ b/website/content/docs/operators/deployment/schedulers/local.md
@@ -12,7 +12,7 @@
 2. [The local filesystem](#local-filesystem)
 
 **Note**: Deploying a Heron cluster locally is not to be confused with Heron's
-[local mode](../developers/java/local-mode.html). Local mode enables you to run
+[local mode](../../developers/java/local-mode.html). Local mode enables you to run
 topologies in a cluster-agnostic JVM process for the purpose of development and
 debugging, while the local scheduler stands up a Heron cluster on a single
 machine.
@@ -20,10 +20,10 @@
 ## How Local Deployment Works
 
 Using the local scheduler is similar to deploying Heron on other systems in
-that you use the [Heron CLI](../../heron-cli) to manage topologies. The
+that you use the [Heron CLI](../../../heron-cli) to manage topologies. The
 difference is in the configuration and [scheduler
-overrides](../../heron-cli#submitting-a-topology) that you provide when
-you [submit a topology](../../heron-cli#submitting-a-topology).
+overrides](../../../heron-cli#submitting-a-topology) that you provide when
+you [submit a topology](../../../heron-cli#submitting-a-topology).
 
 ### Required Scheduler Overrides
 
@@ -37,13 +37,13 @@
   coordination.
 
 For info on scheduler overrides, see the documentation on using the [Heron
-CLI](../../heron-cli).
+CLI](../../../heron-cli).
 
 ### Optional Scheduler Overrides
 
 The `heron.core.release.package` parameter is optional. It specifies the path to
 a local TAR file for the `core` component of the desired Heron release. Assuming
-that you've built a full [Heron release](../../../developers/compiling#building-a-full-release-package), this TAR will be
+that you've built a full [Heron release](../../../../developers/compiling#building-a-full-release-package), this TAR will be
 located by default at `bazel-genfiles/release/heron-core-unversioned.tar`,
 relative to the root of your Heron repository. If you set
 `heron.core.release.package`, Heron will update all local binaries in Heron's
@@ -53,7 +53,7 @@
 ### CLI Flags
 
 In addition to setting scheduler overrides, you'll need to set the following
-[CLI flags](../../heron-cli):
+[CLI flags](../../../heron-cli):
 
 * `--config-file` &mdash; This flag needs to point to the `local_scheduler.conf`
   file in `heron/cli/src/python/local_scheduler.conf`.
diff --git a/website/content/docs/operators/deployment/schedulers/mesos.md b/website/content/docs/operators/deployment/schedulers/mesos.md
index 5c4c591..d89151b 100644
--- a/website/content/docs/operators/deployment/schedulers/mesos.md
+++ b/website/content/docs/operators/deployment/schedulers/mesos.md
@@ -9,14 +9,14 @@
 ## How Heron on Mesos Works
 
 Heron's Mesos scheduler interacts with Mesos to stand up all of the
-[components](../../../concepts/architecture) necessary to [manage
-topologies](../../heron-cli).
+[components](../../../../concepts/architecture) necessary to [manage
+topologies](../../../heron-cli).
 
 ## ZooKeeper
 
 To run Heron on Mesos, you'll need to set up a ZooKeeper cluster and configure
 Heron to communicate with it. Instructions can be found in [Setting up
-ZooKeeper](../zookeeper).
+ZooKeeper](../../statemanagers/zookeeper).
 
 ## Hosting Binaries
 
@@ -24,7 +24,7 @@
 variety of Heron binaries, which can be hosted wherever you'd like, so long as
 it's accessible to Mesos (for example in [Amazon S3](https://aws.amazon.com/s3/)
 or using a local blog storage solution). You can build those binaries using the
-instructions in [Creating a New Heron Release](../../../developers/compiling#building-a-full-release-package).
+instructions in [Creating a New Heron Release](../../../../developers/compiling#building-a-full-release-package).
 
 Once your Heron binaries are hosted somewhere that's accessible to Mesos, you
 should run tests to ensure that Mesos can successfully fetch them.
@@ -52,14 +52,14 @@
 topologies (i.e. can submit topologies, activate and deactivate them, etc.).
 
 The most important thing at this stage is to ensure that `heron-cli` is synced
-across all machines that will be [working with topologies](../../heron-cli).
+across all machines that will be [working with topologies](../../../heron-cli).
 Once that has been ensured, you can use Mesos as a scheduler by specifying the
 proper configuration and configuration loader when managing topologies.
 
 ### Specifying a Configuration
 
 You'll need to specify a scheduler configuration at all stages of a topology's
-[lifecycle](../../../concepts/topologies#topology-lifecycle) by using the
+[lifecycle](../../../../concepts/topologies#topology-lifecycle) by using the
 `--config-file` flag to point at a configuration file. There is a default Mesos
 configuration located in the Heron repository at
 `heron/cli/src/python/mesos_scheduler.conf`. You can use this file as is,
diff --git a/website/content/docs/operators/heron-tracker.md b/website/content/docs/operators/heron-tracker.md
index 9f42b20..44d1f24 100644
--- a/website/content/docs/operators/heron-tracker.md
+++ b/website/content/docs/operators/heron-tracker.md
@@ -8,7 +8,7 @@
 be found [here](../../concepts/architecture#heron-tracker).
 
 The Tracker can run within your Heron cluster (e.g.
-[Mesos](../../operators/deployment/mesos) or [Aurora](../../operators/deployment/aurora)) or
+[Mesos](../../operators/deployment/schedulers/mesos) or [Aurora](../../operators/deployment/schedulers/aurora)) or
 outside of it, provided that the machine on which it runs has access to your
 Heron cluster.
 
diff --git a/website/data/toc.yaml b/website/data/toc.yaml
index 711c1fa..965e6ef 100644
--- a/website/data/toc.yaml
+++ b/website/data/toc.yaml
@@ -11,16 +11,12 @@
         url: /docs/operators/deployment
   - name: State Managers
     sublinks:
-      - name: Overview
-        url: /docs/operators/deployment/statemanagers
       - name: Setup Zookeeper
         url: /docs/operators/deployment/statemanagers/zookeeper
       - name: Setup Local FS
         url: /docs/operators/deployment/statemanagers/localfs
   - name: Uploaders
     sublinks:
-      - name: Overview
-        url: /docs/operators/deployment/uploaders
       - name: Setup Local FS
         url: /docs/operators/deployment/uploaders/localfs
       - name: Setup HDFS
diff --git a/website/public b/website/public
new file mode 160000
index 0000000..00df5d6
--- /dev/null
+++ b/website/public
@@ -0,0 +1 @@
+Subproject commit 00df5d61078c90da1eb41ffc5a792fd6cc936919