Merge branch 'karthik/upgradepex' of https://github.com/twitter/heron into karthik/upgradepex
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java
index a25722a..182a965 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java
@@ -83,6 +83,9 @@
   }
 
   private BaseWindowedBolt withWindowLength(Count count) {
+    if (count == null) {
+      throw new IllegalArgumentException("Window length cannot be set null");
+    }
     if (count.value <= 0) {
       throw new IllegalArgumentException("Window length must be positive [" + count + "]");
     }
@@ -91,7 +94,10 @@
   }
 
   private BaseWindowedBolt withWindowLength(Duration duration) {
-    if (duration.isNegative()) {
+    if (duration == null) {
+      throw new IllegalArgumentException("Window length cannot be set null");
+    }
+    if (duration.isNegative() || duration.isZero()) {
       throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
     }
 
@@ -101,6 +107,9 @@
   }
 
   private BaseWindowedBolt withSlidingInterval(Count count) {
+    if (count == null) {
+      throw new IllegalArgumentException("Sliding interval cannot be set null");
+    }
     if (count.value <= 0) {
       throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
     }
@@ -109,7 +118,10 @@
   }
 
   private BaseWindowedBolt withSlidingInterval(Duration duration) {
-    if (duration.isNegative()) {
+    if (duration == null) {
+      throw new IllegalArgumentException("Sliding interval cannot be set null");
+    }
+    if (duration.isNegative() || duration.isZero()) {
       throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
     }
     windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS,
@@ -194,8 +206,9 @@
   }
 
   /**
-   * Specify a field in the tuple that represents the timestamp as a long value. If this
-   * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+   * Specify a field in the tuple that represents the timestamp as a long value. The timestamp
+   * should also be in milliseconds. If this field is not present in the
+   * incoming tuple, an {@link IllegalArgumentException} will be thrown.
    *
    * @param fieldName the name of the field that contains the timestamp
    */
@@ -210,6 +223,9 @@
    */
   @SuppressWarnings("HiddenField")
   public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+    if (timestampExtractor == null) {
+      throw new IllegalArgumentException("Timestamp extractor cannot be set to null");
+    }
     if (this.timestampExtractor != null) {
       throw new IllegalArgumentException(
           "Window is already configured with a timestamp " + "extractor: " + timestampExtractor);
@@ -234,6 +250,9 @@
    * @param streamId the name of the stream used to emit late tuples on
    */
   public BaseWindowedBolt withLateTupleStream(String streamId) {
+    if (streamId == null) {
+      throw new IllegalArgumentException("Cannot set late tuple stream id to null");
+    }
     windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
     return this;
   }
@@ -247,6 +266,12 @@
    * @param duration the max lag duration
    */
   public BaseWindowedBolt withLag(Duration duration) {
+    if (duration == null) {
+      throw new IllegalArgumentException("Lag duration cannot be set null");
+    }
+    if (duration.isNegative() || duration.isZero()) {
+      throw new IllegalArgumentException("Lag duration must be positive [" + duration + "]");
+    }
     windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS,
         duration.toMillis());
     return this;
@@ -259,6 +284,12 @@
    * @param interval the interval at which watermark events are generated
    */
   public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+    if (interval == null) {
+      throw new IllegalArgumentException("Watermark interval cannot be set null");
+    }
+    if (interval.isNegative() || interval.isZero()) {
+      throw new IllegalArgumentException("Watermark interval must be positive [" + interval + "]");
+    }
     windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS,
         interval.toMillis());
     return this;
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
index 2dd1e15..d027d89 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
@@ -71,7 +71,7 @@
   private transient EvictionPolicy<Tuple> evictionPolicy;
   private transient Long windowLengthDurationMs;
   // package level for unit tests
-  private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
+  protected transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
 
   public WindowedBoltExecutor(IWindowedBolt bolt) {
     this.bolt = bolt;
@@ -150,7 +150,6 @@
                                                      lifecycleListener, Map<String, Object>
       topoConf, TopologyContext context, Collection<Event<Tuple>> queue) {
 
-
     WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener, queue);
 
     Count windowLengthCount = null;
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java
index fd71ba5..d60de33 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java
@@ -14,6 +14,7 @@
 package com.twitter.heron.api.windowing;
 
 import java.util.HashMap;
+import java.util.Map;
 
 public class WindowingConfigs extends HashMap<String, Object> {
 
@@ -69,4 +70,62 @@
      */
   public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts"
       + ".watermark.event.interval.ms";
+
+  public void setTopologyBoltsWindowLengthCount(long value) {
+    setTopologyBoltsWindowLengthCount(this, value);
+  }
+
+  public static void setTopologyBoltsWindowLengthCount(Map<String, Object> conf, long value) {
+    conf.put(TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, value);
+  }
+
+  public void setTopologyBoltsWindowLengthDurationMs(long value) {
+    setTopologyBoltsWindowLengthDurationMs(this, value);
+  }
+
+  public static void setTopologyBoltsWindowLengthDurationMs(Map<String, Object> conf, long value) {
+    conf.put(TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, value);
+  }
+
+  public void setTopologyBoltsSlidingIntervalCount(long value) {
+    setTopologyBoltsSlidingIntervalCount(this, value);
+  }
+
+  public static void setTopologyBoltsSlidingIntervalCount(Map<String, Object> conf, long value) {
+    conf.put(TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, value);
+  }
+
+  public void setTopologyBoltsSlidingIntervalDurationMs(long value) {
+    setTopologyBoltsSlidingIntervalDurationMs(this, value);
+  }
+
+  public static void setTopologyBoltsSlidingIntervalDurationMs(
+      Map<String, Object> conf, long value) {
+    conf.put(TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, value);
+  }
+
+  public void setTopologyBoltsLateTupleStream(String value) {
+    setTopologyBoltsLateTupleStream(this, value);
+  }
+
+  public static void setTopologyBoltsLateTupleStream(Map<String, Object> conf, String value) {
+    conf.put(TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, value);
+  }
+
+  public void setTopologyBoltsTupleTimestampMaxLagMs(long value) {
+    setTopologyBoltsTupleTimestampMaxLagMs(this, value);
+  }
+
+  public static void setTopologyBoltsTupleTimestampMaxLagMs(Map<String, Object> conf, long value) {
+    conf.put(TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, value);
+  }
+
+  public void setTopologyBoltsWatermarkEventIntervalMs(long value) {
+    setTopologyBoltsWatermarkEventIntervalMs(this, value);
+  }
+
+  public static void setTopologyBoltsWatermarkEventIntervalMs(
+      Map<String, Object> conf, long value) {
+    conf.put(TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, value);
+  }
 }
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index 9919e11..fcff382 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -3,7 +3,9 @@
 
 api_deps_files = [
     "//heron/api/src/java:api-java",
+    "//heron/common/src/java:utils-java",
     "//third_party/java:junit4",
+    "@org_mockito_mockito_all//jar"
 ]
 
 api_deps_files = \
@@ -19,7 +21,9 @@
 java_tests(
   test_classes = [
     "com.twitter.heron.api.windowing.WindowManagerTest",
-    "com.twitter.heron.api.windowing.WaterMarkEventGeneratorTest"
+    "com.twitter.heron.api.windowing.WaterMarkEventGeneratorTest",
+    "com.twitter.heron.api.bolt.WindowedBoltExecutorTest",
+    "com.twitter.heron.api.bolt.BaseWindowedBoltTest"
   ],
   runtime_deps = [ ":api-tests" ],
   size = "small",
diff --git a/heron/api/tests/java/com/twitter/heron/api/bolt/BaseWindowedBoltTest.java b/heron/api/tests/java/com/twitter/heron/api/bolt/BaseWindowedBoltTest.java
new file mode 100644
index 0000000..222e7d0
--- /dev/null
+++ b/heron/api/tests/java/com/twitter/heron/api/bolt/BaseWindowedBoltTest.java
@@ -0,0 +1,276 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.api.bolt;
+
+import java.time.Duration;
+
+import org.junit.Test;
+
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.windowing.TupleWindow;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link BaseWindowedBolt}
+ */
+public class BaseWindowedBoltTest {
+
+  public static class TestBolt extends BaseWindowedBolt {
+
+    private static final long serialVersionUID = -7224073487836212922L;
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+
+    }
+  }
+
+  @Test
+  public void testSettingSlidingCountWindow() {
+    final Object[][] args = new Object[][]{
+        {-1, 10},
+        {10, -1},
+        {0, 10},
+        {10, 0},
+        {0, 0},
+        {-1, -1},
+        {5, 10},
+        {1, 1},
+        {10, 5},
+        {100, 10},
+        {100, 100},
+        {200, 100},
+        {500, 100},
+        {null, null},
+        {null, 1},
+        {1, null},
+        {null, -1},
+        {-1, null}
+    };
+
+    for (Object[] arg : args) {
+      TopologyBuilder builder = new TopologyBuilder();
+      Object arg0 = arg[0];
+      Object arg1 = arg[1];
+      try {
+
+        BaseWindowedBolt.Count windowLengthCount = null;
+        if (arg0 != null) {
+          windowLengthCount = BaseWindowedBolt.Count.of((Integer) arg0);
+        }
+        BaseWindowedBolt.Count slidingIntervalCount = null;
+
+        if (arg1 != null) {
+          slidingIntervalCount = BaseWindowedBolt.Count.of((Integer) arg1);
+        }
+
+        builder.setBolt("testBolt", new TestBolt().withWindow(windowLengthCount,
+            slidingIntervalCount));
+        if (arg0 == null || arg1 == null) {
+          fail(String.format("Window length or sliding window length cannot be null -- "
+              + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+        }
+        if ((Integer) arg0 <= 0 || (Integer) arg1 <= 0) {
+          fail(String.format("Window length or sliding window length cannot be zero or less -- "
+              + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+        }
+      } catch (IllegalArgumentException e) {
+        if (arg0 != null && arg1 != null && (Integer) arg0 > 0 && (Integer) arg1 > 0) {
+          fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s "
+              + "slidingIntervalCount: %s", e.getMessage(), arg0, arg1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSettingSlidingTimeWindow() {
+    final Object[][] args = new Object[][]{
+        {-1L, 10L},
+        {10L, -1L},
+        {0L, 10L},
+        {10L, 0L},
+        {0L, 0L},
+        {-1L, -1L},
+        {5L, 10L},
+        {1L, 1L},
+        {10L, 5L},
+        {100L, 10L},
+        {100L, 100L},
+        {200L, 100L},
+        {500L, 100L},
+        {null, null},
+        {null, 1L},
+        {1L, null},
+        {null, -1L},
+        {-1L, null}
+    };
+
+    for (Object[] arg : args) {
+      TopologyBuilder builder = new TopologyBuilder();
+      Object arg0 = arg[0];
+      Object arg1 = arg[1];
+      try {
+
+        Duration windowLengthDuration = null;
+        if (arg0 != null) {
+          windowLengthDuration = Duration.ofMillis((Long) arg0);
+        }
+        Duration slidingIntervalDuration = null;
+
+        if (arg1 != null) {
+          slidingIntervalDuration = Duration.ofMillis((Long) arg1);
+        }
+
+        builder.setBolt("testBolt", new TestBolt().withWindow(windowLengthDuration,
+            slidingIntervalDuration));
+        if (arg0 == null || arg1 == null) {
+          fail(String.format("Window length or sliding window length cannot be null -- "
+              + "windowLengthDuration: %s slidingIntervalDuration: %s", arg0, arg1));
+        }
+        if ((Long) arg0 <= 0 || (Long) arg1 <= 0) {
+          fail(String.format("Window length or sliding window length cannot be zero or less -- "
+              + "windowLengthDuration: %s slidingIntervalDuration: %s", arg0, arg1));
+        }
+      } catch (IllegalArgumentException e) {
+        if (arg0 != null && arg1 != null && (Long) arg0 > 0 && (Long) arg1 > 0) {
+          fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s "
+              + "slidingIntervalDuration: %s", e.getMessage(), arg0, arg1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSettingTumblingCountWindow() {
+    final Object[] args = new Object[] {-1, 0, 1, 2, 5, 10, null};
+
+    for (Object arg : args) {
+      TopologyBuilder builder = new TopologyBuilder();
+      Object arg0 = arg;
+      try {
+
+        BaseWindowedBolt.Count windowLengthCount = null;
+        if (arg0 != null) {
+          windowLengthCount = BaseWindowedBolt.Count.of((Integer) arg0);
+        }
+
+        builder.setBolt("testBolt", new TestBolt().withTumblingWindow(windowLengthCount));
+        if (arg0 == null) {
+          fail(String.format("Window length cannot be null -- windowLengthCount: %s", arg0));
+        }
+        if ((Integer) arg0 <= 0) {
+          fail(String.format("Window length cannot be zero or less -- windowLengthCount: %s",
+              arg0));
+        }
+      } catch (IllegalArgumentException e) {
+        if (arg0 != null && (Integer) arg0 > 0) {
+          fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s", e
+              .getMessage(), arg0));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSettingTumblingTimeWindow() {
+    final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+    for (Object arg : args) {
+      TopologyBuilder builder = new TopologyBuilder();
+      Object arg0 = arg;
+      try {
+
+        Duration windowLengthDuration = null;
+        if (arg0 != null) {
+          windowLengthDuration = Duration.ofMillis((Long) arg0);
+        }
+
+        builder.setBolt("testBolt", new TestBolt().withTumblingWindow(windowLengthDuration));
+        if (arg0 == null) {
+          fail(String.format("Window count duration cannot be null -- windowLengthDuration: %s",
+              arg0));
+        }
+        if ((Long) arg0 <= 0) {
+          fail(String.format("Window length cannot be zero or less -- windowLengthDuration: %s",
+              arg0));
+        }
+      } catch (IllegalArgumentException e) {
+        if (arg0 != null && (Long) arg0 > 0) {
+          fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s", e
+              .getMessage(), arg0));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSettingLagTime() {
+    final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+    for (Object arg : args) {
+      TopologyBuilder builder = new TopologyBuilder();
+      Object arg0 = arg;
+      try {
+
+        Duration lagTime = null;
+        if (arg0 != null) {
+          lagTime = Duration.ofMillis((Long) arg0);
+        }
+
+        builder.setBolt("testBolt", new TestBolt().withLag(lagTime));
+        if (arg0 == null) {
+          fail(String.format("Window lag duration cannot be null -- lagTime: %s", arg0));
+        }
+        if ((Long) arg0 <= 0) {
+          fail(String.format("Window lag cannot be zero or less -- lagTime: %s", arg0));
+        }
+      } catch (IllegalArgumentException e) {
+        if (arg0 != null && (Long) arg0 > 0) {
+          fail(String.format("Exception: %s thrown on valid input -- lagTime: %s",
+              e.getMessage(), arg0));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSettingWaterMarkInterval() {
+    final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+    for (Object arg : args) {
+      TopologyBuilder builder = new TopologyBuilder();
+      Object arg0 = arg;
+      try {
+
+        Duration watermarkInterval = null;
+        if (arg0 != null) {
+          watermarkInterval = Duration.ofMillis((Long) arg0);
+        }
+
+        builder.setBolt("testBolt", new TestBolt().withWatermarkInterval(watermarkInterval));
+        if (arg0 == null) {
+          fail(String.format("Watermark interval cannot be null -- watermarkInterval: %s", arg0));
+        }
+        if ((Long) arg0 <= 0) {
+          fail(String.format("Watermark interval cannot be zero or less -- watermarkInterval: "
+              + "%s", arg0));
+        }
+      } catch (IllegalArgumentException e) {
+        if (arg0 != null && (Long) arg0 > 0) {
+          fail(String.format("Exception: %s thrown on valid input -- watermarkInterval: %s", e
+              .getMessage(), arg0));
+        }
+      }
+    }
+  }
+}
diff --git a/heron/api/tests/java/com/twitter/heron/api/bolt/WindowedBoltExecutorTest.java b/heron/api/tests/java/com/twitter/heron/api/bolt/WindowedBoltExecutorTest.java
new file mode 100644
index 0000000..52a21d3
--- /dev/null
+++ b/heron/api/tests/java/com/twitter/heron/api/bolt/WindowedBoltExecutorTest.java
@@ -0,0 +1,241 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.api.bolt;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.windowing.TupleWindow;
+import com.twitter.heron.api.windowing.WindowingConfigs;
+import com.twitter.heron.common.utils.topology.TopologyContextImpl;
+import com.twitter.heron.common.utils.tuple.TupleImpl;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WindowedBoltExecutor}
+ */
+public class WindowedBoltExecutorTest {
+
+  private WindowedBoltExecutor executor;
+  private TestWindowedBolt testWindowedBolt;
+
+  @SuppressWarnings("VisibilityModifier")
+  private static class TestWindowedBolt extends BaseWindowedBolt {
+    private static final long serialVersionUID = -8934326157586387333L;
+    List<TupleWindow> tupleWindows = new ArrayList<>();
+
+    @Override
+    public void execute(TupleWindow input) {
+      //System.out.println(input);
+      tupleWindows.add(input);
+    }
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private TopologyContext getContext(final Fields fields) {
+    TopologyBuilder builder = new TopologyBuilder();
+    return new TopologyContextImpl(new Config(),
+        builder.createTopology()
+            .setConfig(new Config())
+            .setName("test")
+            .setState(TopologyAPI.TopologyState.RUNNING)
+            .getTopology(),
+        new HashMap(), 1, null) {
+      @Override
+      public Fields getComponentOutputFields(String componentId, String streamId) {
+        return fields;
+      }
+
+    };
+  }
+
+  private Tuple getTuple(String streamId, final Fields fields, Values values) {
+
+    TopologyContext topologyContext = getContext(fields);
+    return new TupleImpl(topologyContext, TopologyAPI.StreamId.newBuilder()
+        .setId(streamId).setComponentName("s1")
+        .build(), 0, null, values, 1) {
+      @Override
+      public TopologyAPI.StreamId getSourceGlobalStreamId() {
+        return TopologyAPI.StreamId.newBuilder().setComponentName("s1").setId("default").build();
+      }
+    };
+  }
+
+  private OutputCollector getOutputCollector() {
+    return Mockito.mock(OutputCollector.class);
+  }
+
+  private TopologyContext getTopologyContext() {
+    TopologyContext context = Mockito.mock(TopologyContext.class);
+
+    Map<TopologyAPI.StreamId, TopologyAPI.Grouping> sources =
+        Collections.singletonMap(TopologyAPI.StreamId.newBuilder()
+            .setComponentName("s1").setId("default").build(), null);
+    Mockito.when(context.getThisSources()).thenReturn(sources);
+    return context;
+  }
+
+  @Before
+  public void setUp() {
+    testWindowedBolt = new TestWindowedBolt();
+    testWindowedBolt.withTimestampField("ts");
+    executor = new WindowedBoltExecutor(testWindowedBolt);
+    Map<String, Object> conf = new HashMap<>();
+    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+    // trigger manually to avoid timing issues
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000L);
+    executor.prepare(conf, getTopologyContext(), getOutputCollector());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExecuteWithoutTs() throws Exception {
+    executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+  }
+
+  @Test
+  public void testExecuteWithTs() throws Exception {
+    long[] timestamps = {603, 605, 607, 618, 626, 636};
+    for (long ts : timestamps) {
+      executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+    }
+    //Thread.sleep(120);
+    executor.waterMarkEventGenerator.run();
+    //System.out.println(testWindowedBolt.tupleWindows);
+    assertEquals(3, testWindowedBolt.tupleWindows.size());
+    TupleWindow first = testWindowedBolt.tupleWindows.get(0);
+    assertArrayEquals(
+        new long[]{603, 605, 607},
+        new long[]{(long) first.get().get(0).getValue(0),
+            (long) first.get().get(1).getValue(0),
+            (long) first.get().get(2).getValue(0)});
+
+    TupleWindow second = testWindowedBolt.tupleWindows.get(1);
+    assertArrayEquals(
+        new long[]{603, 605, 607, 618},
+        new long[]{(long) second.get().get(0).getValue(0),
+            (long) second.get().get(1).getValue(0),
+            (long) second.get().get(2).getValue(0),
+            (long) second.get().get(3).getValue(0)});
+
+    TupleWindow third = testWindowedBolt.tupleWindows.get(2);
+    assertArrayEquals(new long[]{618, 626}, new long[]{(long) third.get().get(0).getValue(0),
+        (long) third.get().get(1).getValue(0)});
+  }
+
+  @Test
+  public void testPrepareLateTupleStreamWithoutTs() throws Exception {
+    Map<String, Object> conf = new HashMap<>();
+    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10L);
+
+    testWindowedBolt = new TestWindowedBolt();
+    executor = new WindowedBoltExecutor(testWindowedBolt);
+    TopologyContext context = getTopologyContext();
+    // emulate the call of withLateTupleStream method
+    Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default",
+        "$late")));
+    try {
+      executor.prepare(conf, context, getOutputCollector());
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals(e.getMessage(), "Late tuple stream can be defined only when specifying a "
+          + "timestamp field");
+    }
+  }
+
+  @Test
+  public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
+    Map<String, Object> conf = new HashMap<>();
+    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10L);
+
+    testWindowedBolt = new TestWindowedBolt();
+    testWindowedBolt.withTimestampField("ts");
+    executor = new WindowedBoltExecutor(testWindowedBolt);
+    TopologyContext context = getTopologyContext();
+    try {
+      executor.prepare(conf, context, getOutputCollector());
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals(e.getMessage(), "Stream for late tuples must be defined with the builder "
+          + "method withLateTupleStream");
+    }
+  }
+
+
+  @Test
+  public void testExecuteWithLateTupleStream() throws Exception {
+    testWindowedBolt = new TestWindowedBolt();
+    testWindowedBolt.withTimestampField("ts");
+    executor = new WindowedBoltExecutor(testWindowedBolt);
+    TopologyContext context = getTopologyContext();
+    Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default",
+        "$late")));
+
+    OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
+    Map<String, Object> conf = new HashMap<>();
+    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+    //Trigger manually to avoid timing issues
+    conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000L);
+    executor.prepare(conf, context, outputCollector);
+
+    long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+    List<Tuple> tuples = new ArrayList<>(timestamps.length);
+
+    for (long ts : timestamps) {
+      Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+      tuples.add(tuple);
+      executor.execute(tuple);
+
+      //Update the watermark to this timestamp
+      executor.waterMarkEventGenerator.run();
+    }
+    System.out.println(testWindowedBolt.tupleWindows);
+    Tuple tuple = tuples.get(tuples.size() - 1);
+    Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
+  }
+}
diff --git a/heron/common/src/cpp/zookeeper/zkclient.cpp b/heron/common/src/cpp/zookeeper/zkclient.cpp
index 0fbfae6..6633520 100644
--- a/heron/common/src/cpp/zookeeper/zkclient.cpp
+++ b/heron/common/src/cpp/zookeeper/zkclient.cpp
@@ -136,32 +136,14 @@
 }
 
 void ZKClient::Init() {
-  zkaction_responses_ = new PCQueue<CallBack*>();
-  auto zkaction_response_cb = [this](EventLoop::Status status) {
-    this->OnZkActionResponse(status);
-  };
-
-  if (pipe(pipers_) < 0) {
-    LOG(FATAL) << "Pipe failed in ZKClient";
-  }
-  sp_int32 flags;
-  if ((flags = fcntl(pipers_[0], F_GETFL, 0)) < 0 ||
-      fcntl(pipers_[0], F_SETFL, flags | O_NONBLOCK) < 0 ||
-      eventLoop_->registerForRead(pipers_[0], std::move(zkaction_response_cb), true) != 0) {
-    LOG(FATAL) << "fcntl failed in ZKClient";
-  }
+  piper_ = new Piper(eventLoop_);
   zoo_deterministic_conn_order(0);  // even distribution of clients on the server
   InitZKHandle();
 }
 
 // Destructor.
 ZKClient::~ZKClient() {
-  if (eventLoop_) {
-    CHECK_EQ(eventLoop_->unRegisterForRead(pipers_[0]), 0);
-  }
-  close(pipers_[0]);
-  close(pipers_[1]);
-  delete zkaction_responses_;
+  delete piper_;
   zookeeper_close(zk_handle_);
 }
 
@@ -350,55 +332,17 @@
   }
 }
 
-void ZKClient::SignalMainThread() {
-  // This need not be protected by any mutex.
-  // The os will take care of that.
-  int rc = write(pipers_[1], "a", 1);
-  if (rc != 1) {
-    LOG(FATAL) << "Write to pipe failed in ZkClient with return code:  " << rc;
-  }
-}
-
-void ZKClient::OnZkActionResponse(EventLoop::Status _status) {
-  if (_status == EventLoop::READ_EVENT) {
-    char buf[1];
-    ssize_t readcount = read(pipers_[0], buf, 1);
-    if (readcount == 1) {
-      bool dequeued = false;
-      CallBack* cb = zkaction_responses_->trydequeue(dequeued);
-      if (cb) {
-        cb->Run();
-      }
-    } else {
-      LOG(ERROR) << "In Server read from pipers returned " << readcount << " errno " << errno
-                 << std::endl;
-      if (readcount < 0 && (errno == EAGAIN || errno == EINTR)) {
-        // Never mind. we will try again
-        return;
-      } else {
-        // We really don't know what to do here.
-        // TODO(kramasamy): Figure out a way to get the hell out of here
-        return;
-      }
-    }
-  }
-  return;
-}
-
 void ZKClient::ZkActionCb(sp_int32 rc, VCallback<sp_int32> cb) {
-  zkaction_responses_->enqueue(CreateCallback(&RunUserCb, rc, std::move(cb)));
-  SignalMainThread();
+  piper_->ExecuteInEventLoop(std::bind(&RunUserCb, rc, std::move(cb)));
 }
 
 void ZKClient::ZkWatcherCb(VCallback<> cb) {
-  zkaction_responses_->enqueue(CreateCallback(&RunWatcherCb, std::move(cb)));
-  SignalMainThread();
+  piper_->ExecuteInEventLoop(std::bind(&RunWatcherCb, std::move(cb)));
 }
 
 void ZKClient::SendWatchEvent(const ZkWatchEvent& event) {
   CHECK(client_global_watcher_cb_);
-  zkaction_responses_->enqueue(CreateCallback(&RunWatchEventCb, client_global_watcher_cb_, event));
-  SignalMainThread();
+  piper_->ExecuteInEventLoop(std::bind(&RunWatchEventCb, client_global_watcher_cb_, event));
 }
 
 const std::string ZKClient::state2String(sp_int32 _state) {
diff --git a/heron/common/src/cpp/zookeeper/zkclient.h b/heron/common/src/cpp/zookeeper/zkclient.h
index f076294..2ae6803 100644
--- a/heron/common/src/cpp/zookeeper/zkclient.h
+++ b/heron/common/src/cpp/zookeeper/zkclient.h
@@ -118,7 +118,6 @@
   ZKClient()
       : zk_handle_(NULL),
         eventLoop_(NULL),
-        zkaction_responses_(NULL),
         client_global_watcher_cb_(VCallback<ZkWatchEvent>()) {}
 
  private:
@@ -128,17 +127,10 @@
   // The function that actually inits the handle
   void InitZKHandle();
 
-  // This is the function used to signal the main thread
-  void SignalMainThread();
-
-  // When the zk callback wants to wake the main thread, it uses the SignalMainThread function.
-  // This function will get executed in the main thread.
-  void OnZkActionResponse(EventLoop::Status _status);
-
   // We wrap all user zk calls with this completion function
   // This completion function runs in the context of the
-  // zk completion thread. It basically appends to
-  // zkaction_responses_ and calls SignalMainThread
+  // zk completion thread. It basically calls the piper
+  // to execute the cb in eventLoop thread
   void ZkActionCb(sp_int32 rc, VCallback<sp_int32> cb);
 
   // This is the watcher function that gets called
@@ -157,9 +149,8 @@
 
   // We use libzookeeper_mt as our zk library. This means that
   // zk callbacks are all executed in the context of a zk thread.
-  // These pipers are how they communicate it accross to our thread
-  sp_int32 pipers_[2];
-  PCQueue<CallBack*>* zkaction_responses_;
+  // Piper are how they communicate it accross to our main thread
+  Piper* piper_;
   // A callback to notify the clients of this class about global session events.
   VCallback<ZkWatchEvent> client_global_watcher_cb_;
 };
diff --git a/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java b/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java
index b5e175e..3b31eb1 100644
--- a/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java
+++ b/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java
@@ -208,6 +208,7 @@
                            int operation,
                            ISelectHandler callback)
       throws ClosedChannelException {
+
     SelectionKey key = channel.keyFor(selector);
 
     if (key == null) {
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.cpp b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
index 09d9fbc..d7134bb 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
@@ -40,7 +40,8 @@
       stmgr_id_(_stmgr_id),
       recvd_stmgr_pplan_(NULL),
       register_response_status(heron::proto::system::STMGR_DIDNT_REGISTER) {
-  InstallMessageHandler(&DummyInstance::HandleInstanceResponse);
+  InstallResponseHandler(new heron::proto::stmgr::RegisterInstanceRequest(),
+                         &DummyInstance::HandleInstanceResponse);
   InstallMessageHandler(&DummyInstance::HandleTupleMessage);
   InstallMessageHandler(&DummyInstance::HandleNewInstanceAssignmentMsg);
 
@@ -62,14 +63,18 @@
   }
 }
 
-void DummyInstance::HandleClose(NetworkErrorCode) {}
+void DummyInstance::HandleClose(NetworkErrorCode) {
+  AddTimer(retry_cb_, 100);
+}
 
 heron::proto::system::StatusCode DummyInstance::GetRegisterResponseStatus() {
   return register_response_status;
 }
 
-void DummyInstance::HandleInstanceResponse(
-    heron::proto::stmgr::RegisterInstanceResponse* _message) {
+void DummyInstance::HandleInstanceResponse(void*,
+    heron::proto::stmgr::RegisterInstanceResponse* _message,
+    NetworkErrorCode status) {
+  CHECK_EQ(status, OK);
   if (_message->has_pplan()) {
     if (recvd_stmgr_pplan_) {
       delete recvd_stmgr_pplan_;
@@ -87,21 +92,19 @@
     heron::proto::stmgr::NewInstanceAssignmentMessage*) {}
 
 void DummyInstance::CreateAndSendInstanceRequest() {
-  heron::proto::stmgr::RegisterInstanceRequest message;
-  heron::proto::system::Instance* instance = message.mutable_instance();
+  auto request = new heron::proto::stmgr::RegisterInstanceRequest();
+  heron::proto::system::Instance* instance = request->mutable_instance();
   instance->set_instance_id(instance_id_);
   instance->set_stmgr_id(stmgr_id_);
   instance->mutable_info()->set_task_id(task_id_);
   instance->mutable_info()->set_component_index(component_index_);
   instance->mutable_info()->set_component_name(component_name_);
-  message.set_topology_name(topology_name_);
-  message.set_topology_id(topology_id_);
-  SendMessage(message);
+  request->set_topology_name(topology_name_);
+  request->set_topology_id(topology_id_);
+  SendRequest(request, nullptr);
   return;
 }
 
-void DummyInstance::CreateAndSendTupleMessages() {}
-
 //////////////////////////////////////// DummySpoutInstance ////////////////////////////////////
 DummySpoutInstance::DummySpoutInstance(EventLoopImpl* eventLoop, const NetworkOptions& _options,
                                        const sp_string& _topology_name,
@@ -116,12 +119,8 @@
       max_msgs_to_send_(max_msgs_to_send),
       total_msgs_sent_(0),
       batch_size_(1000),
-      do_custom_grouping_(_do_custom_grouping) {}
-
-void DummySpoutInstance::HandleInstanceResponse(
-    heron::proto::stmgr::RegisterInstanceResponse* _message) {
-  DummyInstance::HandleInstanceResponse(_message);
-}
+      do_custom_grouping_(_do_custom_grouping),
+      under_backpressure_(false) {}
 
 void DummySpoutInstance::HandleNewInstanceAssignmentMsg(
     heron::proto::stmgr::NewInstanceAssignmentMessage* _msg) {
@@ -140,23 +139,25 @@
 }
 
 void DummySpoutInstance::CreateAndSendTupleMessages() {
-  for (int i = 0; (i < batch_size_) && (total_msgs_sent_ < max_msgs_to_send_);
-       ++total_msgs_sent_, ++i) {
-    heron::proto::system::HeronTupleSet tuple_set;
-    heron::proto::system::HeronDataTupleSet* data_set = tuple_set.mutable_data();
-    heron::proto::api::StreamId* tstream = data_set->mutable_stream();
-    tstream->set_id(stream_id_);
-    tstream->set_component_name(component_name_);
-    heron::proto::system::HeronDataTuple* tuple = data_set->add_tuples();
-    tuple->set_key(0);
-    // Add lots of data
-    for (size_t i = 0; i < 500; ++i) *(tuple->add_values()) = "dummy data";
+  if (!under_backpressure_) {
+    for (int i = 0; (i < batch_size_) && (total_msgs_sent_ < max_msgs_to_send_);
+         ++total_msgs_sent_, ++i) {
+      heron::proto::system::HeronTupleSet tuple_set;
+      heron::proto::system::HeronDataTupleSet* data_set = tuple_set.mutable_data();
+      heron::proto::api::StreamId* tstream = data_set->mutable_stream();
+      tstream->set_id(stream_id_);
+      tstream->set_component_name(component_name_);
+      heron::proto::system::HeronDataTuple* tuple = data_set->add_tuples();
+      tuple->set_key(0);
+      // Add lots of data
+      for (size_t i = 0; i < 500; ++i) *(tuple->add_values()) = "dummy data";
 
-    // Add custom grouping if need be
-    if (do_custom_grouping_) {
-      tuple->add_dest_task_ids(custom_grouping_dest_task_);
+      // Add custom grouping if need be
+      if (do_custom_grouping_) {
+        tuple->add_dest_task_ids(custom_grouping_dest_task_);
+      }
+      SendMessage(tuple_set);
     }
-    SendMessage(tuple_set);
   }
   if (total_msgs_sent_ != max_msgs_to_send_) {
     AddTimer([this]() { this->CreateAndSendTupleMessages(); }, 1000);
@@ -175,11 +176,6 @@
       expected_msgs_to_recv_(_expected_msgs_to_recv),
       msgs_recvd_(0) {}
 
-void DummyBoltInstance::HandleInstanceResponse(
-    heron::proto::stmgr::RegisterInstanceResponse* _message) {
-  DummyInstance::HandleInstanceResponse(_message);
-}
-
 void DummyBoltInstance::HandleTupleMessage(heron::proto::system::HeronTupleSet2* msg) {
   if (msg->has_data()) msgs_recvd_ += msg->mutable_data()->tuples_size();
   if (msgs_recvd_ >= expected_msgs_to_recv_) getEventLoop()->loopExit();
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.h b/heron/stmgr/tests/cpp/server/dummy_instance.h
index 9139d19..919413a 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.h
@@ -36,11 +36,11 @@
   void Retry() { Start(); }
 
   // Handle incoming message
-  virtual void HandleInstanceResponse(heron::proto::stmgr::RegisterInstanceResponse* _message);
+  virtual void HandleInstanceResponse(void* ctx,
+                                      heron::proto::stmgr::RegisterInstanceResponse* _message,
+                                      NetworkErrorCode status);
   // Handle incoming tuples
   virtual void HandleTupleMessage(heron::proto::system::HeronTupleSet2* _message);
-  // Send tuples
-  virtual void CreateAndSendTupleMessages();
   // Handle the instance assignment message
   virtual void HandleNewInstanceAssignmentMsg(heron::proto::stmgr::NewInstanceAssignmentMessage*);
 
@@ -76,10 +76,15 @@
 
  protected:
   // Handle incoming message
-  virtual void HandleInstanceResponse(heron::proto::stmgr::RegisterInstanceResponse* _message);
   virtual void HandleNewInstanceAssignmentMsg(
       heron::proto::stmgr::NewInstanceAssignmentMessage* _msg);
-  virtual void CreateAndSendTupleMessages();
+  void CreateAndSendTupleMessages();
+  virtual void StartBackPressureConnectionCb(Connection* connection) {
+    under_backpressure_ = true;
+  }
+  virtual void StopBackPressureConnectionCb(Connection* _connection) {
+    under_backpressure_ = false;
+  }
 
  private:
   sp_string stream_id_;
@@ -87,6 +92,7 @@
   sp_int32 total_msgs_sent_;
   sp_int32 batch_size_;
   bool do_custom_grouping_;
+  bool under_backpressure_;
   // only valid when the above is true
   sp_int32 custom_grouping_dest_task_;
 };
@@ -103,7 +109,6 @@
 
  protected:
   // Handle incoming message
-  virtual void HandleInstanceResponse(heron::proto::stmgr::RegisterInstanceResponse* _message);
   // Handle incoming tuples
   virtual void HandleTupleMessage(heron::proto::system::HeronTupleSet2* _message);
   virtual void HandleNewInstanceAssignmentMsg(
diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD
index a825c7e..91ae2d7 100644
--- a/heron/storm/src/java/BUILD
+++ b/heron/storm/src/java/BUILD
@@ -8,6 +8,7 @@
     "//heron/api/src/java:api-java",
     "//heron/common/src/java:basics-java",
     "//heron/simulator/src/java:simulator-java",
+    "//heron/proto:proto_topology_java",
     "@com_googlecode_json_simple_json_simple//jar",
     "//third_party/java:kryo-neverlink",
 ]
diff --git a/heron/storm/src/java/org/apache/storm/Config.java b/heron/storm/src/java/org/apache/storm/Config.java
index e226a1c..25b781c 100644
--- a/heron/storm/src/java/org/apache/storm/Config.java
+++ b/heron/storm/src/java/org/apache/storm/Config.java
@@ -307,6 +307,55 @@
    */
   public static final String TRANSACTIONAL_ZOOKEEPER_PORT = "transactional.zookeeper.port";
 
+  /*
+   * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples
+   * in the window.
+   */
+  public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT
+      = "topology.bolts.window.length.count";
+
+  /*
+   * Bolt-specific configuration for windowed bolts to specify the window length in time duration.
+   */
+  public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS
+      = "topology.bolts.window.length.duration.ms";
+
+  /*
+   * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
+   */
+  public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT
+      = "topology.bolts.window.sliding.interval.count";
+
+  /*
+   * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
+   */
+  public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS
+      = "topology.bolts.window.sliding.interval.duration.ms";
+
+  /**
+   * Bolt-specific configuration for windowed bolts to specify the name of the stream on which late tuples are
+   * going to be emitted. This configuration should only be used from the BaseWindowedBolt.withLateTupleStream builder
+   * method, and not as global parameter, otherwise IllegalArgumentException is going to be thrown.
+   */
+  public static final String TOPOLOGY_BOLTS_LATE_TUPLE_STREAM
+      = "topology.bolts.late.tuple.stream";
+
+  /**
+   * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
+   * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
+   * This config will be effective only if {@link org.apache.storm.windowing.TimestampExtractor} is specified.
+   */
+  public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS
+      = "topology.bolts.tuple.timestamp.max.lag.ms";
+
+  /*
+   * Bolt-specific configuration for windowed bolts to specify the time interval for generating
+   * watermark events. Watermark event tracks the progress of time when tuple timestamp is used.
+   * This config is effective only if {@link org.apache.storm.windowing.TimestampExtractor} is specified.
+   */
+  public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS
+      = "topology.bolts.watermark.event.interval.ms";
+
   /**
    * ----  DO NOT USE -----
    * This variable is used to rewrite the TOPOLOGY_AUTO_TASK_HOOKS variable.
diff --git a/heron/storm/src/java/org/apache/storm/spout/ISpout.java b/heron/storm/src/java/org/apache/storm/spout/ISpout.java
index f0c2cb6..e2ee367 100644
--- a/heron/storm/src/java/org/apache/storm/spout/ISpout.java
+++ b/heron/storm/src/java/org/apache/storm/spout/ISpout.java
@@ -57,7 +57,7 @@
    * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
    */
   @SuppressWarnings("rawtypes")
-  void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
+  void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
 
   /**
    * Called when an ISpout is going to be shutdown. There is no guarentee that close
diff --git a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
index b6a8ac7..6232e8a 100644
--- a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
+++ b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
@@ -45,7 +45,7 @@
 
   @Override
   @SuppressWarnings("rawtypes")
-  public void open(Map conf, com.twitter.heron.api.topology.TopologyContext context,
+  public void open(Map<String, Object> conf, com.twitter.heron.api.topology.TopologyContext context,
                    SpoutOutputCollector collector) {
     topologyContextImpl = new TopologyContext(context);
     spoutOutputCollectorImpl = new SpoutOutputCollectorImpl(collector);
diff --git a/heron/storm/src/java/org/apache/storm/topology/IWindowedBolt.java b/heron/storm/src/java/org/apache/storm/topology/IWindowedBolt.java
new file mode 100644
index 0000000..d848c43
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/IWindowedBolt.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+
+/**
+ * A bolt abstraction for supporting time and count based sliding & tumbling windows.
+ */
+public interface IWindowedBolt extends IComponent {
+  /**
+   * This is similar to the
+   * {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except
+   * that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
+   */
+  void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);
+
+  /**
+   * Process the tuple window and optionally emit new tuples based on the tuples in the input
+   * window.
+   */
+  void execute(TupleWindow inputWindow);
+
+  void cleanup();
+
+  /**
+   * Return a {@link TimestampExtractor} for extracting timestamps from a
+   * tuple for event time based processing, or null for processing time.
+   *
+   * @return the timestamp extractor
+   */
+  TimestampExtractor getTimestampExtractor();
+}
diff --git a/heron/storm/src/java/org/apache/storm/topology/IWindowedBoltDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IWindowedBoltDelegate.java
new file mode 100644
index 0000000..46be02a
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/IWindowedBoltDelegate.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollectorImpl;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.windowing.TupleWindowImpl;
+
+public class IWindowedBoltDelegate implements com.twitter.heron.api.bolt.IWindowedBolt {
+
+  private static final long serialVersionUID = 8753943395082633132L;
+  private final IWindowedBolt delegate;
+  private TopologyContext topologyContextImpl;
+  private OutputCollectorImpl outputCollectorImpl;
+
+  public IWindowedBoltDelegate(IWindowedBolt iWindowedBolt) {
+    this.delegate = iWindowedBolt;
+  }
+
+  @Override
+  public void declareOutputFields(com.twitter.heron.api.topology.OutputFieldsDeclarer declarer) {
+    OutputFieldsGetter getter = new OutputFieldsGetter(declarer);
+    delegate.declareOutputFields(getter);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return this.delegate.getComponentConfiguration();
+  }
+
+  @Override
+  public void prepare(
+      Map<String, Object> conf,
+      com.twitter.heron.api.topology.TopologyContext context,
+      com.twitter.heron.api.bolt.OutputCollector collector) {
+    topologyContextImpl = new TopologyContext(context);
+    outputCollectorImpl = new OutputCollectorImpl(collector);
+    delegate.prepare(conf, topologyContextImpl, outputCollectorImpl);
+  }
+
+  @Override
+  public void execute(com.twitter.heron.api.windowing.TupleWindow inputWindow) {
+    this.delegate.execute(new TupleWindowImpl(inputWindow));
+  }
+
+  @Override
+  public void cleanup() {
+    this.delegate.cleanup();
+  }
+
+  @Override
+  public com.twitter.heron.api.windowing.TimestampExtractor getTimestampExtractor() {
+
+    return (this.delegate.getTimestampExtractor() == null) ? null
+        : new com.twitter.heron.api.windowing.TimestampExtractor() {
+          @Override
+          public long extractTimestamp(com.twitter.heron.api.tuple.Tuple tuple) {
+            return delegate.getTimestampExtractor().extractTimestamp(new TupleImpl(tuple));
+          }
+        };
+  }
+}
diff --git a/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java b/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java
index dd9ec5b..a9594a0 100644
--- a/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java
+++ b/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java
@@ -24,6 +24,7 @@
 import org.apache.storm.generated.StormTopology;
 
 import com.twitter.heron.api.HeronTopology;
+import com.twitter.heron.api.bolt.WindowedBoltExecutor;
 
 public class TopologyBuilder {
   private com.twitter.heron.api.topology.TopologyBuilder delegate =
@@ -53,6 +54,19 @@
     return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint);
   }
 
+  public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
+    return setBolt(id, bolt, null);
+  }
+
+  public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws
+      IllegalArgumentException {
+    return new BoltDeclarerImpl(
+        this.delegate.setBolt(
+            id, new WindowedBoltExecutor(new IWindowedBoltDelegate(bolt)),
+            parallelismHint)
+    );
+  }
+
   public SpoutDeclarer setSpout(String id, IRichSpout spout) {
     return setSpout(id, spout, null);
   }
diff --git a/heron/storm/src/java/org/apache/storm/topology/TupleFieldTimestampExtractor.java b/heron/storm/src/java/org/apache/storm/topology/TupleFieldTimestampExtractor.java
new file mode 100644
index 0000000..9f3a045
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/TupleFieldTimestampExtractor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+
+/**
+ * A {@link TimestampExtractor} that extracts timestamp from a specific field in the tuple.
+ */
+public final class TupleFieldTimestampExtractor implements TimestampExtractor {
+  private static final long serialVersionUID = 5212225232807940572L;
+  private final String fieldName;
+
+  private TupleFieldTimestampExtractor(String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  @Override
+  public long extractTimestamp(Tuple tuple) {
+    return tuple.getLongByField(fieldName);
+  }
+
+  public static TupleFieldTimestampExtractor of(String fieldName) {
+    return new TupleFieldTimestampExtractor(fieldName);
+  }
+
+  @Override
+  public String toString() {
+    return "TupleFieldTimestampExtractor{" + "fieldName='" + fieldName + '\'' + '}';
+  }
+}
diff --git a/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java b/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java
new file mode 100644
index 0000000..b0b5389
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology.base;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IWindowedBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TupleFieldTimestampExtractor;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.windowing.TimestampExtractor;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+
+public abstract class BaseWindowedBolt implements IWindowedBolt {
+  private static final long serialVersionUID = -3998164228343123590L;
+  protected final transient com.twitter.heron.api.windowing.WindowingConfigs windowConfiguration;
+  protected com.twitter.heron.api.windowing.TimestampExtractor timestampExtractor;
+
+  /**
+   * Holds a count value for count based windows and sliding intervals.
+   */
+  public static class Count implements Serializable {
+    private static final long serialVersionUID = -2290882388716246812L;
+    public final int value;
+
+    public Count(int value) {
+      this.value = value;
+    }
+
+    /**
+     * Returns a {@link Count} of given value.
+     *
+     * @param value the count value
+     * @return the Count
+     */
+    public static Count of(int value) {
+      return new Count(value);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      Count count = (Count) o;
+
+      return value == count.value;
+
+    }
+
+    @Override
+    public int hashCode() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return "Count{" + "value=" + value + '}';
+    }
+  }
+
+  /**
+   * Holds a Time duration for time based windows and sliding intervals.
+   */
+  public static class Duration implements Serializable {
+    private static final long serialVersionUID = 5654070568075477148L;
+    public final int value;
+
+    public Duration(int value, TimeUnit timeUnit) {
+      this.value = (int) timeUnit.toMillis(value);
+    }
+
+    /**
+     * Returns a {@link Duration} corresponding to the the given value in milli seconds.
+     *
+     * @param milliseconds the duration in milliseconds
+     * @return the Duration
+     */
+    public static Duration of(int milliseconds) {
+      return new Duration(milliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns a {@link Duration} corresponding to the the given value in days.
+     *
+     * @param days the number of days
+     * @return the Duration
+     */
+    public static Duration days(int days) {
+      return new Duration(days, TimeUnit.DAYS);
+    }
+
+    /**
+     * Returns a {@link Duration} corresponding to the the given value in hours.
+     *
+     * @param hours the number of hours
+     * @return the Duration
+     */
+    public static Duration hours(int hours) {
+      return new Duration(hours, TimeUnit.HOURS);
+    }
+
+    /**
+     * Returns a {@link Duration} corresponding to the the given value in minutes.
+     *
+     * @param minutes the number of minutes
+     * @return the Duration
+     */
+    public static Duration minutes(int minutes) {
+      return new Duration(minutes, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Returns a {@link Duration} corresponding to the the given value in seconds.
+     *
+     * @param seconds the number of seconds
+     * @return the Duration
+     */
+    public static Duration seconds(int seconds) {
+      return new Duration(seconds, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      Duration duration = (Duration) o;
+
+      return value == duration.value;
+
+    }
+
+    @Override
+    public int hashCode() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return "Duration{" + "value=" + value + '}';
+    }
+  }
+
+  protected BaseWindowedBolt() {
+    windowConfiguration = new com.twitter.heron.api.windowing.WindowingConfigs();
+  }
+
+  private BaseWindowedBolt withWindowLength(Count count) {
+    if (count.value <= 0) {
+      throw new IllegalArgumentException("Window length must be positive [" + count + "]");
+    }
+    windowConfiguration.setTopologyBoltsWindowLengthCount(count.value);
+    return this;
+  }
+
+  private BaseWindowedBolt withWindowLength(Duration duration) {
+    if (duration.value <= 0) {
+      throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
+    }
+    windowConfiguration.setTopologyBoltsWindowLengthDurationMs(duration.value);
+    return this;
+  }
+
+  private BaseWindowedBolt withSlidingInterval(Count count) {
+    if (count.value <= 0) {
+      throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
+    }
+    windowConfiguration.setTopologyBoltsSlidingIntervalCount(count.value);
+    return this;
+  }
+
+  private BaseWindowedBolt withSlidingInterval(Duration duration) {
+    if (duration.value <= 0) {
+      throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
+    }
+    windowConfiguration.setTopologyBoltsSlidingIntervalDurationMs(duration.value);
+    return this;
+  }
+
+  /**
+   * Tuple count based sliding window configuration.
+   *
+   * @param windowLength    the number of tuples in the window
+   * @param slidingInterval the number of tuples after which the window slides
+   */
+  public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
+    return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+  }
+
+  /**
+   * Tuple count and time duration based sliding window configuration.
+   *
+   * @param windowLength    the number of tuples in the window
+   * @param slidingInterval the time duration after which the window slides
+   */
+  public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
+    return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+  }
+
+  /**
+   * Time duration and count based sliding window configuration.
+   *
+   * @param windowLength    the time duration of the window
+   * @param slidingInterval the number of tuples after which the window slides
+   */
+  public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
+    return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+  }
+
+  /**
+   * Time duration based sliding window configuration.
+   *
+   * @param windowLength    the time duration of the window
+   * @param slidingInterval the time duration after which the window slides
+   */
+  public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
+    return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+  }
+
+  /**
+   * A tuple count based window that slides with every incoming tuple.
+   *
+   * @param windowLength the number of tuples in the window
+   */
+  public BaseWindowedBolt withWindow(Count windowLength) {
+    return withWindowLength(windowLength).withSlidingInterval(new Count(1));
+  }
+
+  /**
+   * A time duration based window that slides with every incoming tuple.
+   *
+   * @param windowLength the time duration of the window
+   */
+  public BaseWindowedBolt withWindow(Duration windowLength) {
+    return withWindowLength(windowLength).withSlidingInterval(new Count(1));
+  }
+
+  /**
+   * A count based tumbling window.
+   *
+   * @param count the number of tuples after which the window tumbles
+   */
+  public BaseWindowedBolt withTumblingWindow(Count count) {
+    return withWindowLength(count).withSlidingInterval(count);
+  }
+
+  /**
+   * A time duration based tumbling window.
+   *
+   * @param duration the time duration after which the window tumbles
+   */
+  public BaseWindowedBolt withTumblingWindow(Duration duration) {
+    return withWindowLength(duration).withSlidingInterval(duration);
+  }
+
+  /**
+   * Specify a field in the tuple that represents the timestamp as a long value. If this
+   * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+   *
+   * @param fieldName the name of the field that contains the timestamp
+   */
+  public BaseWindowedBolt withTimestampField(String fieldName) {
+    return withTimestampExtractor(TupleFieldTimestampExtractor.of(fieldName));
+  }
+
+  /**
+   * Specify the timestamp extractor implementation.
+   *
+   * @param timestampExtractor the {@link TimestampExtractor} implementation
+   */
+  @SuppressWarnings("HiddenField")
+  public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+    if (this.timestampExtractor != null) {
+      throw new IllegalArgumentException(
+          "Window is already configured with a timestamp extractor: " + timestampExtractor);
+    }
+
+    this.timestampExtractor = new com.twitter.heron.api.windowing.TimestampExtractor() {
+
+      @Override
+      public long extractTimestamp(Tuple tuple) {
+        return timestampExtractor.extractTimestamp(new TupleImpl(tuple));
+      }
+    };
+    return this;
+  }
+
+  @Override
+  public TimestampExtractor getTimestampExtractor() {
+    return (this.timestampExtractor == null) ? null : new TimestampExtractor() {
+
+      @Override
+      public long extractTimestamp(org.apache.storm.tuple.Tuple tuple) {
+
+        return timestampExtractor.extractTimestamp(new com.twitter.heron.api.tuple.Tuple() {
+
+          @Override
+          public int size() {
+            return tuple.size();
+          }
+
+          @Override
+          public int fieldIndex(String field) {
+            return tuple.fieldIndex(field);
+          }
+
+          @Override
+          public boolean contains(String field) {
+            return tuple.contains(field);
+          }
+
+          @Override
+          public Object getValue(int i) {
+            return tuple.getValue(i);
+          }
+
+          @Override
+          public String getString(int i) {
+            return tuple.getString(i);
+          }
+
+          @Override
+          public Integer getInteger(int i) {
+            return tuple.getInteger(i);
+          }
+
+          @Override
+          public Long getLong(int i) {
+            return tuple.getLong(i);
+          }
+
+          @Override
+          public Boolean getBoolean(int i) {
+            return tuple.getBoolean(i);
+          }
+
+          @Override
+          public Short getShort(int i) {
+            return tuple.getShort(i);
+          }
+
+          @Override
+          public Byte getByte(int i) {
+            return tuple.getByte(i);
+          }
+
+          @Override
+          public Double getDouble(int i) {
+            return tuple.getDouble(i);
+          }
+
+          @Override
+          public Float getFloat(int i) {
+            return tuple.getFloat(i);
+          }
+
+          @Override
+          public byte[] getBinary(int i) {
+            return tuple.getBinary(i);
+          }
+
+          @Override
+          public Object getValueByField(String field) {
+            return tuple.getValueByField(field);
+          }
+
+          @Override
+          public String getStringByField(String field) {
+            return tuple.getStringByField(field);
+          }
+
+          @Override
+          public Integer getIntegerByField(String field) {
+            return tuple.getIntegerByField(field);
+          }
+
+          @Override
+          public Long getLongByField(String field) {
+            return tuple.getLongByField(field);
+          }
+
+          @Override
+          public Boolean getBooleanByField(String field) {
+            return tuple.getBooleanByField(field);
+          }
+
+          @Override
+          public Short getShortByField(String field) {
+            return tuple.getShortByField(field);
+          }
+
+          @Override
+          public Byte getByteByField(String field) {
+            return tuple.getByteByField(field);
+          }
+
+          @Override
+          public Double getDoubleByField(String field) {
+            return tuple.getDoubleByField(field);
+          }
+
+          @Override
+          public Float getFloatByField(String field) {
+            return tuple.getFloatByField(field);
+          }
+
+          @Override
+          public byte[] getBinaryByField(String field) {
+            return tuple.getBinaryByField(field);
+          }
+
+          @Override
+          public List<Object> getValues() {
+            return tuple.getValues();
+          }
+
+          @Override
+          public Fields getFields() {
+            return new Fields(tuple.getFields().toList());
+          }
+
+          @Override
+          public List<Object> select(Fields selector) {
+            return tuple.select(new org.apache.storm.tuple.Fields(selector.toString()));
+          }
+
+          @Override
+          public TopologyAPI.StreamId getSourceGlobalStreamId() {
+            return TopologyAPI.StreamId.newBuilder().setId(tuple.getSourceStreamId())
+                .setComponentName(tuple.getSourceComponent()).build();
+          }
+
+          @Override
+          public String getSourceComponent() {
+            return tuple.getSourceComponent();
+          }
+
+          @Override
+          public int getSourceTask() {
+            return tuple.getSourceTask();
+          }
+
+          @Override
+          public String getSourceStreamId() {
+            return tuple.getSourceStreamId();
+          }
+
+          @Override
+          public void resetValues() {
+            tuple.resetValues();
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Specify a stream id on which late tuples are going to be emitted.
+   * It must be defined on a per-component basis, and in conjunction with the
+   * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will
+   * be thrown.
+   *
+   * @param streamId the name of the stream used to emit late tuples on
+   */
+  public BaseWindowedBolt withLateTupleStream(String streamId) {
+    windowConfiguration.setTopologyBoltsLateTupleStream(streamId);
+    return this;
+  }
+
+
+  /**
+   * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple
+   * timestamps
+   * cannot be out of order by more than this amount.
+   *
+   * @param duration the max lag duration
+   */
+  public BaseWindowedBolt withLag(Duration duration) {
+    windowConfiguration.setTopologyBoltsTupleTimestampMaxLagMs(duration.value);
+    return this;
+  }
+
+  /**
+   * Specify the watermark event generation interval. For tuple based timestamps, watermark events
+   * are used to track the progress of time
+   *
+   * @param interval the interval at which watermark events are generated
+   */
+  public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+    windowConfiguration.setTopologyBoltsWatermarkEventIntervalMs(interval.value);
+    return this;
+  }
+
+  @Override
+  public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector
+      collector) {
+    // NOOP
+  }
+
+  @Override
+  public void cleanup() {
+    // NOOP
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // NOOP
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return windowConfiguration;
+  }
+}
diff --git a/heron/storm/src/java/org/apache/storm/tuple/Tuple.java b/heron/storm/src/java/org/apache/storm/tuple/Tuple.java
index 65eb6c8..c238346 100644
--- a/heron/storm/src/java/org/apache/storm/tuple/Tuple.java
+++ b/heron/storm/src/java/org/apache/storm/tuple/Tuple.java
@@ -39,110 +39,190 @@
   int size();
 
   /**
-   * Returns the position of the specified field in this tuple.
-   */
-  int fieldIndex(String field);
-
-  /**
    * Returns true if this tuple contains the specified name of the field.
    */
   boolean contains(String field);
 
   /**
-   * Gets the field at position i in the tuple. Returns object since tuples are dynamically typed.
-   */
-  Object getValue(int i);
-
-  /**
-   * Returns the String at position i in the tuple. If that field is not a String,
-   * you will get a runtime error.
-   */
-  String getString(int i);
-
-  /**
-   * Returns the Integer at position i in the tuple. If that field is not an Integer,
-   * you will get a runtime error.
-   */
-  Integer getInteger(int i);
-
-  /**
-   * Returns the Long at position i in the tuple. If that field is not a Long,
-   * you will get a runtime error.
-   */
-  Long getLong(int i);
-
-  /**
-   * Returns the Boolean at position i in the tuple. If that field is not a Boolean,
-   * you will get a runtime error.
-   */
-  Boolean getBoolean(int i);
-
-  /**
-   * Returns the Short at position i in the tuple. If that field is not a Short,
-   * you will get a runtime error.
-   */
-  Short getShort(int i);
-
-  /**
-   * Returns the Byte at position i in the tuple. If that field is not a Byte,
-   * you will get a runtime error.
-   */
-  Byte getByte(int i);
-
-  /**
-   * Returns the Double at position i in the tuple. If that field is not a Double,
-   * you will get a runtime error.
-   */
-  Double getDouble(int i);
-
-  /**
-   * Returns the Float at position i in the tuple. If that field is not a Float,
-   * you will get a runtime error.
-   */
-  Float getFloat(int i);
-
-  /**
-   * Returns the byte array at position i in the tuple. If that field is not a byte array,
-   * you will get a runtime error.
-   */
-  byte[] getBinary(int i);
-
-
-  Object getValueByField(String field);
-
-  String getStringByField(String field);
-
-  Integer getIntegerByField(String field);
-
-  Long getLongByField(String field);
-
-  Boolean getBooleanByField(String field);
-
-  Short getShortByField(String field);
-
-  Byte getByteByField(String field);
-
-  Double getDoubleByField(String field);
-
-  Float getFloatByField(String field);
-
-  byte[] getBinaryByField(String field);
-
-  /**
-   * Gets all the values in this tuple.
-   */
-  List<Object> getValues();
-
-  /**
    * Gets the names of the fields in this tuple.
    */
   Fields getFields();
 
   /**
+   *  Returns the position of the specified field in this tuple.
+   *
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  int fieldIndex(String field);
+
+  /**
    * Returns a subset of the tuple based on the fields selector.
    */
   List<Object> select(Fields selector);
 
+  /**
+   * Gets the field at position i in the tuple. Returns object since tuples are dynamically typed.
+   *
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Object getValue(int i);
+
+  /**
+   * Returns the String at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a String
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  String getString(int i);
+
+  /**
+   * Returns the Integer at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Integer
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Integer getInteger(int i);
+
+  /**
+   * Returns the Long at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Long
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Long getLong(int i);
+
+  /**
+   * Returns the Boolean at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Boolean
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Boolean getBoolean(int i);
+
+  /**
+   * Returns the Short at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Short
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Short getShort(int i);
+
+  /**
+   * Returns the Byte at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Byte
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Byte getByte(int i);
+
+  /**
+   * Returns the Double at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Double
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Double getDouble(int i);
+
+  /**
+   * Returns the Float at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a Float
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  Float getFloat(int i);
+
+  /**
+   * Returns the byte array at position i in the tuple.
+   *
+   * @throws ClassCastException If that field is not a byte array
+   * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+   */
+  byte[] getBinary(int i);
+
+  /**
+   * Gets the field with a specific name. Returns object since tuples are dynamically typed.
+   *
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Object getValueByField(String field);
+
+  /**
+   * Gets the String field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a String
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  String getStringByField(String field);
+
+  /**
+   * Gets the Integer field with a specific name.
+   *
+   * @throws ClassCastException If that field is not an Integer
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Integer getIntegerByField(String field);
+
+  /**
+   * Gets the Long field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a Long
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Long getLongByField(String field);
+
+  /**
+   * Gets the Boolean field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a Boolean
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Boolean getBooleanByField(String field);
+
+  /**
+   * Gets the Short field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a Short
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Short getShortByField(String field);
+
+  /**
+   * Gets the Byte field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a Byte
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Byte getByteByField(String field);
+
+  /**
+   * Gets the Double field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a Double
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Double getDoubleByField(String field);
+
+  /**
+   * Gets the Float field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a Float
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  Float getFloatByField(String field);
+
+  /**
+   * Gets the Byte array field with a specific name.
+   *
+   * @throws ClassCastException If that field is not a byte array
+   * @throws IllegalArgumentException - if field does not exist
+   */
+  byte[] getBinaryByField(String field);
+
+  /**
+   * Gets all the values in this tuple.
+   */
+  List<Object> getValues();
+
 
   /**
    * Returns the global stream id (component + stream) of this tuple.
diff --git a/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java b/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java
index 9c99768..30d4090 100644
--- a/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java
+++ b/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java
@@ -248,4 +248,20 @@
   public void resetValues() {
     delegate.resetValues();
   }
+
+  @Override
+  public String toString() {
+    return "source: " + getSourceComponent() + ":" + getSourceTask()
+        + ", stream: " + getSourceStreamId() + ", " + getValues().toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return this == other;
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
 }
diff --git a/heron/storm/src/java/org/apache/storm/windowing/TimestampExtractor.java b/heron/storm/src/java/org/apache/storm/windowing/TimestampExtractor.java
new file mode 100644
index 0000000..14c6054
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/TimestampExtractor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.windowing;
+
+import java.io.Serializable;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Interface to be implemented for extracting timestamp from a tuple.
+ */
+public interface TimestampExtractor extends Serializable {
+  /**
+   * Return the tuple timestamp indicating the time when the event happened.
+   *
+   * @param tuple the tuple
+   * @return the timestamp
+   */
+  long extractTimestamp(Tuple tuple);
+}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java b/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java
new file mode 100644
index 0000000..0ed5810
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A {@link Window} that contains {@link Tuple} objects.
+ */
+public interface TupleWindow extends Window<Tuple> {
+}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java b/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java
new file mode 100644
index 0000000..1d2f9e4
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.windowing;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+
+public class TupleWindowImpl implements TupleWindow {
+
+  private final com.twitter.heron.api.windowing.TupleWindow delegate;
+
+  public TupleWindowImpl(com.twitter.heron.api.windowing.TupleWindow tupleWindow) {
+    this.delegate = tupleWindow;
+  }
+
+  @Override
+  public List<Tuple> get() {
+    return convert(this.delegate.get());
+  }
+
+  @Override
+  public List<Tuple> getNew() {
+    return convert(this.delegate.getNew());
+  }
+
+  @Override
+  public List<Tuple> getExpired() {
+    return convert(this.delegate.getExpired());
+  }
+
+  @Override
+  public Long getEndTimestamp() {
+    return this.delegate.getEndTimestamp();
+  }
+
+  @Override
+  public Long getStartTimestamp() {
+    return this.delegate.getStartTimestamp();
+  }
+
+  private static List<Tuple> convert(List<com.twitter.heron.api.tuple.Tuple> tuples) {
+    List<Tuple> ret = new LinkedList<>();
+    for (com.twitter.heron.api.tuple.Tuple tuple : tuples) {
+      ret.add(new TupleImpl(tuple));
+    }
+    return ret;
+  }
+}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/Window.java b/heron/storm/src/java/org/apache/storm/windowing/Window.java
new file mode 100644
index 0000000..5142ab8
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/Window.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A view of events in a sliding window.
+ *
+ * @param <T> the type of event that this window contains. E.g. {@link org.apache.storm.tuple.Tuple}
+ */
+public interface Window<T> {
+  /**
+   * Gets the list of events in the window.
+   * <p>
+   *     <b>Note: </b> If the number of tuples in windows is huge, invoking {@code get} would
+   *                   load all the tuples into memory and may throw an OOM exception. Use
+   *                   windowing with persistence
+   * </p>
+   * @return the list of events in the window.
+   */
+  List<T> get();
+
+  /**
+   * Returns an iterator over the events in the window.
+   * @return an {@link Iterator} over the events in the current window.
+   */
+  default Iterator<T> getIter() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Get the list of newly added events in the window since the last time the window was generated.
+   * @return the list of newly added events in the window.
+   */
+  List<T> getNew();
+
+  /**
+   * Get the list of events expired from the window since the last time the window was generated.
+   * @return the list of events expired from the window.
+   */
+  List<T> getExpired();
+
+  /**
+   * If processing based on event time, returns the window end time based on watermark otherwise
+   * returns the window end time based on processing time.
+   *
+   * @return the window end timestamp
+   */
+  Long getEndTimestamp();
+
+  /**
+   * Returns the window start timestamp. Will return null if the window length is not based on
+   * time duration.
+   *
+   * @return the window start timestamp or null if the window length is not time based
+   */
+  Long getStartTimestamp();
+}
diff --git a/integration_test/src/__init__.py b/integration_test/src/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/__init__.py
diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index 71a0bd7..693d3c2 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -47,6 +47,7 @@
     deps = [
         "//heron/api/src/java:api-java",
         "//heron/storm/src/java:storm-compatibility-java",
+        "@com_googlecode_json_simple_json_simple//jar",
         "@commons_cli_commons_cli//jar",
         ":common",
         ":core"
@@ -75,6 +76,7 @@
         "//heron/api/src/java:api-java",
         "//heron/storm/src/java:storm-compatibility-java",
         "@commons_cli_commons_cli//jar",
+        "@com_googlecode_json_simple_json_simple//jar",
         ":common",
         ":core"
     ],
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
index dfaa034..19aec08 100644
--- a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
@@ -42,8 +42,12 @@
   private Tuple currentTupleProcessing = null;
   private OutputCollector collector;
 
-  public IntegrationTestBolt(IRichBolt delegate) {
+  //whether automatically acking should be done
+  private boolean ackAuto;
+
+  public IntegrationTestBolt(IRichBolt delegate, boolean ackAuto) {
     this.delegateBolt = delegate;
+    this.ackAuto = ackAuto;
   }
 
   @Override
@@ -61,7 +65,7 @@
                       OutputCollector outputCollector) {
     update(context);
     this.collector = new OutputCollector(new IntegrationTestBoltCollector(outputCollector));
-    this.delegateBolt.prepare(map, context, collector);
+    this.delegateBolt.prepare(map, new TestTopologyContext(context), collector);
   }
 
   private int calculateTerminalsToReceive(TopologyContext context) {
@@ -108,7 +112,9 @@
       currentTupleProcessing = tuple;
       delegateBolt.execute(tuple);
       // We ack only the tuples in user's logic
-      collector.ack(tuple);
+      if (this.ackAuto) {
+        collector.ack(tuple);
+      }
     }
   }
 
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java
index 0fc4dd6..08aee93 100644
--- a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java
@@ -96,7 +96,7 @@
     // Here the spoutOutputCollector should be a default one
     // to emit tuples without adding MessageId
     this.spoutOutputCollector = outputCollector;
-    delegateSpout.open(map, topologyContext,
+    delegateSpout.open(map, new TestTopologyContext(topologyContext),
         new SpoutOutputCollector(new IntegrationTestSpoutCollector(outputCollector)));
   }
 
@@ -153,6 +153,7 @@
   public void fail(Object messageId) {
     LOG.info("Received a fail with MessageId: " + messageId);
 
+    tuplesToAck--;
     if (!isTestMessageId(messageId)) {
       delegateSpout.fail(messageId);
     } else {
@@ -166,7 +167,8 @@
   }
 
   private static boolean isTestMessageId(Object messageId) {
-    return ((String) messageId).startsWith(Constants.INTEGRATION_TEST_MOCK_MESSAGE_ID);
+    return  (messageId instanceof String) && ((String) messageId)
+        .startsWith(Constants.INTEGRATION_TEST_MOCK_MESSAGE_ID);
   }
 
   protected boolean doneEmitting() {
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java
index 7290506..e34d3d5 100644
--- a/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java
@@ -22,6 +22,8 @@
 import com.twitter.heron.api.Config;
 import com.twitter.heron.api.HeronTopology;
 import com.twitter.heron.api.bolt.IRichBolt;
+import com.twitter.heron.api.bolt.IWindowedBolt;
+import com.twitter.heron.api.bolt.WindowedBoltExecutor;
 import com.twitter.heron.api.generated.TopologyAPI;
 import com.twitter.heron.api.spout.IRichSpout;
 import com.twitter.heron.api.topology.BoltDeclarer;
@@ -68,7 +70,17 @@
 
   @Override
   public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) {
-    return super.setBolt(id, new IntegrationTestBolt(bolt), parallelismHint);
+    return setBolt(id, bolt, parallelismHint, true);
+  }
+
+  public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint, boolean ackAuto) {
+    return super.setBolt(id, new IntegrationTestBolt(bolt, ackAuto), parallelismHint);
+  }
+
+  public BoltDeclarer setBolt(String id, IWindowedBolt bolt,
+                              Number parallelismHint, boolean ackAuto) throws
+      IllegalArgumentException {
+    return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint, ackAuto);
   }
 
   @Override
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyContext.java b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyContext.java
new file mode 100644
index 0000000..421506b
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyContext.java
@@ -0,0 +1,172 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.core;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.hooks.ITaskHook;
+import com.twitter.heron.api.metric.CombinedMetric;
+import com.twitter.heron.api.metric.ICombiner;
+import com.twitter.heron.api.metric.IMetric;
+import com.twitter.heron.api.metric.IReducer;
+import com.twitter.heron.api.metric.ReducedMetric;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+
+public class TestTopologyContext implements TopologyContext {
+  private TopologyContext delegate;
+
+  public TestTopologyContext(TopologyContext topologyContext) {
+    this.delegate = topologyContext;
+  }
+
+  @Override
+  public int getThisTaskId() {
+    return this.delegate.getThisTaskId();
+  }
+
+  @Override
+  public String getThisComponentId() {
+    return this.delegate.getThisComponentId();
+  }
+
+  @Override
+  public Fields getThisOutputFields(String streamId) {
+    return this.delegate.getThisOutputFields(streamId);
+  }
+
+  @Override
+  public Set<String> getThisStreams() {
+    return this.delegate.getThisStreams();
+  }
+
+  @Override
+  public int getThisTaskIndex() {
+    return this.delegate.getThisTaskIndex();
+  }
+
+  /**
+   * remove INTEGRATION_TEST_CONTROL_STREAM_ID from topology context
+   */
+  @Override
+  public Map<TopologyAPI.StreamId, TopologyAPI.Grouping> getThisSources() {
+    Map<TopologyAPI.StreamId, TopologyAPI.Grouping> original = getSources(getThisComponentId());
+    Map<TopologyAPI.StreamId, TopologyAPI.Grouping> ret = new HashMap<>();
+    for (Map.Entry<TopologyAPI.StreamId, TopologyAPI.Grouping> entry : original.entrySet()) {
+      if (!entry.getKey().getId().equals(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID)) {
+        ret.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Map<String, Map<String, TopologyAPI.Grouping>> getThisTargets() {
+    return this.delegate.getThisTargets();
+  }
+
+  @Override
+  public void setTaskData(String name, Object data) {
+    this.delegate.setTaskData(name, data);
+  }
+
+  @Override
+  public Object getTaskData(String name) {
+    return this.delegate.getTaskData(name);
+  }
+
+  @Override
+  public void addTaskHook(ITaskHook hook) {
+    this.delegate.addTaskHook(hook);
+  }
+
+  @Override
+  public Collection<ITaskHook> getHooks() {
+    return this.delegate.getHooks();
+  }
+
+  @Override
+  public <T, U, V> ReducedMetric<T, U, V> registerMetric(String name, IReducer<T, U, V> reducer,
+                                                         int timeBucketSizeInSecs) {
+    return this.delegate.registerMetric(name, reducer, timeBucketSizeInSecs);
+  }
+
+  @Override
+  public <T> CombinedMetric<T> registerMetric(String name, ICombiner<T> combiner, int
+      timeBucketSizeInSecs) {
+    return this.delegate.registerMetric(name, combiner, timeBucketSizeInSecs);
+  }
+
+  @Override
+  public <T extends IMetric<U>, U> T registerMetric(String name, T metric, int
+      timeBucketSizeInSecs) {
+    return this.delegate.registerMetric(name, metric, timeBucketSizeInSecs);
+  }
+
+  @Override
+  public String getTopologyId() {
+    return this.delegate.getTopologyId();
+  }
+
+  @Override
+  public String getComponentId(int taskId) {
+    return this.delegate.getComponentId(taskId);
+  }
+
+  @Override
+  public Set<String> getComponentStreams(String componentId) {
+    return this.delegate.getComponentStreams(componentId);
+  }
+
+  @Override
+  public List<Integer> getComponentTasks(String componentId) {
+    return this.delegate.getComponentTasks(componentId);
+  }
+
+  @Override
+  public Fields getComponentOutputFields(String componentId, String streamId) {
+    return this.delegate.getComponentOutputFields(componentId, streamId);
+  }
+
+  @Override
+  public Map<TopologyAPI.StreamId, TopologyAPI.Grouping> getSources(String componentId) {
+    return this.delegate.getSources(componentId);
+  }
+
+  @Override
+  public Map<String, Map<String, TopologyAPI.Grouping>> getTargets(String componentId) {
+    return this.delegate.getTargets(componentId);
+  }
+
+  @Override
+  public Map<Integer, String> getTaskToComponent() {
+    return this.delegate.getTaskToComponent();
+  }
+
+  @Override
+  public Set<String> getComponentIds() {
+    return this.delegate.getComponentIds();
+  }
+
+  @Override
+  public int maxTopologyMessageTimeout() {
+    return this.delegate.maxTopologyMessageTimeout();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.json
deleted file mode 100644
index 7e04cfc..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.json
deleted file mode 100644
index 7e04cfc..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.json
deleted file mode 100644
index b822614..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.json
deleted file mode 100644
index a3cceb7..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B","A", "B", "A", "B", "A", "B", "A", "B", "A", "B","A", "B", "A", "B", "B", "A", "A", "A", "B", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.json
deleted file mode 100644
index b822614..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTask.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTask.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTaskResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTaskResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuples.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuples.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuplesResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuplesResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasksResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasksResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasksResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBolts.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBolts.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBoltsResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBoltsResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/CustomCheckBolt.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomCheckBolt.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/CustomCheckBolt.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomCheckBolt.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/CustomObject.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomObject.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/CustomObject.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomObject.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/CustomSpout.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomSpout.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/CustomSpout.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomSpout.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopology.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopology.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopologyResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopologyResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGroupingResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/WindowTestBase.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/WindowTestBase.java
new file mode 100644
index 0000000..8647bb1
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/WindowTestBase.java
@@ -0,0 +1,260 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing;
+
+import java.net.MalformedURLException;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.utils.Utils;
+import com.twitter.heron.api.windowing.TupleWindow;
+import com.twitter.heron.integration_test.common.AbstractTestTopology;
+import com.twitter.heron.integration_test.core.TestTopologyBuilder;
+
+public class WindowTestBase extends AbstractTestTopology {
+
+  private static final String NUMBER_FIELD = "number";
+  private static final String STRING_FIELD = "numAsStr";
+  private static final String EVENT_TIME_FIELD = "eventTimeField";
+  private static final String DUMMY_FIELD = "dummy";
+
+  private BaseWindowedBolt.Count windowCountLength;
+  private BaseWindowedBolt.Count slideCountInterval;
+  private Duration windowDurationLength;
+  private Duration slideDurationInterval;
+  private Long sleepBetweenTuples;
+  private boolean useEventTime = false;
+  private Duration watermarkInterval;
+
+
+  public String getBoltName() {
+    return "VerificationBolt";
+  }
+
+  public String getSpoutName() {
+    return "IncrementingSpout";
+  }
+
+
+  public WindowTestBase(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public WindowTestBase withWindowLength(Duration duration) {
+    this.windowDurationLength = duration;
+    return this;
+  }
+
+  public WindowTestBase withWindowLength(BaseWindowedBolt.Count count) {
+    this.windowCountLength = count;
+    return this;
+  }
+
+  public WindowTestBase withSlidingInterval(Duration duration) {
+    this.slideDurationInterval = duration;
+    return this;
+  }
+
+  public WindowTestBase withSlidingInterval(BaseWindowedBolt.Count count) {
+    this.slideCountInterval = count;
+    return this;
+  }
+
+  public WindowTestBase withSleepBetweenTuples(long millis) {
+    this.sleepBetweenTuples = millis;
+    return this;
+  }
+
+  public WindowTestBase useEventTime() {
+    this.useEventTime = true;
+    return this;
+  }
+
+  @SuppressWarnings("HiddenField")
+  public WindowTestBase withWatermarkInterval(Duration watermarkInterval) {
+    this.watermarkInterval = watermarkInterval;
+    return this;
+  }
+
+  @Override
+  protected TestTopologyBuilder buildTopology(TestTopologyBuilder builder) {
+    builder.setSpout(getSpoutName(), new IncrementingSpout()
+        .withSleepBetweenTuples(sleepBetweenTuples), 1);
+
+    BaseWindowedBolt bolt = null;
+    if (this.windowCountLength != null) {
+      if (this.slideCountInterval != null) {
+        bolt = new VerificationBolt()
+            .withWindow(this.windowCountLength, this.slideCountInterval);
+      } else {
+        bolt = new VerificationBolt()
+            .withTumblingWindow(this.windowCountLength);
+      }
+    }
+
+    if (this.windowDurationLength != null) {
+      if (this.slideDurationInterval != null) {
+        bolt = new VerificationBolt()
+            .withWindow(this.windowDurationLength, this.slideDurationInterval);
+      } else {
+        bolt = new VerificationBolt()
+            .withTumblingWindow(this.windowDurationLength);
+      }
+    }
+
+    if (this.useEventTime) {
+      bolt.withTimestampField(EVENT_TIME_FIELD);
+    }
+
+    if (this.watermarkInterval != null) {
+      bolt.withWatermarkInterval(this.watermarkInterval);
+    }
+    builder.setBolt(getBoltName(), bolt, 1, false)
+        .shuffleGrouping(getSpoutName());
+    return builder;
+  }
+
+  public static class IncrementingSpout extends BaseRichSpout {
+    private static final Logger LOG = Logger.getLogger(IncrementingSpout.class.getName());
+    private static final long serialVersionUID = -6171170228097868632L;
+    private SpoutOutputCollector collector;
+    private static int currentNum;
+    private static Random rng = new Random();
+    private String componentId;
+    private Long sleepBetweenTuples = null;
+    // In millis
+    private long currentTime = 1504573536000L;
+
+    public IncrementingSpout withSleepBetweenTuples(long millis) {
+      this.sleepBetweenTuples = millis;
+      return this;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD, EVENT_TIME_FIELD));
+    }
+
+    @Override
+    @SuppressWarnings("HiddenField")
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector
+        collector) {
+      componentId = context.getThisComponentId();
+      this.collector = collector;
+    }
+
+    @Override
+    public void nextTuple() {
+      if (sleepBetweenTuples == null) {
+        Utils.sleep(rng.nextInt(10));
+      } else {
+        Utils.sleep(sleepBetweenTuples);
+      }
+      currentNum++;
+      final String numAsStr = "str(" + currentNum + ")str";
+      final Values tuple = new Values(currentNum, numAsStr, currentTime += 1000);
+      LOG.info("Time = " + System.currentTimeMillis()
+          + " Component = " + componentId + " Tuple = " + tuple.toString());
+
+      collector.emit(tuple, currentNum);
+    }
+
+    @Override
+    public void ack(Object msgId) {
+      LOG.info("Received ACK for msgId : " + msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+      LOG.info("Received FAIL for msgId : " + msgId);
+    }
+  }
+
+  public static class VerificationBolt extends BaseWindowedBolt {
+    private static final long serialVersionUID = -6067634845003700125L;
+    private OutputCollector collector;
+    private String componentId;
+
+    private static int windowCount = 0;
+
+
+    private static final Logger LOG = Logger.getLogger(VerificationBolt.class.getName());
+
+
+    @Override
+    @SuppressWarnings("HiddenField")
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector
+        collector) {
+      componentId = context.getThisComponentId();
+      this.collector = collector;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void execute(TupleWindow inputWindow) {
+      windowCount++;
+      List<Tuple> tuplesInWindow = inputWindow.get();
+      List<Tuple> newTuples = inputWindow.getNew();
+      List<Tuple> expiredTuples = inputWindow.getExpired();
+      LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+      LOG.info("newTuples.size() = " + newTuples.size());
+      LOG.info("expiredTuples.size() = " + expiredTuples.size());
+
+      JSONObject jsonObject = new JSONObject();
+      JSONArray jsonArray = new JSONArray();
+      JSONObject tmp = new JSONObject();
+      tmp.put("tuplesInWindow", tuplesToListString(tuplesInWindow));
+      jsonArray.add(tmp);
+      tmp = new JSONObject();
+      tmp.put("newTuples", tuplesToListString(newTuples));
+      jsonArray.add(tmp);
+      tmp = new JSONObject();
+      tmp.put("expiredTuples", tuplesToListString(expiredTuples));
+      jsonArray.add(tmp);
+      jsonObject.put(windowCount, jsonArray);
+      LOG.info("Component = " + componentId + " Window Count = " + windowCount
+          + " tuplesInWindow = " + jsonArray.toJSONString());
+
+      collector.emit(new Values(jsonObject.toJSONString()));
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields(DUMMY_FIELD));
+    }
+  }
+
+  public static List<String> tuplesToListString(List<Tuple> tuples) {
+    List<String> tmp = new LinkedList<>();
+    for (Tuple tuple : tuples) {
+      tmp.add(tuple.getValue(0).toString());
+    }
+    return tmp;
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1.java
new file mode 100644
index 0000000..b77ec65
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1.java
@@ -0,0 +1,34 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingCountWindowTest1 extends WindowTestBase {
+
+  public SlidingCountWindowTest1(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new SlidingCountWindowTest1(args)
+        .withSleepBetweenTuples(1000)
+        .withWindowLength(BaseWindowedBolt.Count.of(10))
+        .withSlidingInterval(BaseWindowedBolt.Count.of(5));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1Results.json
new file mode 100644
index 0000000..e8680ad
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1Results.json
@@ -0,0 +1,4 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\"]},{\"newTuples\":[\"1\",\"2\",\"3\",\"4\",\"5\"]},{\"expiredTuples\":[]}]}",
+  "{\"2\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"newTuples\":[\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"expiredTuples\":[]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2.java
new file mode 100644
index 0000000..b8bd7e6
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2.java
@@ -0,0 +1,34 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingCountWindowTest2 extends WindowTestBase {
+
+  public SlidingCountWindowTest2(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new SlidingCountWindowTest2(args)
+        .withSleepBetweenTuples(1000)
+        .withWindowLength(BaseWindowedBolt.Count.of(2))
+        .withSlidingInterval(BaseWindowedBolt.Count.of(1));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2Results.json
new file mode 100644
index 0000000..970469f
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2Results.json
@@ -0,0 +1,12 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\"]},{\"newTuples\":[\"1\"]},{\"expiredTuples\":[]}]}",
+  "{\"2\":[{\"tuplesInWindow\":[\"1\",\"2\"]},{\"newTuples\":[\"2\"]},{\"expiredTuples\":[]}]}",
+  "{\"3\":[{\"tuplesInWindow\":[\"2\",\"3\"]},{\"newTuples\":[\"3\"]},{\"expiredTuples\":[\"1\"]}]}",
+  "{\"4\":[{\"tuplesInWindow\":[\"3\",\"4\"]},{\"newTuples\":[\"4\"]},{\"expiredTuples\":[\"2\"]}]}",
+  "{\"5\":[{\"tuplesInWindow\":[\"4\",\"5\"]},{\"newTuples\":[\"5\"]},{\"expiredTuples\":[\"3\"]}]}",
+  "{\"6\":[{\"tuplesInWindow\":[\"5\",\"6\"]},{\"newTuples\":[\"6\"]},{\"expiredTuples\":[\"4\"]}]}",
+  "{\"7\":[{\"tuplesInWindow\":[\"6\",\"7\"]},{\"newTuples\":[\"7\"]},{\"expiredTuples\":[\"5\"]}]}",
+  "{\"8\":[{\"tuplesInWindow\":[\"7\",\"8\"]},{\"newTuples\":[\"8\"]},{\"expiredTuples\":[\"6\"]}]}",
+  "{\"9\":[{\"tuplesInWindow\":[\"8\",\"9\"]},{\"newTuples\":[\"9\"]},{\"expiredTuples\":[\"7\"]}]}",
+  "{\"10\":[{\"tuplesInWindow\":[\"9\",\"10\"]},{\"newTuples\":[\"10\"]},{\"expiredTuples\":[\"8\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3.java
new file mode 100644
index 0000000..1080c88
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3.java
@@ -0,0 +1,34 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingCountWindowTest3 extends WindowTestBase {
+
+  public SlidingCountWindowTest3(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new SlidingCountWindowTest3(args)
+        .withSleepBetweenTuples(1000)
+        .withWindowLength(BaseWindowedBolt.Count.of(7))
+        .withSlidingInterval(BaseWindowedBolt.Count.of(3));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3Results.json
new file mode 100644
index 0000000..975f01c
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3Results.json
@@ -0,0 +1,5 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\"]},{\"newTuples\":[\"1\",\"2\",\"3\"]},{\"expiredTuples\":[]}]}",
+  "{\"2\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\"]},{\"newTuples\":[\"4\",\"5\",\"6\"]},{\"expiredTuples\":[]}]}",
+  "{\"3\":[{\"tuplesInWindow\":[\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\"]},{\"newTuples\":[\"7\",\"8\",\"9\"]},{\"expiredTuples\":[\"1\",\"2\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1.java
new file mode 100644
index 0000000..3f9e612
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1.java
@@ -0,0 +1,33 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class TumblingCountWindowTest1 extends WindowTestBase {
+
+  public TumblingCountWindowTest1(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new TumblingCountWindowTest1(args)
+        .withSleepBetweenTuples(1000)
+        .withWindowLength(BaseWindowedBolt.Count.of(10));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1Results.json
new file mode 100644
index 0000000..2416022
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1Results.json
@@ -0,0 +1,3 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"newTuples\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"expiredTuples\":[]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2.java
new file mode 100644
index 0000000..15c80d6
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2.java
@@ -0,0 +1,33 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class TumblingCountWindowTest2 extends WindowTestBase {
+
+  public TumblingCountWindowTest2(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new TumblingCountWindowTest2(args)
+        .withSleepBetweenTuples(1000)
+        .withWindowLength(BaseWindowedBolt.Count.of(2));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2Results.json
new file mode 100644
index 0000000..e258923
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2Results.json
@@ -0,0 +1,7 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\"]},{\"newTuples\":[\"1\",\"2\"]},{\"expiredTuples\":[]}]}",
+  "{\"2\":[{\"tuplesInWindow\":[\"3\",\"4\"]},{\"newTuples\":[\"3\",\"4\"]},{\"expiredTuples\":[\"1\",\"2\"]}]}",
+  "{\"3\":[{\"tuplesInWindow\":[\"5\",\"6\"]},{\"newTuples\":[\"5\",\"6\"]},{\"expiredTuples\":[\"3\",\"4\"]}]}",
+  "{\"4\":[{\"tuplesInWindow\":[\"7\",\"8\"]},{\"newTuples\":[\"7\",\"8\"]},{\"expiredTuples\":[\"5\",\"6\"]}]}",
+  "{\"5\":[{\"tuplesInWindow\":[\"9\",\"10\"]},{\"newTuples\":[\"9\",\"10\"]},{\"expiredTuples\":[\"7\",\"8\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3.java
new file mode 100644
index 0000000..227b1cc
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3.java
@@ -0,0 +1,33 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class TumblingCountWindowTest3 extends WindowTestBase {
+
+  public TumblingCountWindowTest3(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new TumblingCountWindowTest3(args)
+        .withSleepBetweenTuples(1000)
+        .withWindowLength(BaseWindowedBolt.Count.of(7));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3Results.json
new file mode 100644
index 0000000..8969a9a
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3Results.json
@@ -0,0 +1,3 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\"]},{\"newTuples\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\"]},{\"expiredTuples\":[]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1.java
new file mode 100644
index 0000000..120fdea
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1.java
@@ -0,0 +1,32 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.time;
+
+import java.net.MalformedURLException;
+import java.time.Duration;
+
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingTimeWindowTest1 extends WindowTestBase {
+  public SlidingTimeWindowTest1(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new SlidingTimeWindowTest1(args).withSleepBetweenTuples(1000)
+        .withWindowLength(Duration.ofMillis(1000))
+        .withSlidingInterval(Duration.ofMillis(1000));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1Results.json
new file mode 100644
index 0000000..7af261c
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1Results.json
@@ -0,0 +1,12 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\"]},{\"newTuples\":[\"1\"]},{\"expiredTuples\":[]}]}",
+  "{\"10\":[{\"tuplesInWindow\":[\"10\"]},{\"newTuples\":[\"10\"]},{\"expiredTuples\":[\"9\"]}]}",
+  "{\"2\":[{\"tuplesInWindow\":[\"2\"]},{\"newTuples\":[\"2\"]},{\"expiredTuples\":[\"1\"]}]}",
+  "{\"3\":[{\"tuplesInWindow\":[\"3\"]},{\"newTuples\":[\"3\"]},{\"expiredTuples\":[\"2\"]}]}",
+  "{\"4\":[{\"tuplesInWindow\":[\"4\"]},{\"newTuples\":[\"4\"]},{\"expiredTuples\":[\"3\"]}]}",
+  "{\"5\":[{\"tuplesInWindow\":[\"5\"]},{\"newTuples\":[\"5\"]},{\"expiredTuples\":[\"4\"]}]}",
+  "{\"6\":[{\"tuplesInWindow\":[\"6\"]},{\"newTuples\":[\"6\"]},{\"expiredTuples\":[\"5\"]}]}",
+  "{\"7\":[{\"tuplesInWindow\":[\"7\"]},{\"newTuples\":[\"7\"]},{\"expiredTuples\":[\"6\"]}]}",
+  "{\"8\":[{\"tuplesInWindow\":[\"8\"]},{\"newTuples\":[\"8\"]},{\"expiredTuples\":[\"7\"]}]}",
+  "{\"9\":[{\"tuplesInWindow\":[\"9\"]},{\"newTuples\":[\"9\"]},{\"expiredTuples\":[\"8\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1.java
new file mode 100644
index 0000000..2077c3b
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1.java
@@ -0,0 +1,36 @@
+//  Copyright 2017 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.watermark;
+
+import java.net.MalformedURLException;
+import java.time.Duration;
+
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingWatermarkEventTimeWindowTest1 extends WindowTestBase {
+
+  public SlidingWatermarkEventTimeWindowTest1(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    WindowTestBase topology = new SlidingWatermarkEventTimeWindowTest1(args)
+        .withSleepBetweenTuples(1000)
+        .useEventTime()
+        .withWatermarkInterval(Duration.ofMillis(1000))
+        .withWindowLength(Duration.ofMillis(1000))
+        .withSlidingInterval(Duration.ofMillis(1000));
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json
new file mode 100644
index 0000000..7af261c
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json
@@ -0,0 +1,12 @@
+[
+  "{\"1\":[{\"tuplesInWindow\":[\"1\"]},{\"newTuples\":[\"1\"]},{\"expiredTuples\":[]}]}",
+  "{\"10\":[{\"tuplesInWindow\":[\"10\"]},{\"newTuples\":[\"10\"]},{\"expiredTuples\":[\"9\"]}]}",
+  "{\"2\":[{\"tuplesInWindow\":[\"2\"]},{\"newTuples\":[\"2\"]},{\"expiredTuples\":[\"1\"]}]}",
+  "{\"3\":[{\"tuplesInWindow\":[\"3\"]},{\"newTuples\":[\"3\"]},{\"expiredTuples\":[\"2\"]}]}",
+  "{\"4\":[{\"tuplesInWindow\":[\"4\"]},{\"newTuples\":[\"4\"]},{\"expiredTuples\":[\"3\"]}]}",
+  "{\"5\":[{\"tuplesInWindow\":[\"5\"]},{\"newTuples\":[\"5\"]},{\"expiredTuples\":[\"4\"]}]}",
+  "{\"6\":[{\"tuplesInWindow\":[\"6\"]},{\"newTuples\":[\"6\"]},{\"expiredTuples\":[\"5\"]}]}",
+  "{\"7\":[{\"tuplesInWindow\":[\"7\"]},{\"newTuples\":[\"7\"]},{\"expiredTuples\":[\"6\"]}]}",
+  "{\"8\":[{\"tuplesInWindow\":[\"8\"]},{\"newTuples\":[\"8\"]},{\"expiredTuples\":[\"7\"]}]}",
+  "{\"9\":[{\"tuplesInWindow\":[\"9\"]},{\"newTuples\":[\"9\"]},{\"expiredTuples\":[\"8\"]}]}"
+]
diff --git a/integration_test/src/python/__init__.py b/integration_test/src/python/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/__init__.py b/integration_test/src/python/integration_test/topology/__init__.py
index 53d0931..8b13789 100644
--- a/integration_test/src/python/integration_test/topology/__init__.py
+++ b/integration_test/src/python/integration_test/topology/__init__.py
@@ -1,19 +1 @@
-"""Integration test topologies"""
-__all__ = [
-    'all_grouping',
-    'basic_one_task'
-    'bolt_double_emit_tuples',
-    'fields_grouping',
-    'global_grouping',
-    'multi_spouts_multi_tasks',
-    'none_grouping',
-    'one_bolt_multi_tasks',
-    'one_spout_bolt_multi_tasks',
-    'one_spout_multi_tasks',
-    'one_spout_two_bolts',
-    'shuffle_grouping',
-]
 
-# import some core stuff
-import integration_test.src.python.integration_test.common as common
-import integration_test.src.python.integration_test.core as core
diff --git a/integration_test/src/python/integration_test/topology/all_grouping/__init__.py b/integration_test/src/python/integration_test/topology/all_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/all_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/all_grouping.py b/integration_test/src/python/integration_test/topology/all_grouping/all_grouping.py
similarity index 95%
rename from integration_test/src/python/integration_test/topology/all_grouping.py
rename to integration_test/src/python/integration_test/topology/all_grouping/all_grouping.py
index fa96200..02f569f 100644
--- a/integration_test/src/python/integration_test/topology/all_grouping.py
+++ b/integration_test/src/python/integration_test/topology/all_grouping/all_grouping.py
@@ -19,7 +19,8 @@
 from integration_test.src.python.integration_test.common.bolt import IdentityBolt
 from integration_test.src.python.integration_test.common.spout import ABSpout
 
-def all_grouping_buidler(topology_name, http_server_url):
+
+def all_grouping_builder(topology_name, http_server_url):
   """Integration test topology builder for all grouping"""
   builder = TestTopologyBuilder(topology_name, http_server_url)
   ab_spout = builder.add_spout("ab-spout", ABSpout, 1)
diff --git a/integration_test/src/python/integration_test/topology/all_grouping_result.json b/integration_test/src/python/integration_test/topology/all_grouping/all_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/all_grouping_result.json
rename to integration_test/src/python/integration_test/topology/all_grouping/all_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/basic_one_task/__init__.py b/integration_test/src/python/integration_test/topology/basic_one_task/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/basic_one_task/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/basic_one_task.py b/integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/basic_one_task.py
rename to integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task.py
diff --git a/integration_test/src/python/integration_test/topology/basic_one_task_result.json b/integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/basic_one_task_result.json
rename to integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task_result.json
diff --git a/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/__init__.py b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples.py b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/bolt_double_emit_tuples.py
rename to integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples.py
diff --git a/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples_result.json b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/bolt_double_emit_tuples_result.json
rename to integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples_result.json
diff --git a/integration_test/src/python/integration_test/topology/fields_grouping/__init__.py b/integration_test/src/python/integration_test/topology/fields_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/fields_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/fields_grouping.py b/integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/fields_grouping.py
rename to integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/fields_grouping_result.json b/integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/fields_grouping_result.json
rename to integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/global_grouping/__init__.py b/integration_test/src/python/integration_test/topology/global_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/global_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/global_grouping.py b/integration_test/src/python/integration_test/topology/global_grouping/global_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/global_grouping.py
rename to integration_test/src/python/integration_test/topology/global_grouping/global_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/global_grouping_result.json b/integration_test/src/python/integration_test/topology/global_grouping/global_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/global_grouping_result.json
rename to integration_test/src/python/integration_test/topology/global_grouping/global_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks.py b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/none_grouping/__init__.py b/integration_test/src/python/integration_test/topology/none_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/none_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/none_grouping.py b/integration_test/src/python/integration_test/topology/none_grouping/none_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/none_grouping.py
rename to integration_test/src/python/integration_test/topology/none_grouping/none_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/none_grouping_result.json b/integration_test/src/python/integration_test/topology/none_grouping/none_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/none_grouping_result.json
rename to integration_test/src/python/integration_test/topology/none_grouping/none_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_bolt_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_bolt_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_spout_two_bolts/__init__.py b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_two_bolts.py b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_two_bolts.py
rename to integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_two_bolts_result.json b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_two_bolts_result.json
rename to integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts_result.json
diff --git a/integration_test/src/python/integration_test/topology/shuffle_grouping/__init__.py b/integration_test/src/python/integration_test/topology/shuffle_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/shuffle_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/shuffle_grouping.py b/integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/shuffle_grouping.py
rename to integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/shuffle_grouping_result.json b/integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/shuffle_grouping_result.json
rename to integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/test_topology_main.py b/integration_test/src/python/integration_test/topology/test_topology_main.py
index 25a7964..08f587c 100644
--- a/integration_test/src/python/integration_test/topology/test_topology_main.py
+++ b/integration_test/src/python/integration_test/topology/test_topology_main.py
@@ -17,45 +17,45 @@
 import logging
 import sys
 
-from integration_test.src.python.integration_test.topology.all_grouping \
-  import all_grouping_buidler
+from integration_test.src.python.integration_test.topology.all_grouping.all_grouping \
+    import all_grouping_builder
 
-from integration_test.src.python.integration_test.topology.basic_one_task \
+from integration_test.src.python.integration_test.topology.basic_one_task.basic_one_task \
   import basic_one_task_builder
 
-from integration_test.src.python.integration_test.topology.bolt_double_emit_tuples \
-  import bolt_double_emit_tuples_builder
+from integration_test.src.python.integration_test.topology.bolt_double_emit_tuples\
+    .bolt_double_emit_tuples import bolt_double_emit_tuples_builder
 
-from integration_test.src.python.integration_test.topology.fields_grouping \
+from integration_test.src.python.integration_test.topology.fields_grouping.fields_grouping \
   import fields_grouping_builder
 
-from integration_test.src.python.integration_test.topology.global_grouping \
+from integration_test.src.python.integration_test.topology.global_grouping.global_grouping \
   import global_grouping_builder
 
-from integration_test.src.python.integration_test.topology.multi_spouts_multi_tasks \
-  import multi_spouts_multi_tasks_builder
+from integration_test.src.python.integration_test.topology.multi_spouts_multi_tasks\
+    .multi_spouts_multi_tasks import multi_spouts_multi_tasks_builder
 
-from integration_test.src.python.integration_test.topology.none_grouping \
+from integration_test.src.python.integration_test.topology.none_grouping.none_grouping \
   import none_grouping_builder
 
-from integration_test.src.python.integration_test.topology.one_bolt_multi_tasks \
-  import one_bolt_multi_tasks_builder
+from integration_test.src.python.integration_test.topology. one_bolt_multi_tasks\
+    .one_bolt_multi_tasks import one_bolt_multi_tasks_builder
 
-from integration_test.src.python.integration_test.topology.one_spout_bolt_multi_tasks \
-  import one_spout_bolt_multi_tasks_builder
+from integration_test.src.python.integration_test.topology.one_spout_bolt_multi_tasks\
+    .one_spout_bolt_multi_tasks import one_spout_bolt_multi_tasks_builder
 
-from integration_test.src.python.integration_test.topology.one_spout_multi_tasks \
-  import one_spout_multi_tasks_builder
+from integration_test.src.python.integration_test.topology.one_spout_multi_tasks\
+    .one_spout_multi_tasks import one_spout_multi_tasks_builder
 
-from integration_test.src.python.integration_test.topology.one_spout_two_bolts \
-  import one_spout_two_bolts_builder
+from integration_test.src.python.integration_test.topology.one_spout_two_bolts\
+    .one_spout_two_bolts  import one_spout_two_bolts_builder
 
-from integration_test.src.python.integration_test.topology.shuffle_grouping \
+from integration_test.src.python.integration_test.topology.shuffle_grouping.shuffle_grouping \
   import shuffle_grouping_builder
 
 TOPOLOGY_BUILDERS = {
     'Heron_IntegrationTest_BasicOneTask': basic_one_task_builder,
-    'Heron_IntegrationTest_AllGrouping': all_grouping_buidler,
+    'Heron_IntegrationTest_AllGrouping': all_grouping_builder,
     'Heron_IntegrationTest_NoneGrouping': none_grouping_builder,
     'Heron_IntegrationTest_OneBoltMultiTasks': one_bolt_multi_tasks_builder,
     'Heron_IntegrationTest_OneSpoutBoltMultiTasks': one_spout_bolt_multi_tasks_builder,
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index bed1d6b..8c1e15e 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -11,32 +11,32 @@
     {
       "topologyName" : "IntegrationTest_FieldsGrouping",
       "classPath"    : "fields_grouping.FieldsGrouping",
-      "expectedResultRelativePath" : "FieldsGrouping.json"
+      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json"
     },
     {
       "topologyName" : "IntegrationTest_AllGrouping",
       "classPath"    : "all_grouping.AllGrouping",
-      "expectedResultRelativePath" : "AllGrouping.json"
+      "expectedResultRelativePath" : "all_grouping/AllGroupingResults.json"
     },
     {
       "topologyName" : "IntegrationTest_GlobalGrouping",
       "classPath"    : "global_grouping.GlobalGrouping",
-      "expectedResultRelativePath" : "GlobalGrouping.json"
+      "expectedResultRelativePath" : "global_grouping/GlobalGroupingResults.json"
     },
     {
       "topologyName" : "IntegrationTest_NonGrouping",
       "classPath"    : "non_grouping.NonGrouping",
-      "expectedResultRelativePath" : "NonGrouping.json"
+      "expectedResultRelativePath" : "non_grouping/NonGroupingResults.json"
     },
     {
       "topologyName" : "IntegrationTest_ShuffleGrouping",
       "classPath"    : "shuffle_grouping.ShuffleGrouping",
-      "expectedResultRelativePath" : "ShuffleGrouping.json"
+      "expectedResultRelativePath" : "shuffle_grouping/ShuffleGroupingResults.json"
     },
     {
       "topologyName" : "IntegrationTest_BasicTopologyOneTask",
       "classPath"    : "basic_topology_one_task.BasicTopologyOneTask",
-      "expectedResultRelativePath" : "BasicTopologyOneTask.json"
+      "expectedResultRelativePath" : "basic_topology_one_task/BasicTopologyOneTaskResults.json"
     },
     {
       "topologyName" : "IntegrationTest_BasicTopologyOneTaskScaleUp",
@@ -55,104 +55,144 @@
     {
       "topologyName" : "IntegrationTest_BoltDoubleEmitTuples",
       "classPath"    : "bolt_double_emit_tuples.BoltDoubleEmitTuples",
-      "expectedResultRelativePath" : "BoltDoubleEmitTuples.json"
+      "expectedResultRelativePath" : "bolt_double_emit_tuples/BoltDoubleEmitTuplesResults.json"
     },
     {
       "topologyName" : "IntegrationTest_MultiSpoutsMultiTasks",
       "classPath"    : "multi_spouts_multi_tasks.MultiSpoutsMultiTasks",
-      "expectedResultRelativePath" : "MultiSpoutsMultiTasks.json"
+      "expectedResultRelativePath" : "multi_spouts_multi_tasks/MultiSpoutsMultiTasksResults.json"
     },
     {
       "topologyName" : "IntegrationTest_OneBoltMultiTasks",
       "classPath"    : "one_bolt_multi_tasks.OneBoltMultiTasks",
-      "expectedResultRelativePath" : "OneBoltMultiTasks.json"
+      "expectedResultRelativePath" : "one_bolt_multi_tasks/OneBoltMultiTasksResults.json"
     },
     {
-      "topologyName" : "IntegrationTest_OneSpoutBoltMultiTasks",		
-      "classPath"    : "one_spout_bolt_multi_tasks.OneSpoutBoltMultiTasks",		
-      "expectedResultRelativePath" : "OneSpoutBoltMultiTasks.json"		
-    },		
+      "topologyName" : "IntegrationTest_OneSpoutBoltMultiTasks",
+      "classPath"    : "one_spout_bolt_multi_tasks.OneSpoutBoltMultiTasks",
+      "expectedResultRelativePath" : "one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasksResults.json"
+    },
     {
       "topologyName" : "IntegrationTest_OneSpoutMultiTasks",
       "classPath"    : "one_spout_multi_tasks.OneSpoutMultiTasks",
-      "expectedResultRelativePath" : "OneSpoutMultiTasks.json"
+      "expectedResultRelativePath" : "one_spout_multi_tasks/OneSpoutMultiTasksResults.json"
     },
     {
       "topologyName" : "IntegrationTest_OneSpoutTwoBolts",
       "classPath"    : "one_spout_two_bolts.OneSpoutTwoBolts",
-      "expectedResultRelativePath" : "OneSpoutTwoBolts.json"
+      "expectedResultRelativePath" : "one_spout_two_bolts/OneSpoutTwoBoltsResults.json"
     },
     {
       "topologyName" : "IntegrationTest_CustomSerialization",
       "classPath"    : "serialization.SerializationTopology",
-      "expectedResultRelativePath" : "SerializationTopology.json"
+      "expectedResultRelativePath" : "serialization/SerializationTopologyResults.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_SlidingCountWindow1",
+      "classPath"    : "windowing.count.SlidingCountWindowTest1",
+      "expectedResultRelativePath" : "windowing/count/SlidingCountWindowTest1Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_SlidingCountWindow2",
+      "classPath"    : "windowing.count.SlidingCountWindowTest2",
+      "expectedResultRelativePath" : "windowing/count/SlidingCountWindowTest2Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_SlidingCountWindow3",
+      "classPath"    : "windowing.count.SlidingCountWindowTest3",
+      "expectedResultRelativePath" : "windowing/count/SlidingCountWindowTest3Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_TumblingCountWindow1",
+      "classPath"    : "windowing.count.TumblingCountWindowTest1",
+      "expectedResultRelativePath" : "windowing/count/TumblingCountWindowTest1Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_TumblingCountWindow2",
+      "classPath"    : "windowing.count.TumblingCountWindowTest2",
+      "expectedResultRelativePath" : "windowing/count/TumblingCountWindowTest2Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_TumblingCountWindow3",
+      "classPath"    : "windowing.count.TumblingCountWindowTest3",
+      "expectedResultRelativePath" : "windowing/count/TumblingCountWindowTest3Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_SlidingTimeWindow1",
+      "classPath"    : "windowing.time.SlidingTimeWindowTest1",
+      "expectedResultRelativePath" : "windowing/time/SlidingTimeWindowTest1Results.json"
+    },
+    {
+      "topologyName" : "IntegrationTest_SlidingWatermarkEventTimeWindow1",
+      "classPath"    : "windowing.watermark.SlidingWatermarkEventTimeWindowTest1",
+      "expectedResultRelativePath" : "windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json"
     }
   ],
   "pythonTopologies": [
     {
       "topologyName" : "Heron_IntegrationTest_AllGrouping",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "all_grouping_result.json"
+      "expectedResultRelativePath" : "all_grouping/all_grouping_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_BasicOneTask",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "basic_one_task_result.json"
+      "expectedResultRelativePath" : "basic_one_task/basic_one_task_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_NoneGrouping",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "none_grouping_result.json"
+      "expectedResultRelativePath" : "none_grouping/none_grouping_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_OneBoltMultiTasks",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "one_bolt_multi_tasks_result.json"
+      "expectedResultRelativePath" : "one_bolt_multi_tasks/one_bolt_multi_tasks_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_OneSpoutBoltMultiTasks",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "one_spout_bolt_multi_tasks_result.json"
+      "expectedResultRelativePath" : "one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_ShuffleGrouping",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "shuffle_grouping_result.json"
+      "expectedResultRelativePath" : "shuffle_grouping/shuffle_grouping_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_OneSpoutTwoBolts",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "one_spout_two_bolts_result.json"
+      "expectedResultRelativePath" : "one_spout_two_bolts/one_spout_two_bolts_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_OneSpoutTwoBolts",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "one_spout_two_bolts_result.json"
+      "expectedResultRelativePath" : "one_spout_two_bolts/one_spout_two_bolts_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_OneSpoutMultiTasks",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "one_spout_multi_tasks_result.json"
+      "expectedResultRelativePath" : "one_spout_multi_tasks/one_spout_multi_tasks_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_MultiSpoutsMultiTasks",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "multi_spouts_multi_tasks_result.json"
+      "expectedResultRelativePath" : "multi_spouts_multi_tasks/multi_spouts_multi_tasks_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_FieldsGrouping",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "fields_grouping_result.json"
+      "expectedResultRelativePath" : "fields_grouping/fields_grouping_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_BoltDoubleEmitTuples",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "bolt_double_emit_tuples_result.json"
+      "expectedResultRelativePath" : "bolt_double_emit_tuples/bolt_double_emit_tuples_result.json"
     },
     {
       "topologyName" : "Heron_IntegrationTest_GlobalGrouping",
       "classPath"    : "-",
-      "expectedResultRelativePath" : "global_grouping_result.json"
+      "expectedResultRelativePath" : "global_grouping/global_grouping_result.json"
     }
   ]
 }
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index 18fdfbe..fafc619 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -311,6 +311,7 @@
     srcs = [
         "//integration_test/src/java:test-data-files",
     ],
+    strip_prefix = '/integration_test/src/java/com/twitter/heron/integration_test/topology/'
 )
 
 pkg_tar(
@@ -319,6 +320,7 @@
     srcs = [
         "//integration_test/src/python/integration_test/topology:test-data-files",
     ],
+    strip_prefix = '/integration_test/src/python/integration_test/topology/'
 )
 
 pkg_tar(