// blob: a7e64af4dd5b6d1690be547124f68b37cfead849 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.apex.translation.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import com.datatorrent.lib.util.KryoCloneUtils;
import java.util.Arrays;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests for {@link ApexStateInternals}. This is based on the tests for
 * {@code InMemoryStateInternals}.
 *
 * <p>Every test runs against a fresh {@link ApexStateInternals} instance for a {@code null} key
 * that is created in {@link #initStateInternals()}. Only {@link #testSerialization()} builds its
 * own factory, because it needs to clone the factory itself.
 */
public class ApexStateInternalsTest {
  /** Window handed to {@link StateMerging#mergeWatermarks} in the watermark merge tests. */
  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));

  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");

  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
      StateTags.value("stringValue", StringUtf8Coder.of());
  private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
      StateTags.combiningValueFromInputInternal(
          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
      StateTags.bag("stringBag", StringUtf8Coder.of());

  // The three watermark tags share the id "watermark" and differ only in their TimestampCombiner.
  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);

  /** State internals under test; re-created before every test method. */
  private ApexStateInternals<String> underTest;

  /** Creates a fresh {@link ApexStateInternals} for a {@code null} String key. */
  @Before
  public void initStateInternals() {
    underTest =
        new ApexStateBackend()
            .newStateInternalsFactory(StringUtf8Coder.of())
            .stateInternalsForKey((String) null);
  }

  /** Bag state starts empty, returns every added element, and is empty again after clear. */
  @Test
  public void testBag() throws Exception {
    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);

    // State instances are cached, but depend on the namespace.
    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));

    assertThat(value.read(), Matchers.emptyIterable());
    value.add("hello");
    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));

    value.add("world");
    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));

    value.clear();
    assertThat(value.read(), Matchers.emptyIterable());
    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
  }

  /** The {@code isEmpty()} handle of a bag reflects adds and clears made after it was obtained. */
  @Test
  public void testBagIsEmpty() throws Exception {
    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);

    assertThat(value.isEmpty().read(), Matchers.is(true));
    ReadableState<Boolean> readFuture = value.isEmpty();
    value.add("hello");
    assertThat(readFuture.read(), Matchers.is(false));

    value.clear();
    assertThat(readFuture.read(), Matchers.is(true));
  }

  /** Merging two bags into one of the sources moves all elements there and empties the other. */
  @Test
  public void testMergeBagIntoSource() throws Exception {
    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);

    bag1.add("Hello");
    bag2.add("World");
    bag1.add("!");

    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);

    // Reading the merged bag returns the contents of both sources.
    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
    assertThat(bag2.read(), Matchers.emptyIterable());
  }

  /**
   * Merging bags into a bag from a third namespace moves all elements there and empties the
   * other bags. Note that the (empty) target bag is also passed as one of the sources.
   */
  @Test
  public void testMergeBagIntoNewNamespace() throws Exception {
    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);

    bag1.add("Hello");
    bag2.add("World");
    bag1.add("!");

    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);

    // Reading the merged bag returns the contents of both non-empty sources.
    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
    assertThat(bag1.read(), Matchers.emptyIterable());
    assertThat(bag2.read(), Matchers.emptyIterable());
  }

  /** Summing state reads 0 initially, accumulates added integers, and reads 0 after clear. */
  @Test
  public void testCombiningValue() throws Exception {
    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);

    // State instances are cached, but depend on the namespace.
    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));

    assertThat(value.read(), Matchers.equalTo(0));
    value.add(2);
    assertThat(value.read(), Matchers.equalTo(2));

    value.add(3);
    assertThat(value.read(), Matchers.equalTo(5));

    value.clear();
    assertThat(value.read(), Matchers.equalTo(0));
    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
  }

  /** The {@code isEmpty()} handle of a combining state reflects later adds and clears. */
  @Test
  public void testCombiningIsEmpty() throws Exception {
    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);

    assertThat(value.isEmpty().read(), Matchers.is(true));
    ReadableState<Boolean> readFuture = value.isEmpty();
    value.add(5);
    assertThat(readFuture.read(), Matchers.is(false));

    value.clear();
    assertThat(readFuture.read(), Matchers.is(true));
  }

  /** Merging two sums into one of the sources leaves the total there and resets the other. */
  @Test
  public void testMergeCombiningValueIntoSource() throws Exception {
    CombiningState<Integer, int[], Integer> value1 =
        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
    CombiningState<Integer, int[], Integer> value2 =
        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);

    value1.add(5);
    value2.add(10);
    value1.add(6);

    assertThat(value1.read(), Matchers.equalTo(11));
    assertThat(value2.read(), Matchers.equalTo(10));

    // Merging clears the old values and updates the result value.
    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);

    assertThat(value1.read(), Matchers.equalTo(21));
    assertThat(value2.read(), Matchers.equalTo(0));
  }

  /** Merging two sums into a third namespace leaves the total there and resets both sources. */
  @Test
  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
    CombiningState<Integer, int[], Integer> value1 =
        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
    CombiningState<Integer, int[], Integer> value2 =
        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
    CombiningState<Integer, int[], Integer> value3 =
        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);

    value1.add(5);
    value2.add(10);
    value1.add(6);

    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);

    // Merging clears the old values and updates the result value.
    assertThat(value1.read(), Matchers.equalTo(0));
    assertThat(value2.read(), Matchers.equalTo(0));
    assertThat(value3.read(), Matchers.equalTo(21));
  }

  /** With the EARLIEST combiner the hold reads the minimum of the added instants. */
  @Test
  public void testWatermarkEarliestState() throws Exception {
    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);

    // State instances are cached, but depend on the namespace.
    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));

    assertThat(value.read(), Matchers.nullValue());
    value.add(new Instant(2000));
    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));

    // A later instant does not move the hold.
    value.add(new Instant(3000));
    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));

    // An earlier instant does.
    value.add(new Instant(1000));
    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));

    value.clear();
    assertThat(value.read(), Matchers.nullValue());
    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
  }

  /** With the LATEST combiner the hold reads the maximum of the added instants. */
  @Test
  public void testWatermarkLatestState() throws Exception {
    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);

    // State instances are cached, but depend on the namespace.
    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));

    assertThat(value.read(), Matchers.nullValue());
    value.add(new Instant(2000));
    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));

    // A later instant moves the hold.
    value.add(new Instant(3000));
    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));

    // An earlier instant does not.
    value.add(new Instant(1000));
    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));

    value.clear();
    assertThat(value.read(), Matchers.nullValue());
    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
  }

  /** With the END_OF_WINDOW combiner a single added instant is read back until cleared. */
  @Test
  public void testWatermarkEndOfWindowState() throws Exception {
    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);

    // State instances are cached, but depend on the namespace.
    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));

    assertThat(value.read(), Matchers.nullValue());
    value.add(new Instant(2000));
    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));

    value.clear();
    assertThat(value.read(), Matchers.nullValue());
    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
  }

  /** The {@code isEmpty()} handle of a watermark hold reflects later adds and clears. */
  @Test
  public void testWatermarkStateIsEmpty() throws Exception {
    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);

    assertThat(value.isEmpty().read(), Matchers.is(true));
    ReadableState<Boolean> readFuture = value.isEmpty();
    value.add(new Instant(1000));
    assertThat(readFuture.read(), Matchers.is(false));

    value.clear();
    assertThat(readFuture.read(), Matchers.is(true));
  }

  /** Merging EARLIEST holds into one of the sources keeps the overall minimum there. */
  @Test
  public void testMergeEarliestWatermarkIntoSource() throws Exception {
    WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
    WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);

    value1.add(new Instant(3000));
    value2.add(new Instant(5000));
    value1.add(new Instant(4000));
    value2.add(new Instant(2000));

    // Merging clears the old values and updates the merged value.
    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);

    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
    assertThat(value2.read(), Matchers.nullValue());
  }

  /**
   * Merging LATEST holds keeps the overall maximum in the target and clears the sources.
   *
   * <p>Despite the method name, the merge target here is the hold from a third namespace, not one
   * of the sources.
   */
  @Test
  public void testMergeLatestWatermarkIntoSource() throws Exception {
    WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
    WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
    WatermarkHoldState value3 = underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);

    value1.add(new Instant(3000));
    value2.add(new Instant(5000));
    value1.add(new Instant(4000));
    value2.add(new Instant(2000));

    // Merging clears the old values and updates the result value.
    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);

    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
    assertThat(value1.read(), Matchers.nullValue());
    assertThat(value2.read(), Matchers.nullValue());
  }

  /** A value written before cloning the factory is readable through the cloned factory. */
  @Test
  public void testSerialization() throws Exception {
    ApexStateInternalsFactory<String> sif =
        new ApexStateBackend().newStateInternalsFactory(StringUtf8Coder.of());
    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");

    ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
    value.write("hello");

    // Clone the whole factory, then look the same key and state up again through the clone.
    ApexStateInternalsFactory<String> cloned = KryoCloneUtils.cloneObject(sif);
    assertNotNull("Serialization", cloned);
    ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");

    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
    // NOTE(review): this compares the state fetched from the clone with the handle obtained
    // before cloning; presumably it relies on state equality not being identity-based. Confirm
    // against ApexStateInternals.
    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
  }
}