| // Copyright 2017 Twitter. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.twitter.heron.integration_test.topology.windowing; |
| |
| import java.net.MalformedURLException; |
| import java.time.Duration; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.logging.Logger; |
| |
| import org.json.simple.JSONArray; |
| import org.json.simple.JSONObject; |
| |
| import com.twitter.heron.api.bolt.BaseWindowedBolt; |
| import com.twitter.heron.api.bolt.OutputCollector; |
| import com.twitter.heron.api.spout.BaseRichSpout; |
| import com.twitter.heron.api.spout.SpoutOutputCollector; |
| import com.twitter.heron.api.topology.OutputFieldsDeclarer; |
| import com.twitter.heron.api.topology.TopologyContext; |
| import com.twitter.heron.api.tuple.Fields; |
| import com.twitter.heron.api.tuple.Tuple; |
| import com.twitter.heron.api.tuple.Values; |
| import com.twitter.heron.api.utils.Utils; |
| import com.twitter.heron.api.windowing.TupleWindow; |
| import com.twitter.heron.integration_test.common.AbstractTestTopology; |
| import com.twitter.heron.integration_test.core.TestTopologyBuilder; |
| |
| public class WindowTestBase extends AbstractTestTopology { |
| |
| private static final String NUMBER_FIELD = "number"; |
| private static final String STRING_FIELD = "numAsStr"; |
| private static final String EVENT_TIME_FIELD = "eventTimeField"; |
| private static final String DUMMY_FIELD = "dummy"; |
| |
| private BaseWindowedBolt.Count windowCountLength; |
| private BaseWindowedBolt.Count slideCountInterval; |
| private Duration windowDurationLength; |
| private Duration slideDurationInterval; |
| private Long sleepBetweenTuples; |
| private boolean useEventTime = false; |
| private Duration watermarkInterval; |
| |
| |
| public String getBoltName() { |
| return "VerificationBolt"; |
| } |
| |
| public String getSpoutName() { |
| return "IncrementingSpout"; |
| } |
| |
| |
| public WindowTestBase(String[] args) throws MalformedURLException { |
| super(args); |
| } |
| |
| public WindowTestBase withWindowLength(Duration duration) { |
| this.windowDurationLength = duration; |
| return this; |
| } |
| |
| public WindowTestBase withWindowLength(BaseWindowedBolt.Count count) { |
| this.windowCountLength = count; |
| return this; |
| } |
| |
| public WindowTestBase withSlidingInterval(Duration duration) { |
| this.slideDurationInterval = duration; |
| return this; |
| } |
| |
| public WindowTestBase withSlidingInterval(BaseWindowedBolt.Count count) { |
| this.slideCountInterval = count; |
| return this; |
| } |
| |
| public WindowTestBase withSleepBetweenTuples(long millis) { |
| this.sleepBetweenTuples = millis; |
| return this; |
| } |
| |
| public WindowTestBase useEventTime() { |
| this.useEventTime = true; |
| return this; |
| } |
| |
| @SuppressWarnings("HiddenField") |
| public WindowTestBase withWatermarkInterval(Duration watermarkInterval) { |
| this.watermarkInterval = watermarkInterval; |
| return this; |
| } |
| |
| @Override |
| protected TestTopologyBuilder buildTopology(TestTopologyBuilder builder) { |
| builder.setSpout(getSpoutName(), new IncrementingSpout() |
| .withSleepBetweenTuples(sleepBetweenTuples), 1); |
| |
| BaseWindowedBolt bolt = null; |
| if (this.windowCountLength != null) { |
| if (this.slideCountInterval != null) { |
| bolt = new VerificationBolt() |
| .withWindow(this.windowCountLength, this.slideCountInterval); |
| } else { |
| bolt = new VerificationBolt() |
| .withTumblingWindow(this.windowCountLength); |
| } |
| } |
| |
| if (this.windowDurationLength != null) { |
| if (this.slideDurationInterval != null) { |
| bolt = new VerificationBolt() |
| .withWindow(this.windowDurationLength, this.slideDurationInterval); |
| } else { |
| bolt = new VerificationBolt() |
| .withTumblingWindow(this.windowDurationLength); |
| } |
| } |
| |
| if (this.useEventTime) { |
| bolt.withTimestampField(EVENT_TIME_FIELD); |
| } |
| |
| if (this.watermarkInterval != null) { |
| bolt.withWatermarkInterval(this.watermarkInterval); |
| } |
| builder.setBolt(getBoltName(), bolt, 1, false) |
| .shuffleGrouping(getSpoutName()); |
| return builder; |
| } |
| |
| public static class IncrementingSpout extends BaseRichSpout { |
| private static final Logger LOG = Logger.getLogger(IncrementingSpout.class.getName()); |
| private static final long serialVersionUID = -6171170228097868632L; |
| private SpoutOutputCollector collector; |
| private static int currentNum; |
| private static Random rng = new Random(); |
| private String componentId; |
| private Long sleepBetweenTuples = null; |
| // In millis |
| private long currentTime = 1504573536000L; |
| |
| public IncrementingSpout withSleepBetweenTuples(long millis) { |
| this.sleepBetweenTuples = millis; |
| return this; |
| } |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields(NUMBER_FIELD, STRING_FIELD, EVENT_TIME_FIELD)); |
| } |
| |
| @Override |
| @SuppressWarnings("HiddenField") |
| public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector |
| collector) { |
| componentId = context.getThisComponentId(); |
| this.collector = collector; |
| } |
| |
| @Override |
| public void nextTuple() { |
| if (sleepBetweenTuples == null) { |
| Utils.sleep(rng.nextInt(10)); |
| } else { |
| Utils.sleep(sleepBetweenTuples); |
| } |
| currentNum++; |
| final String numAsStr = "str(" + currentNum + ")str"; |
| final Values tuple = new Values(currentNum, numAsStr, currentTime += 1000); |
| LOG.info("Time = " + System.currentTimeMillis() |
| + " Component = " + componentId + " Tuple = " + tuple.toString()); |
| |
| collector.emit(tuple, currentNum); |
| } |
| |
| @Override |
| public void ack(Object msgId) { |
| LOG.info("Received ACK for msgId : " + msgId); |
| } |
| |
| @Override |
| public void fail(Object msgId) { |
| LOG.info("Received FAIL for msgId : " + msgId); |
| } |
| } |
| |
| public static class VerificationBolt extends BaseWindowedBolt { |
| private static final long serialVersionUID = -6067634845003700125L; |
| private OutputCollector collector; |
| private String componentId; |
| |
| private static int windowCount = 0; |
| |
| |
| private static final Logger LOG = Logger.getLogger(VerificationBolt.class.getName()); |
| |
| |
| @Override |
| @SuppressWarnings("HiddenField") |
| public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector |
| collector) { |
| componentId = context.getThisComponentId(); |
| this.collector = collector; |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public void execute(TupleWindow inputWindow) { |
| windowCount++; |
| List<Tuple> tuplesInWindow = inputWindow.get(); |
| List<Tuple> newTuples = inputWindow.getNew(); |
| List<Tuple> expiredTuples = inputWindow.getExpired(); |
| LOG.info("tuplesInWindow.size() = " + tuplesInWindow.size()); |
| LOG.info("newTuples.size() = " + newTuples.size()); |
| LOG.info("expiredTuples.size() = " + expiredTuples.size()); |
| |
| JSONObject jsonObject = new JSONObject(); |
| JSONArray jsonArray = new JSONArray(); |
| JSONObject tmp = new JSONObject(); |
| tmp.put("tuplesInWindow", tuplesToListString(tuplesInWindow)); |
| jsonArray.add(tmp); |
| tmp = new JSONObject(); |
| tmp.put("newTuples", tuplesToListString(newTuples)); |
| jsonArray.add(tmp); |
| tmp = new JSONObject(); |
| tmp.put("expiredTuples", tuplesToListString(expiredTuples)); |
| jsonArray.add(tmp); |
| jsonObject.put(windowCount, jsonArray); |
| LOG.info("Component = " + componentId + " Window Count = " + windowCount |
| + " tuplesInWindow = " + jsonArray.toJSONString()); |
| |
| collector.emit(new Values(jsonObject.toJSONString())); |
| } |
| |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields(DUMMY_FIELD)); |
| } |
| } |
| |
| public static List<String> tuplesToListString(List<Tuple> tuples) { |
| List<String> tmp = new LinkedList<>(); |
| for (Tuple tuple : tuples) { |
| tmp.add(tuple.getValue(0).toString()); |
| } |
| return tmp; |
| } |
| } |