// blob: feaab42e35f31e9cef6d6177e02f9788fad3a253 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.gearpump.translators;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import io.gearpump.streaming.dsl.window.api.WindowFunction;
import io.gearpump.streaming.dsl.window.impl.Window;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.GearpumpWindowFn;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Tests for {@link GroupByKeyTranslator}.
 *
 * <p>The class runs under the {@link Parameterized} runner: {@link #data()} supplies the {@link
 * TimestampCombiner} values and each one is injected into {@link #timestampCombiner} before the
 * test methods run.
 */
@RunWith(Parameterized.class)
public class GroupByKeyTranslatorTest {

  /**
   * Checks that {@link GearpumpWindowFn#apply} returns one Gearpump {@link Window} per Beam window
   * carried by the element, in the same order, as converted by {@link
   * TranslatorUtils#boundedWindowToGearpumpWindow}.
   *
   * <p>This test does not read {@link #timestampCombiner}; it is simply repeated for every
   * parameter value.
   */
  @Test
  @SuppressWarnings({"rawtypes", "unchecked"})
  public void testGearpumpWindowFn() {
    // NOTE(review): the meaning of the boolean constructor flag is not visible in this file;
    // presumably it marks the underlying WindowFn as non-merging -- confirm against
    // GroupByKeyTranslator.GearpumpWindowFn.
    GearpumpWindowFn windowFn = new GearpumpWindowFn(true);

    // Two overlapping windows; the element below (timestamp 6) is assigned to both.
    List<BoundedWindow> windows =
        Lists.newArrayList(
            new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)),
            new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(15)));

    WindowFunction.Context<WindowedValue<String>> context =
        new WindowFunction.Context<WindowedValue<String>>() {
          @Override
          public Instant timestamp() {
            return Instant.EPOCH;
          }

          @Override
          public WindowedValue<String> element() {
            return WindowedValue.of(
                "v1", new org.joda.time.Instant(6), windows, PaneInfo.NO_FIRING);
          }
        };

    Window[] result = windowFn.apply(context);

    List<Window> expected = Lists.newArrayList();
    for (BoundedWindow w : windows) {
      expected.add(TranslatorUtils.boundedWindowToGearpumpWindow(w));
    }
    // Compare against a Window[] (not Object[]) so both sides of the assertion share a type.
    assertThat(result, equalTo(expected.toArray(new Window[0])));
  }

  /**
   * Parameter source for the {@link Parameterized} runner.
   *
   * @return the timestamp combiners every test in this class is run with
   */
  @Parameterized.Parameters(name = "{index}: {0}")
  public static Iterable<TimestampCombiner> data() {
    return ImmutableList.of(
        TimestampCombiner.EARLIEST, TimestampCombiner.LATEST, TimestampCombiner.END_OF_WINDOW);
  }

  /** The combiner under test for the current parameterized run; injected by the runner. */
  @Parameterized.Parameter(0)
  public TimestampCombiner timestampCombiner;

  /**
   * Checks that {@link GroupByKeyTranslator.KeyedByTimestamp#map} pairs the unchanged input value
   * with the timestamp obtained from {@code timestampCombiner.assign(window,
   * windowFn.getOutputTime(elementTimestamp, window))}.
   */
  @Test
  @SuppressWarnings({"rawtypes", "unchecked"})
  public void testKeyedByTimestamp() {
    WindowFn sessions = Sessions.withGapDuration(Duration.millis(10));
    BoundedWindow window =
        new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
    GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
        new GroupByKeyTranslator.KeyedByTimestamp(sessions, timestampCombiner);

    // A fixed timestamp inside [0, 10) keeps the test deterministic and consistent with the
    // window the element is assigned to (a wall-clock "now" would be neither).
    org.joda.time.Instant elementTimestamp = new org.joda.time.Instant(5);
    WindowedValue<KV<String, String>> value =
        WindowedValue.of(KV.of("key", "val"), elementTimestamp, window, PaneInfo.NO_FIRING);

    KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
        keyedByTimestamp.map(value);

    org.joda.time.Instant time =
        timestampCombiner.assign(window, sessions.getOutputTime(value.getTimestamp(), window));
    assertThat(result, equalTo(KV.of(time, value)));
  }

  /**
   * Checks {@link GroupByKeyTranslator.Merge#fold} over two elements whose session windows
   * overlap.
   *
   * <p>Folding the first element into an empty ({@code null}/{@code null}) accumulator must yield
   * that element's key and a single-value list. Folding the second element must combine the keys
   * with {@link #timestampCombiner}, merge windows {@code [5, 10)} and {@code [9, 14)} into the
   * single window {@code [5, 14)}, and append the second value to the list.
   */
  @Test
  @SuppressWarnings({"rawtypes", "unchecked"})
  public void testMerge() {
    WindowFn sessions = Sessions.withGapDuration(Duration.millis(10));
    GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(sessions, timestampCombiner);

    org.joda.time.Instant key1 = new org.joda.time.Instant(5);
    WindowedValue<KV<String, String>> value1 =
        WindowedValue.of(
            KV.of("key1", "value1"),
            key1,
            new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(10)),
            PaneInfo.NO_FIRING);

    org.joda.time.Instant key2 = new org.joda.time.Instant(10);
    WindowedValue<KV<String, String>> value2 =
        WindowedValue.of(
            KV.of("key2", "value2"),
            key2,
            new IntervalWindow(new org.joda.time.Instant(9), new org.joda.time.Instant(14)),
            PaneInfo.NO_FIRING);

    // First fold: the accumulator is empty, represented by a KV with null key and null value.
    KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result1 =
        merge.fold(
            KV.<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>>of(null, null),
            KV.of(key1, value1));
    assertThat(result1.getKey(), equalTo(key1));
    assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1")));

    // Second fold: the overlapping windows are expected to collapse into one.
    KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result2 =
        merge.fold(result1, KV.of(key2, value2));
    assertThat(result2.getKey(), equalTo(timestampCombiner.combine(key1, key2)));

    Collection<? extends BoundedWindow> resultWindows = result2.getValue().getWindows();
    assertThat(resultWindows.size(), equalTo(1));
    IntervalWindow expectedWindow =
        new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(14));
    assertThat(resultWindows.toArray()[0], equalTo(expectedWindow));
    assertThat(
        result2.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1", "value2")));
  }
}