Merge branch 'karthik/upgradepex' of https://github.com/twitter/heron into karthik/upgradepex
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java
index a25722a..182a965 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/BaseWindowedBolt.java
@@ -83,6 +83,9 @@
}
private BaseWindowedBolt withWindowLength(Count count) {
+ if (count == null) {
+ throw new IllegalArgumentException("Window length cannot be set null");
+ }
if (count.value <= 0) {
throw new IllegalArgumentException("Window length must be positive [" + count + "]");
}
@@ -91,7 +94,10 @@
}
private BaseWindowedBolt withWindowLength(Duration duration) {
- if (duration.isNegative()) {
+ if (duration == null) {
+ throw new IllegalArgumentException("Window length cannot be set null");
+ }
+ if (duration.isNegative() || duration.isZero()) {
throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
}
@@ -101,6 +107,9 @@
}
private BaseWindowedBolt withSlidingInterval(Count count) {
+ if (count == null) {
+ throw new IllegalArgumentException("Sliding interval cannot be set null");
+ }
if (count.value <= 0) {
throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
}
@@ -109,7 +118,10 @@
}
private BaseWindowedBolt withSlidingInterval(Duration duration) {
- if (duration.isNegative()) {
+ if (duration == null) {
+ throw new IllegalArgumentException("Sliding interval cannot be set null");
+ }
+ if (duration.isNegative() || duration.isZero()) {
throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
}
windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS,
@@ -194,8 +206,9 @@
}
/**
- * Specify a field in the tuple that represents the timestamp as a long value. If this
- * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+ * Specify a field in the tuple that represents the timestamp as a long value. The timestamp
+ * should also be in milliseconds. If this field is not present in the
+ * incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
@@ -210,6 +223,9 @@
*/
@SuppressWarnings("HiddenField")
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+ if (timestampExtractor == null) {
+ throw new IllegalArgumentException("Timestamp extractor cannot be set to null");
+ }
if (this.timestampExtractor != null) {
throw new IllegalArgumentException(
"Window is already configured with a timestamp " + "extractor: " + timestampExtractor);
@@ -234,6 +250,9 @@
* @param streamId the name of the stream used to emit late tuples on
*/
public BaseWindowedBolt withLateTupleStream(String streamId) {
+ if (streamId == null) {
+ throw new IllegalArgumentException("Cannot set late tuple stream id to null");
+ }
windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
return this;
}
@@ -247,6 +266,12 @@
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration) {
+ if (duration == null) {
+ throw new IllegalArgumentException("Lag duration cannot be set null");
+ }
+ if (duration.isNegative() || duration.isZero()) {
+ throw new IllegalArgumentException("Lag duration must be positive [" + duration + "]");
+ }
windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS,
duration.toMillis());
return this;
@@ -259,6 +284,12 @@
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+ if (interval == null) {
+ throw new IllegalArgumentException("Watermark interval cannot be set null");
+ }
+ if (interval.isNegative() || interval.isZero()) {
+ throw new IllegalArgumentException("Watermark interval must be positive [" + interval + "]");
+ }
windowConfiguration.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS,
interval.toMillis());
return this;
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
index 2dd1e15..d027d89 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
@@ -71,7 +71,7 @@
private transient EvictionPolicy<Tuple> evictionPolicy;
private transient Long windowLengthDurationMs;
// package level for unit tests
- private transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
+ protected transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
public WindowedBoltExecutor(IWindowedBolt bolt) {
this.bolt = bolt;
@@ -150,7 +150,6 @@
lifecycleListener, Map<String, Object>
topoConf, TopologyContext context, Collection<Event<Tuple>> queue) {
-
WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener, queue);
Count windowLengthCount = null;
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java
index fd71ba5..d60de33 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowingConfigs.java
@@ -14,6 +14,7 @@
package com.twitter.heron.api.windowing;
import java.util.HashMap;
+import java.util.Map;
public class WindowingConfigs extends HashMap<String, Object> {
@@ -69,4 +70,62 @@
*/
public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS = "topology.bolts"
+ ".watermark.event.interval.ms";
+
+ public void setTopologyBoltsWindowLengthCount(long value) {
+ setTopologyBoltsWindowLengthCount(this, value);
+ }
+
+ public static void setTopologyBoltsWindowLengthCount(Map<String, Object> conf, long value) {
+ conf.put(TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, value);
+ }
+
+ public void setTopologyBoltsWindowLengthDurationMs(long value) {
+ setTopologyBoltsWindowLengthDurationMs(this, value);
+ }
+
+ public static void setTopologyBoltsWindowLengthDurationMs(Map<String, Object> conf, long value) {
+ conf.put(TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, value);
+ }
+
+ public void setTopologyBoltsSlidingIntervalCount(long value) {
+ setTopologyBoltsSlidingIntervalCount(this, value);
+ }
+
+ public static void setTopologyBoltsSlidingIntervalCount(Map<String, Object> conf, long value) {
+ conf.put(TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, value);
+ }
+
+ public void setTopologyBoltsSlidingIntervalDurationMs(long value) {
+ setTopologyBoltsSlidingIntervalDurationMs(this, value);
+ }
+
+ public static void setTopologyBoltsSlidingIntervalDurationMs(
+ Map<String, Object> conf, long value) {
+ conf.put(TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, value);
+ }
+
+ public void setTopologyBoltsLateTupleStream(String value) {
+ setTopologyBoltsLateTupleStream(this, value);
+ }
+
+ public static void setTopologyBoltsLateTupleStream(Map<String, Object> conf, String value) {
+ conf.put(TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, value);
+ }
+
+ public void setTopologyBoltsTupleTimestampMaxLagMs(long value) {
+ setTopologyBoltsTupleTimestampMaxLagMs(this, value);
+ }
+
+ public static void setTopologyBoltsTupleTimestampMaxLagMs(Map<String, Object> conf, long value) {
+ conf.put(TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, value);
+ }
+
+ public void setTopologyBoltsWatermarkEventIntervalMs(long value) {
+ setTopologyBoltsWatermarkEventIntervalMs(this, value);
+ }
+
+ public static void setTopologyBoltsWatermarkEventIntervalMs(
+ Map<String, Object> conf, long value) {
+ conf.put(TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, value);
+ }
}
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index 9919e11..fcff382 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -3,7 +3,9 @@
api_deps_files = [
"//heron/api/src/java:api-java",
+ "//heron/common/src/java:utils-java",
"//third_party/java:junit4",
+ "@org_mockito_mockito_all//jar"
]
api_deps_files = \
@@ -19,7 +21,9 @@
java_tests(
test_classes = [
"com.twitter.heron.api.windowing.WindowManagerTest",
- "com.twitter.heron.api.windowing.WaterMarkEventGeneratorTest"
+ "com.twitter.heron.api.windowing.WaterMarkEventGeneratorTest",
+ "com.twitter.heron.api.bolt.WindowedBoltExecutorTest",
+ "com.twitter.heron.api.bolt.BaseWindowedBoltTest"
],
runtime_deps = [ ":api-tests" ],
size = "small",
diff --git a/heron/api/tests/java/com/twitter/heron/api/bolt/BaseWindowedBoltTest.java b/heron/api/tests/java/com/twitter/heron/api/bolt/BaseWindowedBoltTest.java
new file mode 100644
index 0000000..222e7d0
--- /dev/null
+++ b/heron/api/tests/java/com/twitter/heron/api/bolt/BaseWindowedBoltTest.java
@@ -0,0 +1,276 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.api.bolt;
+
+import java.time.Duration;
+
+import org.junit.Test;
+
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.windowing.TupleWindow;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link BaseWindowedBolt}
+ */
+public class BaseWindowedBoltTest {
+
+ public static class TestBolt extends BaseWindowedBolt {
+
+ private static final long serialVersionUID = -7224073487836212922L;
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+
+ }
+ }
+
+ @Test
+ public void testSettingSlidingCountWindow() {
+ final Object[][] args = new Object[][]{
+ {-1, 10},
+ {10, -1},
+ {0, 10},
+ {10, 0},
+ {0, 0},
+ {-1, -1},
+ {5, 10},
+ {1, 1},
+ {10, 5},
+ {100, 10},
+ {100, 100},
+ {200, 100},
+ {500, 100},
+ {null, null},
+ {null, 1},
+ {1, null},
+ {null, -1},
+ {-1, null}
+ };
+
+ for (Object[] arg : args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ Object arg0 = arg[0];
+ Object arg1 = arg[1];
+ try {
+
+ BaseWindowedBolt.Count windowLengthCount = null;
+ if (arg0 != null) {
+ windowLengthCount = BaseWindowedBolt.Count.of((Integer) arg0);
+ }
+ BaseWindowedBolt.Count slidingIntervalCount = null;
+
+ if (arg1 != null) {
+ slidingIntervalCount = BaseWindowedBolt.Count.of((Integer) arg1);
+ }
+
+ builder.setBolt("testBolt", new TestBolt().withWindow(windowLengthCount,
+ slidingIntervalCount));
+ if (arg0 == null || arg1 == null) {
+ fail(String.format("Window length or sliding window length cannot be null -- "
+ + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+ }
+ if ((Integer) arg0 <= 0 || (Integer) arg1 <= 0) {
+ fail(String.format("Window length or sliding window length cannot be zero or less -- "
+ + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+ }
+ } catch (IllegalArgumentException e) {
+ if (arg0 != null && arg1 != null && (Integer) arg0 > 0 && (Integer) arg1 > 0) {
+ fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s "
+ + "slidingIntervalCount: %s", e.getMessage(), arg0, arg1));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSettingSlidingTimeWindow() {
+ final Object[][] args = new Object[][]{
+ {-1L, 10L},
+ {10L, -1L},
+ {0L, 10L},
+ {10L, 0L},
+ {0L, 0L},
+ {-1L, -1L},
+ {5L, 10L},
+ {1L, 1L},
+ {10L, 5L},
+ {100L, 10L},
+ {100L, 100L},
+ {200L, 100L},
+ {500L, 100L},
+ {null, null},
+ {null, 1L},
+ {1L, null},
+ {null, -1L},
+ {-1L, null}
+ };
+
+ for (Object[] arg : args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ Object arg0 = arg[0];
+ Object arg1 = arg[1];
+ try {
+
+ Duration windowLengthDuration = null;
+ if (arg0 != null) {
+ windowLengthDuration = Duration.ofMillis((Long) arg0);
+ }
+ Duration slidingIntervalDuration = null;
+
+ if (arg1 != null) {
+ slidingIntervalDuration = Duration.ofMillis((Long) arg1);
+ }
+
+ builder.setBolt("testBolt", new TestBolt().withWindow(windowLengthDuration,
+ slidingIntervalDuration));
+ if (arg0 == null || arg1 == null) {
+ fail(String.format("Window length or sliding window length cannot be null -- "
+ + "windowLengthDuration: %s slidingIntervalDuration: %s", arg0, arg1));
+ }
+ if ((Long) arg0 <= 0 || (Long) arg1 <= 0) {
+ fail(String.format("Window length or sliding window length cannot be zero or less -- "
+ + "windowLengthDuration: %s slidingIntervalDuration: %s", arg0, arg1));
+ }
+ } catch (IllegalArgumentException e) {
+ if (arg0 != null && arg1 != null && (Long) arg0 > 0 && (Long) arg1 > 0) {
+ fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s "
+ + "slidingIntervalDuration: %s", e.getMessage(), arg0, arg1));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSettingTumblingCountWindow() {
+ final Object[] args = new Object[] {-1, 0, 1, 2, 5, 10, null};
+
+ for (Object arg : args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ Object arg0 = arg;
+ try {
+
+ BaseWindowedBolt.Count windowLengthCount = null;
+ if (arg0 != null) {
+ windowLengthCount = BaseWindowedBolt.Count.of((Integer) arg0);
+ }
+
+ builder.setBolt("testBolt", new TestBolt().withTumblingWindow(windowLengthCount));
+ if (arg0 == null) {
+ fail(String.format("Window length cannot be null -- windowLengthCount: %s", arg0));
+ }
+ if ((Integer) arg0 <= 0) {
+ fail(String.format("Window length cannot be zero or less -- windowLengthCount: %s",
+ arg0));
+ }
+ } catch (IllegalArgumentException e) {
+ if (arg0 != null && (Integer) arg0 > 0) {
+ fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s", e
+ .getMessage(), arg0));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSettingTumblingTimeWindow() {
+ final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+ for (Object arg : args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ Object arg0 = arg;
+ try {
+
+ Duration windowLengthDuration = null;
+ if (arg0 != null) {
+ windowLengthDuration = Duration.ofMillis((Long) arg0);
+ }
+
+ builder.setBolt("testBolt", new TestBolt().withTumblingWindow(windowLengthDuration));
+ if (arg0 == null) {
+ fail(String.format("Window count duration cannot be null -- windowLengthDuration: %s",
+ arg0));
+ }
+ if ((Long) arg0 <= 0) {
+ fail(String.format("Window length cannot be zero or less -- windowLengthDuration: %s",
+ arg0));
+ }
+ } catch (IllegalArgumentException e) {
+ if (arg0 != null && (Long) arg0 > 0) {
+ fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s", e
+ .getMessage(), arg0));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSettingLagTime() {
+ final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+ for (Object arg : args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ Object arg0 = arg;
+ try {
+
+ Duration lagTime = null;
+ if (arg0 != null) {
+ lagTime = Duration.ofMillis((Long) arg0);
+ }
+
+ builder.setBolt("testBolt", new TestBolt().withLag(lagTime));
+ if (arg0 == null) {
+ fail(String.format("Window lag duration cannot be null -- lagTime: %s", arg0));
+ }
+ if ((Long) arg0 <= 0) {
+ fail(String.format("Window lag cannot be zero or less -- lagTime: %s", arg0));
+ }
+ } catch (IllegalArgumentException e) {
+ if (arg0 != null && (Long) arg0 > 0) {
+ fail(String.format("Exception: %s thrown on valid input -- lagTime: %s",
+ e.getMessage(), arg0));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSettingWaterMarkInterval() {
+ final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+ for (Object arg : args) {
+ TopologyBuilder builder = new TopologyBuilder();
+ Object arg0 = arg;
+ try {
+
+ Duration watermarkInterval = null;
+ if (arg0 != null) {
+ watermarkInterval = Duration.ofMillis((Long) arg0);
+ }
+
+ builder.setBolt("testBolt", new TestBolt().withWatermarkInterval(watermarkInterval));
+ if (arg0 == null) {
+ fail(String.format("Watermark interval cannot be null -- watermarkInterval: %s", arg0));
+ }
+ if ((Long) arg0 <= 0) {
+ fail(String.format("Watermark interval cannot be zero or less -- watermarkInterval: "
+ + "%s", arg0));
+ }
+ } catch (IllegalArgumentException e) {
+ if (arg0 != null && (Long) arg0 > 0) {
+ fail(String.format("Exception: %s thrown on valid input -- watermarkInterval: %s", e
+ .getMessage(), arg0));
+ }
+ }
+ }
+ }
+}
diff --git a/heron/api/tests/java/com/twitter/heron/api/bolt/WindowedBoltExecutorTest.java b/heron/api/tests/java/com/twitter/heron/api/bolt/WindowedBoltExecutorTest.java
new file mode 100644
index 0000000..52a21d3
--- /dev/null
+++ b/heron/api/tests/java/com/twitter/heron/api/bolt/WindowedBoltExecutorTest.java
@@ -0,0 +1,241 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.api.bolt;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.windowing.TupleWindow;
+import com.twitter.heron.api.windowing.WindowingConfigs;
+import com.twitter.heron.common.utils.topology.TopologyContextImpl;
+import com.twitter.heron.common.utils.tuple.TupleImpl;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WindowedBoltExecutor}
+ */
+public class WindowedBoltExecutorTest {
+
+ private WindowedBoltExecutor executor;
+ private TestWindowedBolt testWindowedBolt;
+
+ @SuppressWarnings("VisibilityModifier")
+ private static class TestWindowedBolt extends BaseWindowedBolt {
+ private static final long serialVersionUID = -8934326157586387333L;
+ List<TupleWindow> tupleWindows = new ArrayList<>();
+
+ @Override
+ public void execute(TupleWindow input) {
+ //System.out.println(input);
+ tupleWindows.add(input);
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private TopologyContext getContext(final Fields fields) {
+ TopologyBuilder builder = new TopologyBuilder();
+ return new TopologyContextImpl(new Config(),
+ builder.createTopology()
+ .setConfig(new Config())
+ .setName("test")
+ .setState(TopologyAPI.TopologyState.RUNNING)
+ .getTopology(),
+ new HashMap(), 1, null) {
+ @Override
+ public Fields getComponentOutputFields(String componentId, String streamId) {
+ return fields;
+ }
+
+ };
+ }
+
+ private Tuple getTuple(String streamId, final Fields fields, Values values) {
+
+ TopologyContext topologyContext = getContext(fields);
+ return new TupleImpl(topologyContext, TopologyAPI.StreamId.newBuilder()
+ .setId(streamId).setComponentName("s1")
+ .build(), 0, null, values, 1) {
+ @Override
+ public TopologyAPI.StreamId getSourceGlobalStreamId() {
+ return TopologyAPI.StreamId.newBuilder().setComponentName("s1").setId("default").build();
+ }
+ };
+ }
+
+ private OutputCollector getOutputCollector() {
+ return Mockito.mock(OutputCollector.class);
+ }
+
+ private TopologyContext getTopologyContext() {
+ TopologyContext context = Mockito.mock(TopologyContext.class);
+
+ Map<TopologyAPI.StreamId, TopologyAPI.Grouping> sources =
+ Collections.singletonMap(TopologyAPI.StreamId.newBuilder()
+ .setComponentName("s1").setId("default").build(), null);
+ Mockito.when(context.getThisSources()).thenReturn(sources);
+ return context;
+ }
+
+ @Before
+ public void setUp() {
+ testWindowedBolt = new TestWindowedBolt();
+ testWindowedBolt.withTimestampField("ts");
+ executor = new WindowedBoltExecutor(testWindowedBolt);
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+ // trigger manually to avoid timing issues
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000L);
+ executor.prepare(conf, getTopologyContext(), getOutputCollector());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testExecuteWithoutTs() throws Exception {
+ executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+ }
+
+ @Test
+ public void testExecuteWithTs() throws Exception {
+ long[] timestamps = {603, 605, 607, 618, 626, 636};
+ for (long ts : timestamps) {
+ executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+ }
+ //Thread.sleep(120);
+ executor.waterMarkEventGenerator.run();
+ //System.out.println(testWindowedBolt.tupleWindows);
+ assertEquals(3, testWindowedBolt.tupleWindows.size());
+ TupleWindow first = testWindowedBolt.tupleWindows.get(0);
+ assertArrayEquals(
+ new long[]{603, 605, 607},
+ new long[]{(long) first.get().get(0).getValue(0),
+ (long) first.get().get(1).getValue(0),
+ (long) first.get().get(2).getValue(0)});
+
+ TupleWindow second = testWindowedBolt.tupleWindows.get(1);
+ assertArrayEquals(
+ new long[]{603, 605, 607, 618},
+ new long[]{(long) second.get().get(0).getValue(0),
+ (long) second.get().get(1).getValue(0),
+ (long) second.get().get(2).getValue(0),
+ (long) second.get().get(3).getValue(0)});
+
+ TupleWindow third = testWindowedBolt.tupleWindows.get(2);
+ assertArrayEquals(new long[]{618, 626}, new long[]{(long) third.get().get(0).getValue(0),
+ (long) third.get().get(1).getValue(0)});
+ }
+
+ @Test
+ public void testPrepareLateTupleStreamWithoutTs() throws Exception {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10L);
+
+ testWindowedBolt = new TestWindowedBolt();
+ executor = new WindowedBoltExecutor(testWindowedBolt);
+ TopologyContext context = getTopologyContext();
+ // emulate the call of withLateTupleStream method
+ Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default",
+ "$late")));
+ try {
+ executor.prepare(conf, context, getOutputCollector());
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals(e.getMessage(), "Late tuple stream can be defined only when specifying a "
+ + "timestamp field");
+ }
+ }
+
+ @Test
+ public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10L);
+
+ testWindowedBolt = new TestWindowedBolt();
+ testWindowedBolt.withTimestampField("ts");
+ executor = new WindowedBoltExecutor(testWindowedBolt);
+ TopologyContext context = getTopologyContext();
+ try {
+ executor.prepare(conf, context, getOutputCollector());
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals(e.getMessage(), "Stream for late tuples must be defined with the builder "
+ + "method withLateTupleStream");
+ }
+ }
+
+
+ @Test
+ public void testExecuteWithLateTupleStream() throws Exception {
+ testWindowedBolt = new TestWindowedBolt();
+ testWindowedBolt.withTimestampField("ts");
+ executor = new WindowedBoltExecutor(testWindowedBolt);
+ TopologyContext context = getTopologyContext();
+ Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default",
+ "$late")));
+
+ OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
+ //Trigger manually to avoid timing issues
+ conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000L);
+ executor.prepare(conf, context, outputCollector);
+
+ long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+ List<Tuple> tuples = new ArrayList<>(timestamps.length);
+
+ for (long ts : timestamps) {
+ Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+ tuples.add(tuple);
+ executor.execute(tuple);
+
+ //Update the watermark to this timestamp
+ executor.waterMarkEventGenerator.run();
+ }
+ System.out.println(testWindowedBolt.tupleWindows);
+ Tuple tuple = tuples.get(tuples.size() - 1);
+ Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
+ }
+}
diff --git a/heron/common/src/cpp/zookeeper/zkclient.cpp b/heron/common/src/cpp/zookeeper/zkclient.cpp
index 0fbfae6..6633520 100644
--- a/heron/common/src/cpp/zookeeper/zkclient.cpp
+++ b/heron/common/src/cpp/zookeeper/zkclient.cpp
@@ -136,32 +136,14 @@
}
void ZKClient::Init() {
- zkaction_responses_ = new PCQueue<CallBack*>();
- auto zkaction_response_cb = [this](EventLoop::Status status) {
- this->OnZkActionResponse(status);
- };
-
- if (pipe(pipers_) < 0) {
- LOG(FATAL) << "Pipe failed in ZKClient";
- }
- sp_int32 flags;
- if ((flags = fcntl(pipers_[0], F_GETFL, 0)) < 0 ||
- fcntl(pipers_[0], F_SETFL, flags | O_NONBLOCK) < 0 ||
- eventLoop_->registerForRead(pipers_[0], std::move(zkaction_response_cb), true) != 0) {
- LOG(FATAL) << "fcntl failed in ZKClient";
- }
+ piper_ = new Piper(eventLoop_);
zoo_deterministic_conn_order(0); // even distribution of clients on the server
InitZKHandle();
}
// Destructor.
ZKClient::~ZKClient() {
- if (eventLoop_) {
- CHECK_EQ(eventLoop_->unRegisterForRead(pipers_[0]), 0);
- }
- close(pipers_[0]);
- close(pipers_[1]);
- delete zkaction_responses_;
+ delete piper_;
zookeeper_close(zk_handle_);
}
@@ -350,55 +332,17 @@
}
}
-void ZKClient::SignalMainThread() {
- // This need not be protected by any mutex.
- // The os will take care of that.
- int rc = write(pipers_[1], "a", 1);
- if (rc != 1) {
- LOG(FATAL) << "Write to pipe failed in ZkClient with return code: " << rc;
- }
-}
-
-void ZKClient::OnZkActionResponse(EventLoop::Status _status) {
- if (_status == EventLoop::READ_EVENT) {
- char buf[1];
- ssize_t readcount = read(pipers_[0], buf, 1);
- if (readcount == 1) {
- bool dequeued = false;
- CallBack* cb = zkaction_responses_->trydequeue(dequeued);
- if (cb) {
- cb->Run();
- }
- } else {
- LOG(ERROR) << "In Server read from pipers returned " << readcount << " errno " << errno
- << std::endl;
- if (readcount < 0 && (errno == EAGAIN || errno == EINTR)) {
- // Never mind. we will try again
- return;
- } else {
- // We really don't know what to do here.
- // TODO(kramasamy): Figure out a way to get the hell out of here
- return;
- }
- }
- }
- return;
-}
-
void ZKClient::ZkActionCb(sp_int32 rc, VCallback<sp_int32> cb) {
- zkaction_responses_->enqueue(CreateCallback(&RunUserCb, rc, std::move(cb)));
- SignalMainThread();
+ piper_->ExecuteInEventLoop(std::bind(&RunUserCb, rc, std::move(cb)));
}
void ZKClient::ZkWatcherCb(VCallback<> cb) {
- zkaction_responses_->enqueue(CreateCallback(&RunWatcherCb, std::move(cb)));
- SignalMainThread();
+ piper_->ExecuteInEventLoop(std::bind(&RunWatcherCb, std::move(cb)));
}
void ZKClient::SendWatchEvent(const ZkWatchEvent& event) {
CHECK(client_global_watcher_cb_);
- zkaction_responses_->enqueue(CreateCallback(&RunWatchEventCb, client_global_watcher_cb_, event));
- SignalMainThread();
+ piper_->ExecuteInEventLoop(std::bind(&RunWatchEventCb, client_global_watcher_cb_, event));
}
const std::string ZKClient::state2String(sp_int32 _state) {
diff --git a/heron/common/src/cpp/zookeeper/zkclient.h b/heron/common/src/cpp/zookeeper/zkclient.h
index f076294..2ae6803 100644
--- a/heron/common/src/cpp/zookeeper/zkclient.h
+++ b/heron/common/src/cpp/zookeeper/zkclient.h
@@ -118,7 +118,6 @@
ZKClient()
: zk_handle_(NULL),
eventLoop_(NULL),
- zkaction_responses_(NULL),
client_global_watcher_cb_(VCallback<ZkWatchEvent>()) {}
private:
@@ -128,17 +127,10 @@
// The function that actually inits the handle
void InitZKHandle();
- // This is the function used to signal the main thread
- void SignalMainThread();
-
- // When the zk callback wants to wake the main thread, it uses the SignalMainThread function.
- // This function will get executed in the main thread.
- void OnZkActionResponse(EventLoop::Status _status);
-
// We wrap all user zk calls with this completion function
// This completion function runs in the context of the
- // zk completion thread. It basically appends to
- // zkaction_responses_ and calls SignalMainThread
+ // zk completion thread. It basically calls the piper
+ // to execute the cb in eventLoop thread
void ZkActionCb(sp_int32 rc, VCallback<sp_int32> cb);
// This is the watcher function that gets called
@@ -157,9 +149,8 @@
// We use libzookeeper_mt as our zk library. This means that
// zk callbacks are all executed in the context of a zk thread.
- // These pipers are how they communicate it accross to our thread
- sp_int32 pipers_[2];
- PCQueue<CallBack*>* zkaction_responses_;
+ // Piper are how they communicate it accross to our main thread
+ Piper* piper_;
// A callback to notify the clients of this class about global session events.
VCallback<ZkWatchEvent> client_global_watcher_cb_;
};
diff --git a/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java b/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java
index b5e175e..3b31eb1 100644
--- a/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java
+++ b/heron/common/src/java/com/twitter/heron/common/basics/NIOLooper.java
@@ -208,6 +208,7 @@
int operation,
ISelectHandler callback)
throws ClosedChannelException {
+
SelectionKey key = channel.keyFor(selector);
if (key == null) {
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.cpp b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
index 09d9fbc..d7134bb 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
@@ -40,7 +40,8 @@
stmgr_id_(_stmgr_id),
recvd_stmgr_pplan_(NULL),
register_response_status(heron::proto::system::STMGR_DIDNT_REGISTER) {
- InstallMessageHandler(&DummyInstance::HandleInstanceResponse);
+ InstallResponseHandler(new heron::proto::stmgr::RegisterInstanceRequest(),
+ &DummyInstance::HandleInstanceResponse);
InstallMessageHandler(&DummyInstance::HandleTupleMessage);
InstallMessageHandler(&DummyInstance::HandleNewInstanceAssignmentMsg);
@@ -62,14 +63,18 @@
}
}
-void DummyInstance::HandleClose(NetworkErrorCode) {}
+void DummyInstance::HandleClose(NetworkErrorCode) {
+ AddTimer(retry_cb_, 100);
+}
heron::proto::system::StatusCode DummyInstance::GetRegisterResponseStatus() {
return register_response_status;
}
-void DummyInstance::HandleInstanceResponse(
- heron::proto::stmgr::RegisterInstanceResponse* _message) {
+void DummyInstance::HandleInstanceResponse(void*,
+ heron::proto::stmgr::RegisterInstanceResponse* _message,
+ NetworkErrorCode status) {
+ CHECK_EQ(status, OK);
if (_message->has_pplan()) {
if (recvd_stmgr_pplan_) {
delete recvd_stmgr_pplan_;
@@ -87,21 +92,19 @@
heron::proto::stmgr::NewInstanceAssignmentMessage*) {}
void DummyInstance::CreateAndSendInstanceRequest() {
- heron::proto::stmgr::RegisterInstanceRequest message;
- heron::proto::system::Instance* instance = message.mutable_instance();
+ auto request = new heron::proto::stmgr::RegisterInstanceRequest();
+ heron::proto::system::Instance* instance = request->mutable_instance();
instance->set_instance_id(instance_id_);
instance->set_stmgr_id(stmgr_id_);
instance->mutable_info()->set_task_id(task_id_);
instance->mutable_info()->set_component_index(component_index_);
instance->mutable_info()->set_component_name(component_name_);
- message.set_topology_name(topology_name_);
- message.set_topology_id(topology_id_);
- SendMessage(message);
+ request->set_topology_name(topology_name_);
+ request->set_topology_id(topology_id_);
+ SendRequest(request, nullptr);
return;
}
-void DummyInstance::CreateAndSendTupleMessages() {}
-
//////////////////////////////////////// DummySpoutInstance ////////////////////////////////////
DummySpoutInstance::DummySpoutInstance(EventLoopImpl* eventLoop, const NetworkOptions& _options,
const sp_string& _topology_name,
@@ -116,12 +119,8 @@
max_msgs_to_send_(max_msgs_to_send),
total_msgs_sent_(0),
batch_size_(1000),
- do_custom_grouping_(_do_custom_grouping) {}
-
-void DummySpoutInstance::HandleInstanceResponse(
- heron::proto::stmgr::RegisterInstanceResponse* _message) {
- DummyInstance::HandleInstanceResponse(_message);
-}
+ do_custom_grouping_(_do_custom_grouping),
+ under_backpressure_(false) {}
void DummySpoutInstance::HandleNewInstanceAssignmentMsg(
heron::proto::stmgr::NewInstanceAssignmentMessage* _msg) {
@@ -140,23 +139,25 @@
}
void DummySpoutInstance::CreateAndSendTupleMessages() {
- for (int i = 0; (i < batch_size_) && (total_msgs_sent_ < max_msgs_to_send_);
- ++total_msgs_sent_, ++i) {
- heron::proto::system::HeronTupleSet tuple_set;
- heron::proto::system::HeronDataTupleSet* data_set = tuple_set.mutable_data();
- heron::proto::api::StreamId* tstream = data_set->mutable_stream();
- tstream->set_id(stream_id_);
- tstream->set_component_name(component_name_);
- heron::proto::system::HeronDataTuple* tuple = data_set->add_tuples();
- tuple->set_key(0);
- // Add lots of data
- for (size_t i = 0; i < 500; ++i) *(tuple->add_values()) = "dummy data";
+ if (!under_backpressure_) {
+ for (int i = 0; (i < batch_size_) && (total_msgs_sent_ < max_msgs_to_send_);
+ ++total_msgs_sent_, ++i) {
+ heron::proto::system::HeronTupleSet tuple_set;
+ heron::proto::system::HeronDataTupleSet* data_set = tuple_set.mutable_data();
+ heron::proto::api::StreamId* tstream = data_set->mutable_stream();
+ tstream->set_id(stream_id_);
+ tstream->set_component_name(component_name_);
+ heron::proto::system::HeronDataTuple* tuple = data_set->add_tuples();
+ tuple->set_key(0);
+ // Add lots of data
+ for (size_t i = 0; i < 500; ++i) *(tuple->add_values()) = "dummy data";
- // Add custom grouping if need be
- if (do_custom_grouping_) {
- tuple->add_dest_task_ids(custom_grouping_dest_task_);
+ // Add custom grouping if need be
+ if (do_custom_grouping_) {
+ tuple->add_dest_task_ids(custom_grouping_dest_task_);
+ }
+ SendMessage(tuple_set);
}
- SendMessage(tuple_set);
}
if (total_msgs_sent_ != max_msgs_to_send_) {
AddTimer([this]() { this->CreateAndSendTupleMessages(); }, 1000);
@@ -175,11 +176,6 @@
expected_msgs_to_recv_(_expected_msgs_to_recv),
msgs_recvd_(0) {}
-void DummyBoltInstance::HandleInstanceResponse(
- heron::proto::stmgr::RegisterInstanceResponse* _message) {
- DummyInstance::HandleInstanceResponse(_message);
-}
-
void DummyBoltInstance::HandleTupleMessage(heron::proto::system::HeronTupleSet2* msg) {
if (msg->has_data()) msgs_recvd_ += msg->mutable_data()->tuples_size();
if (msgs_recvd_ >= expected_msgs_to_recv_) getEventLoop()->loopExit();
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.h b/heron/stmgr/tests/cpp/server/dummy_instance.h
index 9139d19..919413a 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.h
@@ -36,11 +36,11 @@
void Retry() { Start(); }
// Handle incoming message
- virtual void HandleInstanceResponse(heron::proto::stmgr::RegisterInstanceResponse* _message);
+ virtual void HandleInstanceResponse(void* ctx,
+ heron::proto::stmgr::RegisterInstanceResponse* _message,
+ NetworkErrorCode status);
// Handle incoming tuples
virtual void HandleTupleMessage(heron::proto::system::HeronTupleSet2* _message);
- // Send tuples
- virtual void CreateAndSendTupleMessages();
// Handle the instance assignment message
virtual void HandleNewInstanceAssignmentMsg(heron::proto::stmgr::NewInstanceAssignmentMessage*);
@@ -76,10 +76,15 @@
protected:
// Handle incoming message
- virtual void HandleInstanceResponse(heron::proto::stmgr::RegisterInstanceResponse* _message);
virtual void HandleNewInstanceAssignmentMsg(
heron::proto::stmgr::NewInstanceAssignmentMessage* _msg);
- virtual void CreateAndSendTupleMessages();
+ void CreateAndSendTupleMessages();
+ virtual void StartBackPressureConnectionCb(Connection* connection) {
+ under_backpressure_ = true;
+ }
+ virtual void StopBackPressureConnectionCb(Connection* _connection) {
+ under_backpressure_ = false;
+ }
private:
sp_string stream_id_;
@@ -87,6 +92,7 @@
sp_int32 total_msgs_sent_;
sp_int32 batch_size_;
bool do_custom_grouping_;
+ bool under_backpressure_;
// only valid when the above is true
sp_int32 custom_grouping_dest_task_;
};
@@ -103,7 +109,6 @@
protected:
// Handle incoming message
- virtual void HandleInstanceResponse(heron::proto::stmgr::RegisterInstanceResponse* _message);
// Handle incoming tuples
virtual void HandleTupleMessage(heron::proto::system::HeronTupleSet2* _message);
virtual void HandleNewInstanceAssignmentMsg(
diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD
index a825c7e..91ae2d7 100644
--- a/heron/storm/src/java/BUILD
+++ b/heron/storm/src/java/BUILD
@@ -8,6 +8,7 @@
"//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
"//heron/simulator/src/java:simulator-java",
+ "//heron/proto:proto_topology_java",
"@com_googlecode_json_simple_json_simple//jar",
"//third_party/java:kryo-neverlink",
]
diff --git a/heron/storm/src/java/org/apache/storm/Config.java b/heron/storm/src/java/org/apache/storm/Config.java
index e226a1c..25b781c 100644
--- a/heron/storm/src/java/org/apache/storm/Config.java
+++ b/heron/storm/src/java/org/apache/storm/Config.java
@@ -307,6 +307,55 @@
*/
public static final String TRANSACTIONAL_ZOOKEEPER_PORT = "transactional.zookeeper.port";
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples
+ * in the window.
+ */
+ public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT
+ = "topology.bolts.window.length.count";
+
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the window length in time duration.
+ */
+ public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS
+ = "topology.bolts.window.length.duration.ms";
+
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
+ */
+ public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT
+ = "topology.bolts.window.sliding.interval.count";
+
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
+ */
+ public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS
+ = "topology.bolts.window.sliding.interval.duration.ms";
+
+ /**
+ * Bolt-specific configuration for windowed bolts to specify the name of the stream on which late tuples are
+ * going to be emitted. This configuration should only be used from the BaseWindowedBolt.withLateTupleStream builder
+ * method, and not as global parameter, otherwise IllegalArgumentException is going to be thrown.
+ */
+ public static final String TOPOLOGY_BOLTS_LATE_TUPLE_STREAM
+ = "topology.bolts.late.tuple.stream";
+
+ /**
+ * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
+ * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
+ * This config will be effective only if {@link org.apache.storm.windowing.TimestampExtractor} is specified.
+ */
+ public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS
+ = "topology.bolts.tuple.timestamp.max.lag.ms";
+
+ /*
+ * Bolt-specific configuration for windowed bolts to specify the time interval for generating
+ * watermark events. Watermark event tracks the progress of time when tuple timestamp is used.
+ * This config is effective only if {@link org.apache.storm.windowing.TimestampExtractor} is specified.
+ */
+ public static final String TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS
+ = "topology.bolts.watermark.event.interval.ms";
+
/**
* ---- DO NOT USE -----
* This variable is used to rewrite the TOPOLOGY_AUTO_TASK_HOOKS variable.
diff --git a/heron/storm/src/java/org/apache/storm/spout/ISpout.java b/heron/storm/src/java/org/apache/storm/spout/ISpout.java
index f0c2cb6..e2ee367 100644
--- a/heron/storm/src/java/org/apache/storm/spout/ISpout.java
+++ b/heron/storm/src/java/org/apache/storm/spout/ISpout.java
@@ -57,7 +57,7 @@
* @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
*/
@SuppressWarnings("rawtypes")
- void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
+ void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
/**
* Called when an ISpout is going to be shutdown. There is no guarentee that close
diff --git a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
index b6a8ac7..6232e8a 100644
--- a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
+++ b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
@@ -45,7 +45,7 @@
@Override
@SuppressWarnings("rawtypes")
- public void open(Map conf, com.twitter.heron.api.topology.TopologyContext context,
+ public void open(Map<String, Object> conf, com.twitter.heron.api.topology.TopologyContext context,
SpoutOutputCollector collector) {
topologyContextImpl = new TopologyContext(context);
spoutOutputCollectorImpl = new SpoutOutputCollectorImpl(collector);
diff --git a/heron/storm/src/java/org/apache/storm/topology/IWindowedBolt.java b/heron/storm/src/java/org/apache/storm/topology/IWindowedBolt.java
new file mode 100644
index 0000000..d848c43
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/IWindowedBolt.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.windowing.TimestampExtractor;
+import org.apache.storm.windowing.TupleWindow;
+
+/**
+ * A bolt abstraction for supporting time and count based sliding & tumbling windows.
+ */
+public interface IWindowedBolt extends IComponent {
+ /**
+ * This is similar to the
+ * {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except
+ * that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
+ */
+ void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);
+
+ /**
+ * Process the tuple window and optionally emit new tuples based on the tuples in the input
+ * window.
+ */
+ void execute(TupleWindow inputWindow);
+
+ void cleanup();
+
+ /**
+ * Return a {@link TimestampExtractor} for extracting timestamps from a
+ * tuple for event time based processing, or null for processing time.
+ *
+ * @return the timestamp extractor
+ */
+ TimestampExtractor getTimestampExtractor();
+}
diff --git a/heron/storm/src/java/org/apache/storm/topology/IWindowedBoltDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IWindowedBoltDelegate.java
new file mode 100644
index 0000000..46be02a
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/IWindowedBoltDelegate.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollectorImpl;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.windowing.TupleWindowImpl;
+
+public class IWindowedBoltDelegate implements com.twitter.heron.api.bolt.IWindowedBolt {
+
+ private static final long serialVersionUID = 8753943395082633132L;
+ private final IWindowedBolt delegate;
+ private TopologyContext topologyContextImpl;
+ private OutputCollectorImpl outputCollectorImpl;
+
+ public IWindowedBoltDelegate(IWindowedBolt iWindowedBolt) {
+ this.delegate = iWindowedBolt;
+ }
+
+ @Override
+ public void declareOutputFields(com.twitter.heron.api.topology.OutputFieldsDeclarer declarer) {
+ OutputFieldsGetter getter = new OutputFieldsGetter(declarer);
+ delegate.declareOutputFields(getter);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return this.delegate.getComponentConfiguration();
+ }
+
+ @Override
+ public void prepare(
+ Map<String, Object> conf,
+ com.twitter.heron.api.topology.TopologyContext context,
+ com.twitter.heron.api.bolt.OutputCollector collector) {
+ topologyContextImpl = new TopologyContext(context);
+ outputCollectorImpl = new OutputCollectorImpl(collector);
+ delegate.prepare(conf, topologyContextImpl, outputCollectorImpl);
+ }
+
+ @Override
+ public void execute(com.twitter.heron.api.windowing.TupleWindow inputWindow) {
+ this.delegate.execute(new TupleWindowImpl(inputWindow));
+ }
+
+ @Override
+ public void cleanup() {
+ this.delegate.cleanup();
+ }
+
+ @Override
+ public com.twitter.heron.api.windowing.TimestampExtractor getTimestampExtractor() {
+
+ return (this.delegate.getTimestampExtractor() == null) ? null
+ : new com.twitter.heron.api.windowing.TimestampExtractor() {
+ @Override
+ public long extractTimestamp(com.twitter.heron.api.tuple.Tuple tuple) {
+ return delegate.getTimestampExtractor().extractTimestamp(new TupleImpl(tuple));
+ }
+ };
+ }
+}
diff --git a/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java b/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java
index dd9ec5b..a9594a0 100644
--- a/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java
+++ b/heron/storm/src/java/org/apache/storm/topology/TopologyBuilder.java
@@ -24,6 +24,7 @@
import org.apache.storm.generated.StormTopology;
import com.twitter.heron.api.HeronTopology;
+import com.twitter.heron.api.bolt.WindowedBoltExecutor;
public class TopologyBuilder {
private com.twitter.heron.api.topology.TopologyBuilder delegate =
@@ -53,6 +54,19 @@
return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint);
}
+ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
+ return setBolt(id, bolt, null);
+ }
+
+ public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws
+ IllegalArgumentException {
+ return new BoltDeclarerImpl(
+ this.delegate.setBolt(
+ id, new WindowedBoltExecutor(new IWindowedBoltDelegate(bolt)),
+ parallelismHint)
+ );
+ }
+
public SpoutDeclarer setSpout(String id, IRichSpout spout) {
return setSpout(id, spout, null);
}
diff --git a/heron/storm/src/java/org/apache/storm/topology/TupleFieldTimestampExtractor.java b/heron/storm/src/java/org/apache/storm/topology/TupleFieldTimestampExtractor.java
new file mode 100644
index 0000000..9f3a045
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/TupleFieldTimestampExtractor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.TimestampExtractor;
+
+/**
+ * A {@link TimestampExtractor} that extracts timestamp from a specific field in the tuple.
+ */
+public final class TupleFieldTimestampExtractor implements TimestampExtractor {
+ private static final long serialVersionUID = 5212225232807940572L;
+ private final String fieldName;
+
+ private TupleFieldTimestampExtractor(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public long extractTimestamp(Tuple tuple) {
+ return tuple.getLongByField(fieldName);
+ }
+
+ public static TupleFieldTimestampExtractor of(String fieldName) {
+ return new TupleFieldTimestampExtractor(fieldName);
+ }
+
+ @Override
+ public String toString() {
+ return "TupleFieldTimestampExtractor{" + "fieldName='" + fieldName + '\'' + '}';
+ }
+}
diff --git a/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java b/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java
new file mode 100644
index 0000000..b0b5389
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology.base;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IWindowedBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TupleFieldTimestampExtractor;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.windowing.TimestampExtractor;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+
+public abstract class BaseWindowedBolt implements IWindowedBolt {
+ private static final long serialVersionUID = -3998164228343123590L;
+ protected final transient com.twitter.heron.api.windowing.WindowingConfigs windowConfiguration;
+ protected com.twitter.heron.api.windowing.TimestampExtractor timestampExtractor;
+
+ /**
+ * Holds a count value for count based windows and sliding intervals.
+ */
+ public static class Count implements Serializable {
+ private static final long serialVersionUID = -2290882388716246812L;
+ public final int value;
+
+ public Count(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Returns a {@link Count} of given value.
+ *
+ * @param value the count value
+ * @return the Count
+ */
+ public static Count of(int value) {
+ return new Count(value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Count count = (Count) o;
+
+ return value == count.value;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "Count{" + "value=" + value + '}';
+ }
+ }
+
+ /**
+ * Holds a Time duration for time based windows and sliding intervals.
+ */
+ public static class Duration implements Serializable {
+ private static final long serialVersionUID = 5654070568075477148L;
+ public final int value;
+
+ public Duration(int value, TimeUnit timeUnit) {
+ this.value = (int) timeUnit.toMillis(value);
+ }
+
+ /**
+ * Returns a {@link Duration} corresponding to the the given value in milli seconds.
+ *
+ * @param milliseconds the duration in milliseconds
+ * @return the Duration
+ */
+ public static Duration of(int milliseconds) {
+ return new Duration(milliseconds, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Returns a {@link Duration} corresponding to the the given value in days.
+ *
+ * @param days the number of days
+ * @return the Duration
+ */
+ public static Duration days(int days) {
+ return new Duration(days, TimeUnit.DAYS);
+ }
+
+ /**
+ * Returns a {@link Duration} corresponding to the the given value in hours.
+ *
+ * @param hours the number of hours
+ * @return the Duration
+ */
+ public static Duration hours(int hours) {
+ return new Duration(hours, TimeUnit.HOURS);
+ }
+
+ /**
+ * Returns a {@link Duration} corresponding to the the given value in minutes.
+ *
+ * @param minutes the number of minutes
+ * @return the Duration
+ */
+ public static Duration minutes(int minutes) {
+ return new Duration(minutes, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Returns a {@link Duration} corresponding to the the given value in seconds.
+ *
+ * @param seconds the number of seconds
+ * @return the Duration
+ */
+ public static Duration seconds(int seconds) {
+ return new Duration(seconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Duration duration = (Duration) o;
+
+ return value == duration.value;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "Duration{" + "value=" + value + '}';
+ }
+ }
+
+ protected BaseWindowedBolt() {
+ windowConfiguration = new com.twitter.heron.api.windowing.WindowingConfigs();
+ }
+
+ private BaseWindowedBolt withWindowLength(Count count) {
+ if (count.value <= 0) {
+ throw new IllegalArgumentException("Window length must be positive [" + count + "]");
+ }
+ windowConfiguration.setTopologyBoltsWindowLengthCount(count.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withWindowLength(Duration duration) {
+ if (duration.value <= 0) {
+ throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
+ }
+ windowConfiguration.setTopologyBoltsWindowLengthDurationMs(duration.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withSlidingInterval(Count count) {
+ if (count.value <= 0) {
+ throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
+ }
+ windowConfiguration.setTopologyBoltsSlidingIntervalCount(count.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withSlidingInterval(Duration duration) {
+ if (duration.value <= 0) {
+ throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
+ }
+ windowConfiguration.setTopologyBoltsSlidingIntervalDurationMs(duration.value);
+ return this;
+ }
+
+ /**
+ * Tuple count based sliding window configuration.
+ *
+ * @param windowLength the number of tuples in the window
+ * @param slidingInterval the number of tuples after which the window slides
+ */
+ public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
+ return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Tuple count and time duration based sliding window configuration.
+ *
+ * @param windowLength the number of tuples in the window
+ * @param slidingInterval the time duration after which the window slides
+ */
+ public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
+ return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Time duration and count based sliding window configuration.
+ *
+ * @param windowLength the time duration of the window
+ * @param slidingInterval the number of tuples after which the window slides
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
+ return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Time duration based sliding window configuration.
+ *
+ * @param windowLength the time duration of the window
+ * @param slidingInterval the time duration after which the window slides
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
+ return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * A tuple count based window that slides with every incoming tuple.
+ *
+ * @param windowLength the number of tuples in the window
+ */
+ public BaseWindowedBolt withWindow(Count windowLength) {
+ return withWindowLength(windowLength).withSlidingInterval(new Count(1));
+ }
+
+ /**
+ * A time duration based window that slides with every incoming tuple.
+ *
+ * @param windowLength the time duration of the window
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength) {
+ return withWindowLength(windowLength).withSlidingInterval(new Count(1));
+ }
+
+ /**
+ * A count based tumbling window.
+ *
+ * @param count the number of tuples after which the window tumbles
+ */
+ public BaseWindowedBolt withTumblingWindow(Count count) {
+ return withWindowLength(count).withSlidingInterval(count);
+ }
+
+ /**
+ * A time duration based tumbling window.
+ *
+ * @param duration the time duration after which the window tumbles
+ */
+ public BaseWindowedBolt withTumblingWindow(Duration duration) {
+ return withWindowLength(duration).withSlidingInterval(duration);
+ }
+
+ /**
+ * Specify a field in the tuple that represents the timestamp as a long value. If this
+ * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+ *
+ * @param fieldName the name of the field that contains the timestamp
+ */
+ public BaseWindowedBolt withTimestampField(String fieldName) {
+ return withTimestampExtractor(TupleFieldTimestampExtractor.of(fieldName));
+ }
+
+ /**
+ * Specify the timestamp extractor implementation.
+ *
+ * @param timestampExtractor the {@link TimestampExtractor} implementation
+ */
+ @SuppressWarnings("HiddenField")
+ public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+ if (this.timestampExtractor != null) {
+ throw new IllegalArgumentException(
+ "Window is already configured with a timestamp extractor: " + timestampExtractor);
+ }
+
+ this.timestampExtractor = new com.twitter.heron.api.windowing.TimestampExtractor() {
+
+ @Override
+ public long extractTimestamp(Tuple tuple) {
+ return timestampExtractor.extractTimestamp(new TupleImpl(tuple));
+ }
+ };
+ return this;
+ }
+
+ @Override
+ public TimestampExtractor getTimestampExtractor() {
+ return (this.timestampExtractor == null) ? null : new TimestampExtractor() {
+
+ @Override
+ public long extractTimestamp(org.apache.storm.tuple.Tuple tuple) {
+
+ return timestampExtractor.extractTimestamp(new com.twitter.heron.api.tuple.Tuple() {
+
+ @Override
+ public int size() {
+ return tuple.size();
+ }
+
+ @Override
+ public int fieldIndex(String field) {
+ return tuple.fieldIndex(field);
+ }
+
+ @Override
+ public boolean contains(String field) {
+ return tuple.contains(field);
+ }
+
+ @Override
+ public Object getValue(int i) {
+ return tuple.getValue(i);
+ }
+
+ @Override
+ public String getString(int i) {
+ return tuple.getString(i);
+ }
+
+ @Override
+ public Integer getInteger(int i) {
+ return tuple.getInteger(i);
+ }
+
+ @Override
+ public Long getLong(int i) {
+ return tuple.getLong(i);
+ }
+
+ @Override
+ public Boolean getBoolean(int i) {
+ return tuple.getBoolean(i);
+ }
+
+ @Override
+ public Short getShort(int i) {
+ return tuple.getShort(i);
+ }
+
+ @Override
+ public Byte getByte(int i) {
+ return tuple.getByte(i);
+ }
+
+ @Override
+ public Double getDouble(int i) {
+ return tuple.getDouble(i);
+ }
+
+ @Override
+ public Float getFloat(int i) {
+ return tuple.getFloat(i);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return tuple.getBinary(i);
+ }
+
+ @Override
+ public Object getValueByField(String field) {
+ return tuple.getValueByField(field);
+ }
+
+ @Override
+ public String getStringByField(String field) {
+ return tuple.getStringByField(field);
+ }
+
+ @Override
+ public Integer getIntegerByField(String field) {
+ return tuple.getIntegerByField(field);
+ }
+
+ @Override
+ public Long getLongByField(String field) {
+ return tuple.getLongByField(field);
+ }
+
+ @Override
+ public Boolean getBooleanByField(String field) {
+ return tuple.getBooleanByField(field);
+ }
+
+ @Override
+ public Short getShortByField(String field) {
+ return tuple.getShortByField(field);
+ }
+
+ @Override
+ public Byte getByteByField(String field) {
+ return tuple.getByteByField(field);
+ }
+
+ @Override
+ public Double getDoubleByField(String field) {
+ return tuple.getDoubleByField(field);
+ }
+
+ @Override
+ public Float getFloatByField(String field) {
+ return tuple.getFloatByField(field);
+ }
+
+ @Override
+ public byte[] getBinaryByField(String field) {
+ return tuple.getBinaryByField(field);
+ }
+
+ @Override
+ public List<Object> getValues() {
+ return tuple.getValues();
+ }
+
+ @Override
+ public Fields getFields() {
+ return new Fields(tuple.getFields().toList());
+ }
+
+ @Override
+ public List<Object> select(Fields selector) {
+ return tuple.select(new org.apache.storm.tuple.Fields(selector.toString()));
+ }
+
+ @Override
+ public TopologyAPI.StreamId getSourceGlobalStreamId() {
+ return TopologyAPI.StreamId.newBuilder().setId(tuple.getSourceStreamId())
+ .setComponentName(tuple.getSourceComponent()).build();
+ }
+
+ @Override
+ public String getSourceComponent() {
+ return tuple.getSourceComponent();
+ }
+
+ @Override
+ public int getSourceTask() {
+ return tuple.getSourceTask();
+ }
+
+ @Override
+ public String getSourceStreamId() {
+ return tuple.getSourceStreamId();
+ }
+
+ @Override
+ public void resetValues() {
+ tuple.resetValues();
+ }
+ });
+ }
+ };
+ }
+
+ /**
+ * Specify a stream id on which late tuples are going to be emitted.
+ * It must be defined on a per-component basis, and in conjunction with the
+ * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will
+ * be thrown.
+ *
+ * @param streamId the name of the stream used to emit late tuples on
+ */
+ public BaseWindowedBolt withLateTupleStream(String streamId) {
+ windowConfiguration.setTopologyBoltsLateTupleStream(streamId);
+ return this;
+ }
+
+
+ /**
+ * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple
+ * timestamps
+ * cannot be out of order by more than this amount.
+ *
+ * @param duration the max lag duration
+ */
+ public BaseWindowedBolt withLag(Duration duration) {
+ windowConfiguration.setTopologyBoltsTupleTimestampMaxLagMs(duration.value);
+ return this;
+ }
+
+ /**
+ * Specify the watermark event generation interval. For tuple based timestamps, watermark events
+ * are used to track the progress of time
+ *
+ * @param interval the interval at which watermark events are generated
+ */
+ public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+ windowConfiguration.setTopologyBoltsWatermarkEventIntervalMs(interval.value);
+ return this;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector
+ collector) {
+ // NOOP
+ }
+
+ @Override
+ public void cleanup() {
+ // NOOP
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // NOOP
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return windowConfiguration;
+ }
+}
diff --git a/heron/storm/src/java/org/apache/storm/tuple/Tuple.java b/heron/storm/src/java/org/apache/storm/tuple/Tuple.java
index 65eb6c8..c238346 100644
--- a/heron/storm/src/java/org/apache/storm/tuple/Tuple.java
+++ b/heron/storm/src/java/org/apache/storm/tuple/Tuple.java
@@ -39,110 +39,190 @@
int size();
/**
- * Returns the position of the specified field in this tuple.
- */
- int fieldIndex(String field);
-
- /**
* Returns true if this tuple contains the specified name of the field.
*/
boolean contains(String field);
/**
- * Gets the field at position i in the tuple. Returns object since tuples are dynamically typed.
- */
- Object getValue(int i);
-
- /**
- * Returns the String at position i in the tuple. If that field is not a String,
- * you will get a runtime error.
- */
- String getString(int i);
-
- /**
- * Returns the Integer at position i in the tuple. If that field is not an Integer,
- * you will get a runtime error.
- */
- Integer getInteger(int i);
-
- /**
- * Returns the Long at position i in the tuple. If that field is not a Long,
- * you will get a runtime error.
- */
- Long getLong(int i);
-
- /**
- * Returns the Boolean at position i in the tuple. If that field is not a Boolean,
- * you will get a runtime error.
- */
- Boolean getBoolean(int i);
-
- /**
- * Returns the Short at position i in the tuple. If that field is not a Short,
- * you will get a runtime error.
- */
- Short getShort(int i);
-
- /**
- * Returns the Byte at position i in the tuple. If that field is not a Byte,
- * you will get a runtime error.
- */
- Byte getByte(int i);
-
- /**
- * Returns the Double at position i in the tuple. If that field is not a Double,
- * you will get a runtime error.
- */
- Double getDouble(int i);
-
- /**
- * Returns the Float at position i in the tuple. If that field is not a Float,
- * you will get a runtime error.
- */
- Float getFloat(int i);
-
- /**
- * Returns the byte array at position i in the tuple. If that field is not a byte array,
- * you will get a runtime error.
- */
- byte[] getBinary(int i);
-
-
- Object getValueByField(String field);
-
- String getStringByField(String field);
-
- Integer getIntegerByField(String field);
-
- Long getLongByField(String field);
-
- Boolean getBooleanByField(String field);
-
- Short getShortByField(String field);
-
- Byte getByteByField(String field);
-
- Double getDoubleByField(String field);
-
- Float getFloatByField(String field);
-
- byte[] getBinaryByField(String field);
-
- /**
- * Gets all the values in this tuple.
- */
- List<Object> getValues();
-
- /**
* Gets the names of the fields in this tuple.
*/
Fields getFields();
/**
+ * Returns the position of the specified field in this tuple.
+ *
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ int fieldIndex(String field);
+
+ /**
* Returns a subset of the tuple based on the fields selector.
*/
List<Object> select(Fields selector);
+ /**
+ * Gets the field at position i in the tuple. Returns object since tuples are dynamically typed.
+ *
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Object getValue(int i);
+
+ /**
+ * Returns the String at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a String
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ String getString(int i);
+
+ /**
+ * Returns the Integer at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Integer
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Integer getInteger(int i);
+
+ /**
+ * Returns the Long at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Long
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Long getLong(int i);
+
+ /**
+ * Returns the Boolean at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Boolean
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Boolean getBoolean(int i);
+
+ /**
+ * Returns the Short at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Short
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Short getShort(int i);
+
+ /**
+ * Returns the Byte at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Byte
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Byte getByte(int i);
+
+ /**
+ * Returns the Double at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Double
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Double getDouble(int i);
+
+ /**
+ * Returns the Float at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a Float
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ Float getFloat(int i);
+
+ /**
+ * Returns the byte array at position i in the tuple.
+ *
+ * @throws ClassCastException If that field is not a byte array
+ * @throws IndexOutOfBoundsException - if the index is out of range `(index < 0 || index >= size())`
+ */
+ byte[] getBinary(int i);
+
+ /**
+ * Gets the field with a specific name. Returns object since tuples are dynamically typed.
+ *
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Object getValueByField(String field);
+
+ /**
+ * Gets the String field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a String
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ String getStringByField(String field);
+
+ /**
+ * Gets the Integer field with a specific name.
+ *
+ * @throws ClassCastException If that field is not an Integer
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Integer getIntegerByField(String field);
+
+ /**
+ * Gets the Long field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a Long
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Long getLongByField(String field);
+
+ /**
+ * Gets the Boolean field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a Boolean
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Boolean getBooleanByField(String field);
+
+ /**
+ * Gets the Short field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a Short
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Short getShortByField(String field);
+
+ /**
+ * Gets the Byte field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a Byte
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Byte getByteByField(String field);
+
+ /**
+ * Gets the Double field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a Double
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Double getDoubleByField(String field);
+
+ /**
+ * Gets the Float field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a Float
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ Float getFloatByField(String field);
+
+ /**
+ * Gets the Byte array field with a specific name.
+ *
+ * @throws ClassCastException If that field is not a byte array
+ * @throws IllegalArgumentException - if field does not exist
+ */
+ byte[] getBinaryByField(String field);
+
+ /**
+ * Gets all the values in this tuple.
+ */
+ List<Object> getValues();
+
/**
* Returns the global stream id (component + stream) of this tuple.
diff --git a/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java b/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java
index 9c99768..30d4090 100644
--- a/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java
+++ b/heron/storm/src/java/org/apache/storm/tuple/TupleImpl.java
@@ -248,4 +248,20 @@
public void resetValues() {
delegate.resetValues();
}
+
+ @Override
+ public String toString() {
+ return "source: " + getSourceComponent() + ":" + getSourceTask()
+ + ", stream: " + getSourceStreamId() + ", " + getValues().toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return this == other;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/TimestampExtractor.java b/heron/storm/src/java/org/apache/storm/windowing/TimestampExtractor.java
new file mode 100644
index 0000000..14c6054
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/TimestampExtractor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.windowing;
+
+import java.io.Serializable;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Interface to be implemented for extracting timestamp from a tuple.
+ */
+public interface TimestampExtractor extends Serializable {
+ /**
+ * Return the tuple timestamp indicating the time when the event happened.
+ *
+ * @param tuple the tuple
+ * @return the timestamp
+ */
+ long extractTimestamp(Tuple tuple);
+}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java b/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java
new file mode 100644
index 0000000..0ed5810
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/TupleWindow.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A {@link Window} that contains {@link Tuple} objects.
+ */
+public interface TupleWindow extends Window<Tuple> {
+}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java b/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java
new file mode 100644
index 0000000..1d2f9e4
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/TupleWindowImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.windowing;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+
+public class TupleWindowImpl implements TupleWindow {
+
+ private final com.twitter.heron.api.windowing.TupleWindow delegate;
+
+ public TupleWindowImpl(com.twitter.heron.api.windowing.TupleWindow tupleWindow) {
+ this.delegate = tupleWindow;
+ }
+
+ @Override
+ public List<Tuple> get() {
+ return convert(this.delegate.get());
+ }
+
+ @Override
+ public List<Tuple> getNew() {
+ return convert(this.delegate.getNew());
+ }
+
+ @Override
+ public List<Tuple> getExpired() {
+ return convert(this.delegate.getExpired());
+ }
+
+ @Override
+ public Long getEndTimestamp() {
+ return this.delegate.getEndTimestamp();
+ }
+
+ @Override
+ public Long getStartTimestamp() {
+ return this.delegate.getStartTimestamp();
+ }
+
+ private static List<Tuple> convert(List<com.twitter.heron.api.tuple.Tuple> tuples) {
+ List<Tuple> ret = new LinkedList<>();
+ for (com.twitter.heron.api.tuple.Tuple tuple : tuples) {
+ ret.add(new TupleImpl(tuple));
+ }
+ return ret;
+ }
+}
diff --git a/heron/storm/src/java/org/apache/storm/windowing/Window.java b/heron/storm/src/java/org/apache/storm/windowing/Window.java
new file mode 100644
index 0000000..5142ab8
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/windowing/Window.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A view of events in a sliding window.
+ *
+ * @param <T> the type of event that this window contains. E.g. {@link org.apache.storm.tuple.Tuple}
+ */
+public interface Window<T> {
+ /**
+ * Gets the list of events in the window.
+ * <p>
+ * <b>Note: </b> If the number of tuples in windows is huge, invoking {@code get} would
+ * load all the tuples into memory and may throw an OOM exception. Use
+ * windowing with persistence
+ * </p>
+ * @return the list of events in the window.
+ */
+ List<T> get();
+
+ /**
+ * Returns an iterator over the events in the window.
+ * @return an {@link Iterator} over the events in the current window.
+ */
+ default Iterator<T> getIter() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
+ * Get the list of newly added events in the window since the last time the window was generated.
+ * @return the list of newly added events in the window.
+ */
+ List<T> getNew();
+
+ /**
+ * Get the list of events expired from the window since the last time the window was generated.
+ * @return the list of events expired from the window.
+ */
+ List<T> getExpired();
+
+ /**
+ * If processing based on event time, returns the window end time based on watermark otherwise
+ * returns the window end time based on processing time.
+ *
+ * @return the window end timestamp
+ */
+ Long getEndTimestamp();
+
+ /**
+ * Returns the window start timestamp. Will return null if the window length is not based on
+ * time duration.
+ *
+ * @return the window start timestamp or null if the window length is not time based
+ */
+ Long getStartTimestamp();
+}
diff --git a/integration_test/src/__init__.py b/integration_test/src/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/__init__.py
diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index 71a0bd7..693d3c2 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -47,6 +47,7 @@
deps = [
"//heron/api/src/java:api-java",
"//heron/storm/src/java:storm-compatibility-java",
+ "@com_googlecode_json_simple_json_simple//jar",
"@commons_cli_commons_cli//jar",
":common",
":core"
@@ -75,6 +76,7 @@
"//heron/api/src/java:api-java",
"//heron/storm/src/java:storm-compatibility-java",
"@commons_cli_commons_cli//jar",
+ "@com_googlecode_json_simple_json_simple//jar",
":common",
":core"
],
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
index dfaa034..19aec08 100644
--- a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
@@ -42,8 +42,12 @@
private Tuple currentTupleProcessing = null;
private OutputCollector collector;
- public IntegrationTestBolt(IRichBolt delegate) {
+ //whether automatically acking should be done
+ private boolean ackAuto;
+
+ public IntegrationTestBolt(IRichBolt delegate, boolean ackAuto) {
this.delegateBolt = delegate;
+ this.ackAuto = ackAuto;
}
@Override
@@ -61,7 +65,7 @@
OutputCollector outputCollector) {
update(context);
this.collector = new OutputCollector(new IntegrationTestBoltCollector(outputCollector));
- this.delegateBolt.prepare(map, context, collector);
+ this.delegateBolt.prepare(map, new TestTopologyContext(context), collector);
}
private int calculateTerminalsToReceive(TopologyContext context) {
@@ -108,7 +112,9 @@
currentTupleProcessing = tuple;
delegateBolt.execute(tuple);
// We ack only the tuples in user's logic
- collector.ack(tuple);
+ if (this.ackAuto) {
+ collector.ack(tuple);
+ }
}
}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java
index 0fc4dd6..08aee93 100644
--- a/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/IntegrationTestSpout.java
@@ -96,7 +96,7 @@
// Here the spoutOutputCollector should be a default one
// to emit tuples without adding MessageId
this.spoutOutputCollector = outputCollector;
- delegateSpout.open(map, topologyContext,
+ delegateSpout.open(map, new TestTopologyContext(topologyContext),
new SpoutOutputCollector(new IntegrationTestSpoutCollector(outputCollector)));
}
@@ -153,6 +153,7 @@
public void fail(Object messageId) {
LOG.info("Received a fail with MessageId: " + messageId);
+ tuplesToAck--;
if (!isTestMessageId(messageId)) {
delegateSpout.fail(messageId);
} else {
@@ -166,7 +167,8 @@
}
private static boolean isTestMessageId(Object messageId) {
- return ((String) messageId).startsWith(Constants.INTEGRATION_TEST_MOCK_MESSAGE_ID);
+ return (messageId instanceof String) && ((String) messageId)
+ .startsWith(Constants.INTEGRATION_TEST_MOCK_MESSAGE_ID);
}
protected boolean doneEmitting() {
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java
index 7290506..e34d3d5 100644
--- a/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyBuilder.java
@@ -22,6 +22,8 @@
import com.twitter.heron.api.Config;
import com.twitter.heron.api.HeronTopology;
import com.twitter.heron.api.bolt.IRichBolt;
+import com.twitter.heron.api.bolt.IWindowedBolt;
+import com.twitter.heron.api.bolt.WindowedBoltExecutor;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.spout.IRichSpout;
import com.twitter.heron.api.topology.BoltDeclarer;
@@ -68,7 +70,17 @@
@Override
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) {
- return super.setBolt(id, new IntegrationTestBolt(bolt), parallelismHint);
+ return setBolt(id, bolt, parallelismHint, true);
+ }
+
+ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint, boolean ackAuto) {
+ return super.setBolt(id, new IntegrationTestBolt(bolt, ackAuto), parallelismHint);
+ }
+
+ public BoltDeclarer setBolt(String id, IWindowedBolt bolt,
+ Number parallelismHint, boolean ackAuto) throws
+ IllegalArgumentException {
+ return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint, ackAuto);
}
@Override
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyContext.java b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyContext.java
new file mode 100644
index 0000000..421506b
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/core/TestTopologyContext.java
@@ -0,0 +1,172 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.core;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.hooks.ITaskHook;
+import com.twitter.heron.api.metric.CombinedMetric;
+import com.twitter.heron.api.metric.ICombiner;
+import com.twitter.heron.api.metric.IMetric;
+import com.twitter.heron.api.metric.IReducer;
+import com.twitter.heron.api.metric.ReducedMetric;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+
+public class TestTopologyContext implements TopologyContext {
+ private TopologyContext delegate;
+
+ public TestTopologyContext(TopologyContext topologyContext) {
+ this.delegate = topologyContext;
+ }
+
+ @Override
+ public int getThisTaskId() {
+ return this.delegate.getThisTaskId();
+ }
+
+ @Override
+ public String getThisComponentId() {
+ return this.delegate.getThisComponentId();
+ }
+
+ @Override
+ public Fields getThisOutputFields(String streamId) {
+ return this.delegate.getThisOutputFields(streamId);
+ }
+
+ @Override
+ public Set<String> getThisStreams() {
+ return this.delegate.getThisStreams();
+ }
+
+ @Override
+ public int getThisTaskIndex() {
+ return this.delegate.getThisTaskIndex();
+ }
+
+ /**
+ * remove INTEGRATION_TEST_CONTROL_STREAM_ID from topology context
+ */
+ @Override
+ public Map<TopologyAPI.StreamId, TopologyAPI.Grouping> getThisSources() {
+ Map<TopologyAPI.StreamId, TopologyAPI.Grouping> original = getSources(getThisComponentId());
+ Map<TopologyAPI.StreamId, TopologyAPI.Grouping> ret = new HashMap<>();
+ for (Map.Entry<TopologyAPI.StreamId, TopologyAPI.Grouping> entry : original.entrySet()) {
+ if (!entry.getKey().getId().equals(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID)) {
+ ret.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public Map<String, Map<String, TopologyAPI.Grouping>> getThisTargets() {
+ return this.delegate.getThisTargets();
+ }
+
+ @Override
+ public void setTaskData(String name, Object data) {
+ this.delegate.setTaskData(name, data);
+ }
+
+ @Override
+ public Object getTaskData(String name) {
+ return this.delegate.getTaskData(name);
+ }
+
+ @Override
+ public void addTaskHook(ITaskHook hook) {
+ this.delegate.addTaskHook(hook);
+ }
+
+ @Override
+ public Collection<ITaskHook> getHooks() {
+ return this.delegate.getHooks();
+ }
+
+ @Override
+ public <T, U, V> ReducedMetric<T, U, V> registerMetric(String name, IReducer<T, U, V> reducer,
+ int timeBucketSizeInSecs) {
+ return this.delegate.registerMetric(name, reducer, timeBucketSizeInSecs);
+ }
+
+ @Override
+ public <T> CombinedMetric<T> registerMetric(String name, ICombiner<T> combiner, int
+ timeBucketSizeInSecs) {
+ return this.delegate.registerMetric(name, combiner, timeBucketSizeInSecs);
+ }
+
+ @Override
+ public <T extends IMetric<U>, U> T registerMetric(String name, T metric, int
+ timeBucketSizeInSecs) {
+ return this.delegate.registerMetric(name, metric, timeBucketSizeInSecs);
+ }
+
+ @Override
+ public String getTopologyId() {
+ return this.delegate.getTopologyId();
+ }
+
+ @Override
+ public String getComponentId(int taskId) {
+ return this.delegate.getComponentId(taskId);
+ }
+
+ @Override
+ public Set<String> getComponentStreams(String componentId) {
+ return this.delegate.getComponentStreams(componentId);
+ }
+
+ @Override
+ public List<Integer> getComponentTasks(String componentId) {
+ return this.delegate.getComponentTasks(componentId);
+ }
+
+ @Override
+ public Fields getComponentOutputFields(String componentId, String streamId) {
+ return this.delegate.getComponentOutputFields(componentId, streamId);
+ }
+
+ @Override
+ public Map<TopologyAPI.StreamId, TopologyAPI.Grouping> getSources(String componentId) {
+ return this.delegate.getSources(componentId);
+ }
+
+ @Override
+ public Map<String, Map<String, TopologyAPI.Grouping>> getTargets(String componentId) {
+ return this.delegate.getTargets(componentId);
+ }
+
+ @Override
+ public Map<Integer, String> getTaskToComponent() {
+ return this.delegate.getTaskToComponent();
+ }
+
+ @Override
+ public Set<String> getComponentIds() {
+ return this.delegate.getComponentIds();
+ }
+
+ @Override
+ public int maxTopologyMessageTimeout() {
+ return this.delegate.maxTopologyMessageTimeout();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.json
deleted file mode 100644
index 7e04cfc..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.json
deleted file mode 100644
index 7e04cfc..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.json
deleted file mode 100644
index b822614..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.json
deleted file mode 100644
index a3cceb7..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B","A", "B", "A", "B", "A", "B", "A", "B", "A", "B","A", "B", "A", "B", "B", "A", "A", "A", "B", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.json
deleted file mode 100644
index b822614..0000000
--- a/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.json
+++ /dev/null
@@ -1 +0,0 @@
-["A", "B", "A", "B", "A", "B", "A", "B", "A", "B"]
\ No newline at end of file
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/AllGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/all_grouping/AllGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTask.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTask.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTaskResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/BasicTopologyOneTask.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/basic_topology_one_task/BasicTopologyOneTaskResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuples.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/BoltDoubleEmitTuples.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuples.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuplesResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/bolt_double_emit_tuples/BoltDoubleEmitTuplesResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/FieldsGrouping.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/fields_grouping/FieldsGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/GlobalGrouping.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/global_grouping/GlobalGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasksResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/MultiSpoutsMultiTasks.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/multi_spouts_multi_tasks/MultiSpoutsMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/NonGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGroupingResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/non_grouping/NonGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/one_bolt_multi_tasks/OneBoltMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasksResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasks.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutMultiTasks.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasks.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasksResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutBoltMultiTasks.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_multi_tasks/OneSpoutMultiTasksResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBolts.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBolts.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBoltsResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneSpoutTwoBolts.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/one_spout_two_bolts/OneSpoutTwoBoltsResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/CustomCheckBolt.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomCheckBolt.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/CustomCheckBolt.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomCheckBolt.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/CustomObject.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomObject.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/CustomObject.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomObject.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/CustomSpout.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomSpout.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/CustomSpout.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/CustomSpout.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopology.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopology.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopologyResults.json
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/SerializationTopology.json
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/serialization/SerializationTopologyResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGrouping.java
similarity index 100%
rename from integration_test/src/java/com/twitter/heron/integration_test/topology/ShuffleGrouping.java
rename to integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGrouping.java
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGroupingResults.json
similarity index 100%
copy from integration_test/src/java/com/twitter/heron/integration_test/topology/OneBoltMultiTasks.json
copy to integration_test/src/java/com/twitter/heron/integration_test/topology/shuffle_grouping/ShuffleGroupingResults.json
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/WindowTestBase.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/WindowTestBase.java
new file mode 100644
index 0000000..8647bb1
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/WindowTestBase.java
@@ -0,0 +1,260 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing;
+
+import java.net.MalformedURLException;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.api.tuple.Values;
+import com.twitter.heron.api.utils.Utils;
+import com.twitter.heron.api.windowing.TupleWindow;
+import com.twitter.heron.integration_test.common.AbstractTestTopology;
+import com.twitter.heron.integration_test.core.TestTopologyBuilder;
+
+public class WindowTestBase extends AbstractTestTopology {
+
+ private static final String NUMBER_FIELD = "number";
+ private static final String STRING_FIELD = "numAsStr";
+ private static final String EVENT_TIME_FIELD = "eventTimeField";
+ private static final String DUMMY_FIELD = "dummy";
+
+ private BaseWindowedBolt.Count windowCountLength;
+ private BaseWindowedBolt.Count slideCountInterval;
+ private Duration windowDurationLength;
+ private Duration slideDurationInterval;
+ private Long sleepBetweenTuples;
+ private boolean useEventTime = false;
+ private Duration watermarkInterval;
+
+
+ public String getBoltName() {
+ return "VerificationBolt";
+ }
+
+ public String getSpoutName() {
+ return "IncrementingSpout";
+ }
+
+
+ public WindowTestBase(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public WindowTestBase withWindowLength(Duration duration) {
+ this.windowDurationLength = duration;
+ return this;
+ }
+
+ public WindowTestBase withWindowLength(BaseWindowedBolt.Count count) {
+ this.windowCountLength = count;
+ return this;
+ }
+
+ public WindowTestBase withSlidingInterval(Duration duration) {
+ this.slideDurationInterval = duration;
+ return this;
+ }
+
+ public WindowTestBase withSlidingInterval(BaseWindowedBolt.Count count) {
+ this.slideCountInterval = count;
+ return this;
+ }
+
+ public WindowTestBase withSleepBetweenTuples(long millis) {
+ this.sleepBetweenTuples = millis;
+ return this;
+ }
+
+ public WindowTestBase useEventTime() {
+ this.useEventTime = true;
+ return this;
+ }
+
+ @SuppressWarnings("HiddenField")
+ public WindowTestBase withWatermarkInterval(Duration watermarkInterval) {
+ this.watermarkInterval = watermarkInterval;
+ return this;
+ }
+
+ @Override
+ protected TestTopologyBuilder buildTopology(TestTopologyBuilder builder) {
+ builder.setSpout(getSpoutName(), new IncrementingSpout()
+ .withSleepBetweenTuples(sleepBetweenTuples), 1);
+
+ BaseWindowedBolt bolt = null;
+ if (this.windowCountLength != null) {
+ if (this.slideCountInterval != null) {
+ bolt = new VerificationBolt()
+ .withWindow(this.windowCountLength, this.slideCountInterval);
+ } else {
+ bolt = new VerificationBolt()
+ .withTumblingWindow(this.windowCountLength);
+ }
+ }
+
+ if (this.windowDurationLength != null) {
+ if (this.slideDurationInterval != null) {
+ bolt = new VerificationBolt()
+ .withWindow(this.windowDurationLength, this.slideDurationInterval);
+ } else {
+ bolt = new VerificationBolt()
+ .withTumblingWindow(this.windowDurationLength);
+ }
+ }
+
+ if (this.useEventTime) {
+ bolt.withTimestampField(EVENT_TIME_FIELD);
+ }
+
+ if (this.watermarkInterval != null) {
+ bolt.withWatermarkInterval(this.watermarkInterval);
+ }
+ builder.setBolt(getBoltName(), bolt, 1, false)
+ .shuffleGrouping(getSpoutName());
+ return builder;
+ }
+
+ public static class IncrementingSpout extends BaseRichSpout {
+ private static final Logger LOG = Logger.getLogger(IncrementingSpout.class.getName());
+ private static final long serialVersionUID = -6171170228097868632L;
+ private SpoutOutputCollector collector;
+ private static int currentNum;
+ private static Random rng = new Random();
+ private String componentId;
+ private Long sleepBetweenTuples = null;
+ // In millis
+ private long currentTime = 1504573536000L;
+
+ public IncrementingSpout withSleepBetweenTuples(long millis) {
+ this.sleepBetweenTuples = millis;
+ return this;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD, EVENT_TIME_FIELD));
+ }
+
+ @Override
+ @SuppressWarnings("HiddenField")
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector
+ collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ if (sleepBetweenTuples == null) {
+ Utils.sleep(rng.nextInt(10));
+ } else {
+ Utils.sleep(sleepBetweenTuples);
+ }
+ currentNum++;
+ final String numAsStr = "str(" + currentNum + ")str";
+ final Values tuple = new Values(currentNum, numAsStr, currentTime += 1000);
+ LOG.info("Time = " + System.currentTimeMillis()
+ + " Component = " + componentId + " Tuple = " + tuple.toString());
+
+ collector.emit(tuple, currentNum);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.info("Received ACK for msgId : " + msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.info("Received FAIL for msgId : " + msgId);
+ }
+ }
+
+ public static class VerificationBolt extends BaseWindowedBolt {
+ private static final long serialVersionUID = -6067634845003700125L;
+ private OutputCollector collector;
+ private String componentId;
+
+ private static int windowCount = 0;
+
+
+ private static final Logger LOG = Logger.getLogger(VerificationBolt.class.getName());
+
+
+ @Override
+ @SuppressWarnings("HiddenField")
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector
+ collector) {
+ componentId = context.getThisComponentId();
+ this.collector = collector;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void execute(TupleWindow inputWindow) {
+ windowCount++;
+ List<Tuple> tuplesInWindow = inputWindow.get();
+ List<Tuple> newTuples = inputWindow.getNew();
+ List<Tuple> expiredTuples = inputWindow.getExpired();
+ LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size());
+ LOG.info("newTuples.size() = " + newTuples.size());
+ LOG.info("expiredTuples.size() = " + expiredTuples.size());
+
+ JSONObject jsonObject = new JSONObject();
+ JSONArray jsonArray = new JSONArray();
+ JSONObject tmp = new JSONObject();
+ tmp.put("tuplesInWindow", tuplesToListString(tuplesInWindow));
+ jsonArray.add(tmp);
+ tmp = new JSONObject();
+ tmp.put("newTuples", tuplesToListString(newTuples));
+ jsonArray.add(tmp);
+ tmp = new JSONObject();
+ tmp.put("expiredTuples", tuplesToListString(expiredTuples));
+ jsonArray.add(tmp);
+ jsonObject.put(windowCount, jsonArray);
+ LOG.info("Component = " + componentId + " Window Count = " + windowCount
+ + " tuplesInWindow = " + jsonArray.toJSONString());
+
+ collector.emit(new Values(jsonObject.toJSONString()));
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(DUMMY_FIELD));
+ }
+ }
+
+ public static List<String> tuplesToListString(List<Tuple> tuples) {
+ List<String> tmp = new LinkedList<>();
+ for (Tuple tuple : tuples) {
+ tmp.add(tuple.getValue(0).toString());
+ }
+ return tmp;
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1.java
new file mode 100644
index 0000000..b77ec65
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1.java
@@ -0,0 +1,34 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingCountWindowTest1 extends WindowTestBase {
+
+ public SlidingCountWindowTest1(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new SlidingCountWindowTest1(args)
+ .withSleepBetweenTuples(1000)
+ .withWindowLength(BaseWindowedBolt.Count.of(10))
+ .withSlidingInterval(BaseWindowedBolt.Count.of(5));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1Results.json
new file mode 100644
index 0000000..e8680ad
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest1Results.json
@@ -0,0 +1,4 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\"]},{\"newTuples\":[\"1\",\"2\",\"3\",\"4\",\"5\"]},{\"expiredTuples\":[]}]}",
+ "{\"2\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"newTuples\":[\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"expiredTuples\":[]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2.java
new file mode 100644
index 0000000..b8bd7e6
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2.java
@@ -0,0 +1,34 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingCountWindowTest2 extends WindowTestBase {
+
+ public SlidingCountWindowTest2(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new SlidingCountWindowTest2(args)
+ .withSleepBetweenTuples(1000)
+ .withWindowLength(BaseWindowedBolt.Count.of(2))
+ .withSlidingInterval(BaseWindowedBolt.Count.of(1));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2Results.json
new file mode 100644
index 0000000..970469f
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest2Results.json
@@ -0,0 +1,12 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\"]},{\"newTuples\":[\"1\"]},{\"expiredTuples\":[]}]}",
+ "{\"2\":[{\"tuplesInWindow\":[\"1\",\"2\"]},{\"newTuples\":[\"2\"]},{\"expiredTuples\":[]}]}",
+ "{\"3\":[{\"tuplesInWindow\":[\"2\",\"3\"]},{\"newTuples\":[\"3\"]},{\"expiredTuples\":[\"1\"]}]}",
+ "{\"4\":[{\"tuplesInWindow\":[\"3\",\"4\"]},{\"newTuples\":[\"4\"]},{\"expiredTuples\":[\"2\"]}]}",
+ "{\"5\":[{\"tuplesInWindow\":[\"4\",\"5\"]},{\"newTuples\":[\"5\"]},{\"expiredTuples\":[\"3\"]}]}",
+ "{\"6\":[{\"tuplesInWindow\":[\"5\",\"6\"]},{\"newTuples\":[\"6\"]},{\"expiredTuples\":[\"4\"]}]}",
+ "{\"7\":[{\"tuplesInWindow\":[\"6\",\"7\"]},{\"newTuples\":[\"7\"]},{\"expiredTuples\":[\"5\"]}]}",
+ "{\"8\":[{\"tuplesInWindow\":[\"7\",\"8\"]},{\"newTuples\":[\"8\"]},{\"expiredTuples\":[\"6\"]}]}",
+ "{\"9\":[{\"tuplesInWindow\":[\"8\",\"9\"]},{\"newTuples\":[\"9\"]},{\"expiredTuples\":[\"7\"]}]}",
+ "{\"10\":[{\"tuplesInWindow\":[\"9\",\"10\"]},{\"newTuples\":[\"10\"]},{\"expiredTuples\":[\"8\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3.java
new file mode 100644
index 0000000..1080c88
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3.java
@@ -0,0 +1,34 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingCountWindowTest3 extends WindowTestBase {
+
+ public SlidingCountWindowTest3(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new SlidingCountWindowTest3(args)
+ .withSleepBetweenTuples(1000)
+ .withWindowLength(BaseWindowedBolt.Count.of(7))
+ .withSlidingInterval(BaseWindowedBolt.Count.of(3));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3Results.json
new file mode 100644
index 0000000..975f01c
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/SlidingCountWindowTest3Results.json
@@ -0,0 +1,5 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\"]},{\"newTuples\":[\"1\",\"2\",\"3\"]},{\"expiredTuples\":[]}]}",
+ "{\"2\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\"]},{\"newTuples\":[\"4\",\"5\",\"6\"]},{\"expiredTuples\":[]}]}",
+ "{\"3\":[{\"tuplesInWindow\":[\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\"]},{\"newTuples\":[\"7\",\"8\",\"9\"]},{\"expiredTuples\":[\"1\",\"2\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1.java
new file mode 100644
index 0000000..3f9e612
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1.java
@@ -0,0 +1,33 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class TumblingCountWindowTest1 extends WindowTestBase {
+
+ public TumblingCountWindowTest1(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new TumblingCountWindowTest1(args)
+ .withSleepBetweenTuples(1000)
+ .withWindowLength(BaseWindowedBolt.Count.of(10));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1Results.json
new file mode 100644
index 0000000..2416022
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest1Results.json
@@ -0,0 +1,3 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"newTuples\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\"]},{\"expiredTuples\":[]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2.java
new file mode 100644
index 0000000..15c80d6
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2.java
@@ -0,0 +1,33 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class TumblingCountWindowTest2 extends WindowTestBase {
+
+ public TumblingCountWindowTest2(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new TumblingCountWindowTest2(args)
+ .withSleepBetweenTuples(1000)
+ .withWindowLength(BaseWindowedBolt.Count.of(2));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2Results.json
new file mode 100644
index 0000000..e258923
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest2Results.json
@@ -0,0 +1,7 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\"]},{\"newTuples\":[\"1\",\"2\"]},{\"expiredTuples\":[]}]}",
+ "{\"2\":[{\"tuplesInWindow\":[\"3\",\"4\"]},{\"newTuples\":[\"3\",\"4\"]},{\"expiredTuples\":[\"1\",\"2\"]}]}",
+ "{\"3\":[{\"tuplesInWindow\":[\"5\",\"6\"]},{\"newTuples\":[\"5\",\"6\"]},{\"expiredTuples\":[\"3\",\"4\"]}]}",
+ "{\"4\":[{\"tuplesInWindow\":[\"7\",\"8\"]},{\"newTuples\":[\"7\",\"8\"]},{\"expiredTuples\":[\"5\",\"6\"]}]}",
+ "{\"5\":[{\"tuplesInWindow\":[\"9\",\"10\"]},{\"newTuples\":[\"9\",\"10\"]},{\"expiredTuples\":[\"7\",\"8\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3.java
new file mode 100644
index 0000000..227b1cc
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3.java
@@ -0,0 +1,33 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.count;
+
+import java.net.MalformedURLException;
+
+import com.twitter.heron.api.bolt.BaseWindowedBolt;
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class TumblingCountWindowTest3 extends WindowTestBase {
+
+ public TumblingCountWindowTest3(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new TumblingCountWindowTest3(args)
+ .withSleepBetweenTuples(1000)
+ .withWindowLength(BaseWindowedBolt.Count.of(7));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3Results.json
new file mode 100644
index 0000000..8969a9a
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/count/TumblingCountWindowTest3Results.json
@@ -0,0 +1,3 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\"]},{\"newTuples\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\"]},{\"expiredTuples\":[]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1.java
new file mode 100644
index 0000000..120fdea
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1.java
@@ -0,0 +1,32 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.time;
+
+import java.net.MalformedURLException;
+import java.time.Duration;
+
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingTimeWindowTest1 extends WindowTestBase {
+ public SlidingTimeWindowTest1(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new SlidingTimeWindowTest1(args).withSleepBetweenTuples(1000)
+ .withWindowLength(Duration.ofMillis(1000))
+ .withSlidingInterval(Duration.ofMillis(1000));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1Results.json
new file mode 100644
index 0000000..7af261c
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/time/SlidingTimeWindowTest1Results.json
@@ -0,0 +1,12 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\"]},{\"newTuples\":[\"1\"]},{\"expiredTuples\":[]}]}",
+ "{\"10\":[{\"tuplesInWindow\":[\"10\"]},{\"newTuples\":[\"10\"]},{\"expiredTuples\":[\"9\"]}]}",
+ "{\"2\":[{\"tuplesInWindow\":[\"2\"]},{\"newTuples\":[\"2\"]},{\"expiredTuples\":[\"1\"]}]}",
+ "{\"3\":[{\"tuplesInWindow\":[\"3\"]},{\"newTuples\":[\"3\"]},{\"expiredTuples\":[\"2\"]}]}",
+ "{\"4\":[{\"tuplesInWindow\":[\"4\"]},{\"newTuples\":[\"4\"]},{\"expiredTuples\":[\"3\"]}]}",
+ "{\"5\":[{\"tuplesInWindow\":[\"5\"]},{\"newTuples\":[\"5\"]},{\"expiredTuples\":[\"4\"]}]}",
+ "{\"6\":[{\"tuplesInWindow\":[\"6\"]},{\"newTuples\":[\"6\"]},{\"expiredTuples\":[\"5\"]}]}",
+ "{\"7\":[{\"tuplesInWindow\":[\"7\"]},{\"newTuples\":[\"7\"]},{\"expiredTuples\":[\"6\"]}]}",
+ "{\"8\":[{\"tuplesInWindow\":[\"8\"]},{\"newTuples\":[\"8\"]},{\"expiredTuples\":[\"7\"]}]}",
+ "{\"9\":[{\"tuplesInWindow\":[\"9\"]},{\"newTuples\":[\"9\"]},{\"expiredTuples\":[\"8\"]}]}"
+]
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1.java b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1.java
new file mode 100644
index 0000000..2077c3b
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1.java
@@ -0,0 +1,36 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.integration_test.topology.windowing.watermark;
+
+import java.net.MalformedURLException;
+import java.time.Duration;
+
+import com.twitter.heron.integration_test.topology.windowing.WindowTestBase;
+
+public class SlidingWatermarkEventTimeWindowTest1 extends WindowTestBase {
+
+ public SlidingWatermarkEventTimeWindowTest1(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ public static void main(String[] args) throws Exception {
+ WindowTestBase topology = new SlidingWatermarkEventTimeWindowTest1(args)
+ .withSleepBetweenTuples(1000)
+ .useEventTime()
+ .withWatermarkInterval(Duration.ofMillis(1000))
+ .withWindowLength(Duration.ofMillis(1000))
+ .withSlidingInterval(Duration.ofMillis(1000));
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json
new file mode 100644
index 0000000..7af261c
--- /dev/null
+++ b/integration_test/src/java/com/twitter/heron/integration_test/topology/windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json
@@ -0,0 +1,12 @@
+[
+ "{\"1\":[{\"tuplesInWindow\":[\"1\"]},{\"newTuples\":[\"1\"]},{\"expiredTuples\":[]}]}",
+ "{\"10\":[{\"tuplesInWindow\":[\"10\"]},{\"newTuples\":[\"10\"]},{\"expiredTuples\":[\"9\"]}]}",
+ "{\"2\":[{\"tuplesInWindow\":[\"2\"]},{\"newTuples\":[\"2\"]},{\"expiredTuples\":[\"1\"]}]}",
+ "{\"3\":[{\"tuplesInWindow\":[\"3\"]},{\"newTuples\":[\"3\"]},{\"expiredTuples\":[\"2\"]}]}",
+ "{\"4\":[{\"tuplesInWindow\":[\"4\"]},{\"newTuples\":[\"4\"]},{\"expiredTuples\":[\"3\"]}]}",
+ "{\"5\":[{\"tuplesInWindow\":[\"5\"]},{\"newTuples\":[\"5\"]},{\"expiredTuples\":[\"4\"]}]}",
+ "{\"6\":[{\"tuplesInWindow\":[\"6\"]},{\"newTuples\":[\"6\"]},{\"expiredTuples\":[\"5\"]}]}",
+ "{\"7\":[{\"tuplesInWindow\":[\"7\"]},{\"newTuples\":[\"7\"]},{\"expiredTuples\":[\"6\"]}]}",
+ "{\"8\":[{\"tuplesInWindow\":[\"8\"]},{\"newTuples\":[\"8\"]},{\"expiredTuples\":[\"7\"]}]}",
+ "{\"9\":[{\"tuplesInWindow\":[\"9\"]},{\"newTuples\":[\"9\"]},{\"expiredTuples\":[\"8\"]}]}"
+]
diff --git a/integration_test/src/python/__init__.py b/integration_test/src/python/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/__init__.py b/integration_test/src/python/integration_test/topology/__init__.py
index 53d0931..8b13789 100644
--- a/integration_test/src/python/integration_test/topology/__init__.py
+++ b/integration_test/src/python/integration_test/topology/__init__.py
@@ -1,19 +1 @@
-"""Integration test topologies"""
-__all__ = [
- 'all_grouping',
- 'basic_one_task'
- 'bolt_double_emit_tuples',
- 'fields_grouping',
- 'global_grouping',
- 'multi_spouts_multi_tasks',
- 'none_grouping',
- 'one_bolt_multi_tasks',
- 'one_spout_bolt_multi_tasks',
- 'one_spout_multi_tasks',
- 'one_spout_two_bolts',
- 'shuffle_grouping',
-]
-# import some core stuff
-import integration_test.src.python.integration_test.common as common
-import integration_test.src.python.integration_test.core as core
diff --git a/integration_test/src/python/integration_test/topology/all_grouping/__init__.py b/integration_test/src/python/integration_test/topology/all_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/all_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/all_grouping.py b/integration_test/src/python/integration_test/topology/all_grouping/all_grouping.py
similarity index 95%
rename from integration_test/src/python/integration_test/topology/all_grouping.py
rename to integration_test/src/python/integration_test/topology/all_grouping/all_grouping.py
index fa96200..02f569f 100644
--- a/integration_test/src/python/integration_test/topology/all_grouping.py
+++ b/integration_test/src/python/integration_test/topology/all_grouping/all_grouping.py
@@ -19,7 +19,8 @@
from integration_test.src.python.integration_test.common.bolt import IdentityBolt
from integration_test.src.python.integration_test.common.spout import ABSpout
-def all_grouping_buidler(topology_name, http_server_url):
+
+def all_grouping_builder(topology_name, http_server_url):
"""Integration test topology builder for all grouping"""
builder = TestTopologyBuilder(topology_name, http_server_url)
ab_spout = builder.add_spout("ab-spout", ABSpout, 1)
diff --git a/integration_test/src/python/integration_test/topology/all_grouping_result.json b/integration_test/src/python/integration_test/topology/all_grouping/all_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/all_grouping_result.json
rename to integration_test/src/python/integration_test/topology/all_grouping/all_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/basic_one_task/__init__.py b/integration_test/src/python/integration_test/topology/basic_one_task/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/basic_one_task/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/basic_one_task.py b/integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/basic_one_task.py
rename to integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task.py
diff --git a/integration_test/src/python/integration_test/topology/basic_one_task_result.json b/integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/basic_one_task_result.json
rename to integration_test/src/python/integration_test/topology/basic_one_task/basic_one_task_result.json
diff --git a/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/__init__.py b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples.py b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/bolt_double_emit_tuples.py
rename to integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples.py
diff --git a/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples_result.json b/integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/bolt_double_emit_tuples_result.json
rename to integration_test/src/python/integration_test/topology/bolt_double_emit_tuples/bolt_double_emit_tuples_result.json
diff --git a/integration_test/src/python/integration_test/topology/fields_grouping/__init__.py b/integration_test/src/python/integration_test/topology/fields_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/fields_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/fields_grouping.py b/integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/fields_grouping.py
rename to integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/fields_grouping_result.json b/integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/fields_grouping_result.json
rename to integration_test/src/python/integration_test/topology/fields_grouping/fields_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/global_grouping/__init__.py b/integration_test/src/python/integration_test/topology/global_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/global_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/global_grouping.py b/integration_test/src/python/integration_test/topology/global_grouping/global_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/global_grouping.py
rename to integration_test/src/python/integration_test/topology/global_grouping/global_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/global_grouping_result.json b/integration_test/src/python/integration_test/topology/global_grouping/global_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/global_grouping_result.json
rename to integration_test/src/python/integration_test/topology/global_grouping/global_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks.py b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/multi_spouts_multi_tasks/multi_spouts_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/none_grouping/__init__.py b/integration_test/src/python/integration_test/topology/none_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/none_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/none_grouping.py b/integration_test/src/python/integration_test/topology/none_grouping/none_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/none_grouping.py
rename to integration_test/src/python/integration_test/topology/none_grouping/none_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/none_grouping_result.json b/integration_test/src/python/integration_test/topology/none_grouping/none_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/none_grouping_result.json
rename to integration_test/src/python/integration_test/topology/none_grouping/none_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_bolt_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_bolt_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/one_bolt_multi_tasks/one_bolt_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/__init__.py b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_multi_tasks.py b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_multi_tasks.py
rename to integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_multi_tasks_result.json b/integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_multi_tasks_result.json
rename to integration_test/src/python/integration_test/topology/one_spout_multi_tasks/one_spout_multi_tasks_result.json
diff --git a/integration_test/src/python/integration_test/topology/one_spout_two_bolts/__init__.py b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_two_bolts.py b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_two_bolts.py
rename to integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts.py
diff --git a/integration_test/src/python/integration_test/topology/one_spout_two_bolts_result.json b/integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/one_spout_two_bolts_result.json
rename to integration_test/src/python/integration_test/topology/one_spout_two_bolts/one_spout_two_bolts_result.json
diff --git a/integration_test/src/python/integration_test/topology/shuffle_grouping/__init__.py b/integration_test/src/python/integration_test/topology/shuffle_grouping/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/integration_test/src/python/integration_test/topology/shuffle_grouping/__init__.py
diff --git a/integration_test/src/python/integration_test/topology/shuffle_grouping.py b/integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping.py
similarity index 100%
rename from integration_test/src/python/integration_test/topology/shuffle_grouping.py
rename to integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping.py
diff --git a/integration_test/src/python/integration_test/topology/shuffle_grouping_result.json b/integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping_result.json
similarity index 100%
rename from integration_test/src/python/integration_test/topology/shuffle_grouping_result.json
rename to integration_test/src/python/integration_test/topology/shuffle_grouping/shuffle_grouping_result.json
diff --git a/integration_test/src/python/integration_test/topology/test_topology_main.py b/integration_test/src/python/integration_test/topology/test_topology_main.py
index 25a7964..08f587c 100644
--- a/integration_test/src/python/integration_test/topology/test_topology_main.py
+++ b/integration_test/src/python/integration_test/topology/test_topology_main.py
@@ -17,45 +17,45 @@
import logging
import sys
-from integration_test.src.python.integration_test.topology.all_grouping \
- import all_grouping_buidler
+from integration_test.src.python.integration_test.topology.all_grouping.all_grouping \
+ import all_grouping_builder
-from integration_test.src.python.integration_test.topology.basic_one_task \
+from integration_test.src.python.integration_test.topology.basic_one_task.basic_one_task \
import basic_one_task_builder
-from integration_test.src.python.integration_test.topology.bolt_double_emit_tuples \
- import bolt_double_emit_tuples_builder
+from integration_test.src.python.integration_test.topology.bolt_double_emit_tuples\
+ .bolt_double_emit_tuples import bolt_double_emit_tuples_builder
-from integration_test.src.python.integration_test.topology.fields_grouping \
+from integration_test.src.python.integration_test.topology.fields_grouping.fields_grouping \
import fields_grouping_builder
-from integration_test.src.python.integration_test.topology.global_grouping \
+from integration_test.src.python.integration_test.topology.global_grouping.global_grouping \
import global_grouping_builder
-from integration_test.src.python.integration_test.topology.multi_spouts_multi_tasks \
- import multi_spouts_multi_tasks_builder
+from integration_test.src.python.integration_test.topology.multi_spouts_multi_tasks\
+ .multi_spouts_multi_tasks import multi_spouts_multi_tasks_builder
-from integration_test.src.python.integration_test.topology.none_grouping \
+from integration_test.src.python.integration_test.topology.none_grouping.none_grouping \
import none_grouping_builder
-from integration_test.src.python.integration_test.topology.one_bolt_multi_tasks \
- import one_bolt_multi_tasks_builder
+from integration_test.src.python.integration_test.topology. one_bolt_multi_tasks\
+ .one_bolt_multi_tasks import one_bolt_multi_tasks_builder
-from integration_test.src.python.integration_test.topology.one_spout_bolt_multi_tasks \
- import one_spout_bolt_multi_tasks_builder
+from integration_test.src.python.integration_test.topology.one_spout_bolt_multi_tasks\
+ .one_spout_bolt_multi_tasks import one_spout_bolt_multi_tasks_builder
-from integration_test.src.python.integration_test.topology.one_spout_multi_tasks \
- import one_spout_multi_tasks_builder
+from integration_test.src.python.integration_test.topology.one_spout_multi_tasks\
+ .one_spout_multi_tasks import one_spout_multi_tasks_builder
-from integration_test.src.python.integration_test.topology.one_spout_two_bolts \
- import one_spout_two_bolts_builder
+from integration_test.src.python.integration_test.topology.one_spout_two_bolts\
+ .one_spout_two_bolts import one_spout_two_bolts_builder
-from integration_test.src.python.integration_test.topology.shuffle_grouping \
+from integration_test.src.python.integration_test.topology.shuffle_grouping.shuffle_grouping \
import shuffle_grouping_builder
TOPOLOGY_BUILDERS = {
'Heron_IntegrationTest_BasicOneTask': basic_one_task_builder,
- 'Heron_IntegrationTest_AllGrouping': all_grouping_buidler,
+ 'Heron_IntegrationTest_AllGrouping': all_grouping_builder,
'Heron_IntegrationTest_NoneGrouping': none_grouping_builder,
'Heron_IntegrationTest_OneBoltMultiTasks': one_bolt_multi_tasks_builder,
'Heron_IntegrationTest_OneSpoutBoltMultiTasks': one_spout_bolt_multi_tasks_builder,
diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json
index bed1d6b..8c1e15e 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -11,32 +11,32 @@
{
"topologyName" : "IntegrationTest_FieldsGrouping",
"classPath" : "fields_grouping.FieldsGrouping",
- "expectedResultRelativePath" : "FieldsGrouping.json"
+ "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json"
},
{
"topologyName" : "IntegrationTest_AllGrouping",
"classPath" : "all_grouping.AllGrouping",
- "expectedResultRelativePath" : "AllGrouping.json"
+ "expectedResultRelativePath" : "all_grouping/AllGroupingResults.json"
},
{
"topologyName" : "IntegrationTest_GlobalGrouping",
"classPath" : "global_grouping.GlobalGrouping",
- "expectedResultRelativePath" : "GlobalGrouping.json"
+ "expectedResultRelativePath" : "global_grouping/GlobalGroupingResults.json"
},
{
"topologyName" : "IntegrationTest_NonGrouping",
"classPath" : "non_grouping.NonGrouping",
- "expectedResultRelativePath" : "NonGrouping.json"
+ "expectedResultRelativePath" : "non_grouping/NonGroupingResults.json"
},
{
"topologyName" : "IntegrationTest_ShuffleGrouping",
"classPath" : "shuffle_grouping.ShuffleGrouping",
- "expectedResultRelativePath" : "ShuffleGrouping.json"
+ "expectedResultRelativePath" : "shuffle_grouping/ShuffleGroupingResults.json"
},
{
"topologyName" : "IntegrationTest_BasicTopologyOneTask",
"classPath" : "basic_topology_one_task.BasicTopologyOneTask",
- "expectedResultRelativePath" : "BasicTopologyOneTask.json"
+ "expectedResultRelativePath" : "basic_topology_one_task/BasicTopologyOneTaskResults.json"
},
{
"topologyName" : "IntegrationTest_BasicTopologyOneTaskScaleUp",
@@ -55,104 +55,144 @@
{
"topologyName" : "IntegrationTest_BoltDoubleEmitTuples",
"classPath" : "bolt_double_emit_tuples.BoltDoubleEmitTuples",
- "expectedResultRelativePath" : "BoltDoubleEmitTuples.json"
+ "expectedResultRelativePath" : "bolt_double_emit_tuples/BoltDoubleEmitTuplesResults.json"
},
{
"topologyName" : "IntegrationTest_MultiSpoutsMultiTasks",
"classPath" : "multi_spouts_multi_tasks.MultiSpoutsMultiTasks",
- "expectedResultRelativePath" : "MultiSpoutsMultiTasks.json"
+ "expectedResultRelativePath" : "multi_spouts_multi_tasks/MultiSpoutsMultiTasksResults.json"
},
{
"topologyName" : "IntegrationTest_OneBoltMultiTasks",
"classPath" : "one_bolt_multi_tasks.OneBoltMultiTasks",
- "expectedResultRelativePath" : "OneBoltMultiTasks.json"
+ "expectedResultRelativePath" : "one_bolt_multi_tasks/OneBoltMultiTasksResults.json"
},
{
- "topologyName" : "IntegrationTest_OneSpoutBoltMultiTasks",
- "classPath" : "one_spout_bolt_multi_tasks.OneSpoutBoltMultiTasks",
- "expectedResultRelativePath" : "OneSpoutBoltMultiTasks.json"
- },
+ "topologyName" : "IntegrationTest_OneSpoutBoltMultiTasks",
+ "classPath" : "one_spout_bolt_multi_tasks.OneSpoutBoltMultiTasks",
+ "expectedResultRelativePath" : "one_spout_bolt_multi_tasks/OneSpoutBoltMultiTasksResults.json"
+ },
{
"topologyName" : "IntegrationTest_OneSpoutMultiTasks",
"classPath" : "one_spout_multi_tasks.OneSpoutMultiTasks",
- "expectedResultRelativePath" : "OneSpoutMultiTasks.json"
+ "expectedResultRelativePath" : "one_spout_multi_tasks/OneSpoutMultiTasksResults.json"
},
{
"topologyName" : "IntegrationTest_OneSpoutTwoBolts",
"classPath" : "one_spout_two_bolts.OneSpoutTwoBolts",
- "expectedResultRelativePath" : "OneSpoutTwoBolts.json"
+ "expectedResultRelativePath" : "one_spout_two_bolts/OneSpoutTwoBoltsResults.json"
},
{
"topologyName" : "IntegrationTest_CustomSerialization",
"classPath" : "serialization.SerializationTopology",
- "expectedResultRelativePath" : "SerializationTopology.json"
+ "expectedResultRelativePath" : "serialization/SerializationTopologyResults.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_SlidingCountWindow1",
+ "classPath" : "windowing.count.SlidingCountWindowTest1",
+ "expectedResultRelativePath" : "windowing/count/SlidingCountWindowTest1Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_SlidingCountWindow2",
+ "classPath" : "windowing.count.SlidingCountWindowTest2",
+ "expectedResultRelativePath" : "windowing/count/SlidingCountWindowTest2Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_SlidingCountWindow3",
+ "classPath" : "windowing.count.SlidingCountWindowTest3",
+ "expectedResultRelativePath" : "windowing/count/SlidingCountWindowTest3Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_TumblingCountWindow1",
+ "classPath" : "windowing.count.TumblingCountWindowTest1",
+ "expectedResultRelativePath" : "windowing/count/TumblingCountWindowTest1Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_TumblingCountWindow2",
+ "classPath" : "windowing.count.TumblingCountWindowTest2",
+ "expectedResultRelativePath" : "windowing/count/TumblingCountWindowTest2Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_TumblingCountWindow3",
+ "classPath" : "windowing.count.TumblingCountWindowTest3",
+ "expectedResultRelativePath" : "windowing/count/TumblingCountWindowTest3Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_SlidingTimeWindow1",
+ "classPath" : "windowing.time.SlidingTimeWindowTest1",
+ "expectedResultRelativePath" : "windowing/time/SlidingTimeWindowTest1Results.json"
+ },
+ {
+ "topologyName" : "IntegrationTest_SlidingWatermarkEventTimeWindow1",
+ "classPath" : "windowing.watermark.SlidingWatermarkEventTimeWindowTest1",
+ "expectedResultRelativePath" : "windowing/watermark/SlidingWatermarkEventTimeWindowTest1Results.json"
}
],
"pythonTopologies": [
{
"topologyName" : "Heron_IntegrationTest_AllGrouping",
"classPath" : "-",
- "expectedResultRelativePath" : "all_grouping_result.json"
+ "expectedResultRelativePath" : "all_grouping/all_grouping_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_BasicOneTask",
"classPath" : "-",
- "expectedResultRelativePath" : "basic_one_task_result.json"
+ "expectedResultRelativePath" : "basic_one_task/basic_one_task_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_NoneGrouping",
"classPath" : "-",
- "expectedResultRelativePath" : "none_grouping_result.json"
+ "expectedResultRelativePath" : "none_grouping/none_grouping_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_OneBoltMultiTasks",
"classPath" : "-",
- "expectedResultRelativePath" : "one_bolt_multi_tasks_result.json"
+ "expectedResultRelativePath" : "one_bolt_multi_tasks/one_bolt_multi_tasks_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_OneSpoutBoltMultiTasks",
"classPath" : "-",
- "expectedResultRelativePath" : "one_spout_bolt_multi_tasks_result.json"
+ "expectedResultRelativePath" : "one_spout_bolt_multi_tasks/one_spout_bolt_multi_tasks_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_ShuffleGrouping",
"classPath" : "-",
- "expectedResultRelativePath" : "shuffle_grouping_result.json"
+ "expectedResultRelativePath" : "shuffle_grouping/shuffle_grouping_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_OneSpoutTwoBolts",
"classPath" : "-",
- "expectedResultRelativePath" : "one_spout_two_bolts_result.json"
+ "expectedResultRelativePath" : "one_spout_two_bolts/one_spout_two_bolts_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_OneSpoutTwoBolts",
"classPath" : "-",
- "expectedResultRelativePath" : "one_spout_two_bolts_result.json"
+ "expectedResultRelativePath" : "one_spout_two_bolts/one_spout_two_bolts_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_OneSpoutMultiTasks",
"classPath" : "-",
- "expectedResultRelativePath" : "one_spout_multi_tasks_result.json"
+ "expectedResultRelativePath" : "one_spout_multi_tasks/one_spout_multi_tasks_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_MultiSpoutsMultiTasks",
"classPath" : "-",
- "expectedResultRelativePath" : "multi_spouts_multi_tasks_result.json"
+ "expectedResultRelativePath" : "multi_spouts_multi_tasks/multi_spouts_multi_tasks_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_FieldsGrouping",
"classPath" : "-",
- "expectedResultRelativePath" : "fields_grouping_result.json"
+ "expectedResultRelativePath" : "fields_grouping/fields_grouping_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_BoltDoubleEmitTuples",
"classPath" : "-",
- "expectedResultRelativePath" : "bolt_double_emit_tuples_result.json"
+ "expectedResultRelativePath" : "bolt_double_emit_tuples/bolt_double_emit_tuples_result.json"
},
{
"topologyName" : "Heron_IntegrationTest_GlobalGrouping",
"classPath" : "-",
- "expectedResultRelativePath" : "global_grouping_result.json"
+ "expectedResultRelativePath" : "global_grouping/global_grouping_result.json"
}
]
}
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index 18fdfbe..fafc619 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -311,6 +311,7 @@
srcs = [
"//integration_test/src/java:test-data-files",
],
+ strip_prefix = '/integration_test/src/java/com/twitter/heron/integration_test/topology/'
)
pkg_tar(
@@ -319,6 +320,7 @@
srcs = [
"//integration_test/src/python/integration_test/topology:test-data-files",
],
+ strip_prefix = '/integration_test/src/python/integration_test/topology/'
)
pkg_tar(