blob: 7013d685fce90098803567f0065003356671ba4b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.trident;
import java.util.concurrent.TimeUnit;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.windowing.InMemoryWindowsStore;
import org.apache.storm.trident.windowing.config.SlidingCountWindow;
import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
import org.apache.storm.trident.windowing.config.TumblingCountWindow;
import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
import org.apache.storm.trident.windowing.strategy.WindowStrategy;
import org.junit.Assert;
import org.junit.Test;
/**
*
*/
public class TridentWindowingTest {
@Test
public void testWindowStrategyInstances() throws Exception {
WindowStrategy<Object> tumblingCountStrategy = TumblingCountWindow.of(10).getWindowStrategy();
Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
WindowStrategy<Object> slidingCountStrategy = SlidingCountWindow.of(100, 10).getWindowStrategy();
Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
WindowStrategy<Object> tumblingDurationStrategy = TumblingDurationWindow.of(
new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.getWindowStrategy();
Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
WindowStrategy<Object> slidingDurationStrategy = SlidingDurationWindow.of(
new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))
.getWindowStrategy();
Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
}
@Test
public void testWindowConfig() {
int windowLength = 9;
TumblingCountWindow tumblingCountWindow = TumblingCountWindow.of(windowLength);
Assert.assertTrue(tumblingCountWindow.getWindowLength() == windowLength);
Assert.assertTrue(tumblingCountWindow.getSlidingLength() == windowLength);
windowLength = 10;
int slidingLength = 2;
SlidingCountWindow slidingCountWindow = SlidingCountWindow.of(10, 2);
Assert.assertTrue(slidingCountWindow.getWindowLength() == windowLength);
Assert.assertTrue(slidingCountWindow.getSlidingLength() == slidingLength);
windowLength = 20;
TumblingDurationWindow tumblingDurationWindow =
TumblingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS));
Assert.assertTrue(tumblingDurationWindow.getWindowLength() == windowLength * 1000);
Assert.assertTrue(tumblingDurationWindow.getSlidingLength() == windowLength * 1000);
windowLength = 50;
slidingLength = 10;
SlidingDurationWindow slidingDurationWindow =
SlidingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(slidingLength, TimeUnit.SECONDS));
Assert.assertTrue(slidingDurationWindow.getWindowLength() == windowLength * 1000);
Assert.assertTrue(slidingDurationWindow.getSlidingLength() == slidingLength * 1000);
}
@Test
public void testInMemoryWindowStore() {
InMemoryWindowsStore store = new InMemoryWindowsStore();
String keyPrefix = "key";
String valuePrefix = "valuePrefix";
int ct = 10;
for (int i = 0; i < ct; i++) {
store.put(keyPrefix + i, valuePrefix + i);
}
for (int i = 0; i < ct; i++) {
Assert.assertTrue((valuePrefix + i).equals(store.get(keyPrefix + i)));
}
store.remove(keyPrefix + 1);
Assert.assertNull(store.get(keyPrefix + 1));
}
}