blob: f867aac67018af2d918aba2311370acdf6e0495e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link StateNamespaces}. */
@RunWith(JUnit4.class)
public class StateNamespacesTest {
private final Coder<IntervalWindow> intervalCoder = IntervalWindow.getCoder();
private IntervalWindow intervalWindow(long start, long end) {
return new IntervalWindow(new Instant(start), new Instant(end));
}
/**
* This test should not be changed. It verifies that the stringKey matches certain expectations.
* If this changes, the ability to reload any pipeline that has persisted these namespaces will be
* impacted.
*/
@Test
public void testStability() {
StateNamespace global = StateNamespaces.global();
StateNamespace intervalWindow =
StateNamespaces.window(intervalCoder, intervalWindow(1000, 87392));
StateNamespace intervalWindowAndTrigger =
StateNamespaces.windowAndTrigger(intervalCoder, intervalWindow(1000, 87392), 57);
StateNamespace globalWindow =
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
StateNamespace globalWindowAndTrigger =
StateNamespaces.windowAndTrigger(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, 12);
assertEquals("/", global.stringKey());
assertEquals("/gAAAAAABVWD4ogU/", intervalWindow.stringKey());
assertEquals("/gAAAAAABVWD4ogU/1L/", intervalWindowAndTrigger.stringKey());
assertEquals("//", globalWindow.stringKey());
assertEquals("//C/", globalWindowAndTrigger.stringKey());
}
/** Test that WindowAndTrigger namespaces are prefixed by the related Window namespace. */
@Test
public void testIntervalWindowPrefixing() {
StateNamespace window = StateNamespaces.window(intervalCoder, intervalWindow(1000, 87392));
StateNamespace windowAndTrigger =
StateNamespaces.windowAndTrigger(intervalCoder, intervalWindow(1000, 87392), 57);
assertThat(windowAndTrigger.stringKey(), Matchers.startsWith(window.stringKey()));
assertThat(
StateNamespaces.global().stringKey(),
Matchers.not(Matchers.startsWith(window.stringKey())));
}
/** Test that WindowAndTrigger namespaces are prefixed by the related Window namespace. */
@Test
public void testGlobalWindowPrefixing() {
StateNamespace window =
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
StateNamespace windowAndTrigger =
StateNamespaces.windowAndTrigger(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, 57);
assertThat(windowAndTrigger.stringKey(), Matchers.startsWith(window.stringKey()));
assertThat(
StateNamespaces.global().stringKey(),
Matchers.not(Matchers.startsWith(window.stringKey())));
}
@Test
public void testFromStringGlobal() {
assertStringKeyRoundTrips(intervalCoder, StateNamespaces.global());
}
@Test
public void testFromStringIntervalWindow() {
assertStringKeyRoundTrips(
intervalCoder, StateNamespaces.window(intervalCoder, intervalWindow(1000, 8000)));
assertStringKeyRoundTrips(
intervalCoder, StateNamespaces.window(intervalCoder, intervalWindow(1000, 8000)));
assertStringKeyRoundTrips(
intervalCoder,
StateNamespaces.windowAndTrigger(intervalCoder, intervalWindow(1000, 8000), 18));
assertStringKeyRoundTrips(
intervalCoder,
StateNamespaces.windowAndTrigger(intervalCoder, intervalWindow(1000, 8000), 19));
assertStringKeyRoundTrips(
intervalCoder,
StateNamespaces.windowAndTrigger(intervalCoder, intervalWindow(2000, 8000), 19));
}
@Test
public void testFromStringGlobalWindow() {
assertStringKeyRoundTrips(GlobalWindow.Coder.INSTANCE, StateNamespaces.global());
assertStringKeyRoundTrips(
GlobalWindow.Coder.INSTANCE,
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE));
assertStringKeyRoundTrips(
GlobalWindow.Coder.INSTANCE,
StateNamespaces.windowAndTrigger(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, 18));
}
private void assertStringKeyRoundTrips(
Coder<? extends BoundedWindow> coder, StateNamespace namespace) {
assertEquals(namespace, StateNamespaces.fromString(namespace.stringKey(), coder));
}
}