blob: f70850086a444bafeee198dd198ab7437598cb28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.worker.DataflowMatchers.ByteStringMatcher.byteStringEq;
import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagBag;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/** Tests for {@link WindmillStateInternals}. */
@RunWith(JUnit4.class)
public class WindmillStateInternalsTest {
private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns");
private static final String STATE_FAMILY = "family";
private static final StateTag<CombiningState<Integer, int[], Integer>> COMBINING_ADDR =
StateTags.combiningValueFromInputInternal("combining", VarIntCoder.of(), Sum.ofIntegers());
private static final ByteString COMBINING_KEY = key(NAMESPACE, "combining");
private final Coder<int[]> accumCoder =
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of());
private long workToken = 0;
@Mock private WindmillStateReader mockReader;
private WindmillStateInternals<String> underTest;
private WindmillStateInternals<String> underTestNewKey;
private WindmillStateCache cache;
@Mock private Supplier<Closeable> readStateSupplier;
private static ByteString key(StateNamespace namespace, String addrId) {
return ByteString.copyFromUtf8(namespace.stringKey() + "+u" + addrId);
}
private static ByteString systemKey(StateNamespace namespace, String addrId) {
return ByteString.copyFromUtf8(namespace.stringKey() + "+s" + addrId);
}
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
cache = new WindmillStateCache();
resetUnderTest();
}
public void resetUnderTest() {
workToken++;
underTest =
new WindmillStateInternals<>(
"dummyKey",
STATE_FAMILY,
mockReader,
false,
cache.forComputation("comp").forKey(ByteString.EMPTY, STATE_FAMILY, 17L, workToken),
readStateSupplier);
underTestNewKey =
new WindmillStateInternals<String>(
"dummyNewKey",
STATE_FAMILY,
mockReader,
true,
cache.forComputation("comp").forKey(ByteString.EMPTY, STATE_FAMILY, 17L, workToken),
readStateSupplier);
}
@After
public void tearDown() throws Exception {
// Make sure no WindmillStateReader (a per-WorkItem object) escapes into the cache
// (a global object).
WindmillStateTestUtils.assertNoReference(cache, WindmillStateReader.class);
}
private <T> void waitAndSet(final SettableFuture<T> future, final T value, final long millis) {
new Thread(
() -> {
try {
sleepMillis(millis);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted before setting", e);
}
future.set(value);
})
.run();
}
private WindmillStateReader.WeightedList<String> weightedList(String... elems) {
WindmillStateReader.WeightedList<String> result =
new WindmillStateReader.WeightedList<>(new ArrayList<String>(elems.length));
for (String elem : elems) {
result.addWeighted(elem, elem.length());
}
return result;
}
@Test
public void testBagAddBeforeRead() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
SettableFuture<Iterable<String>> future = SettableFuture.create();
when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()))
.thenReturn(future);
bag.readLater();
bag.add("hello");
waitAndSet(future, Arrays.asList("world"), 200);
assertThat(bag.read(), Matchers.containsInAnyOrder("hello", "world"));
bag.add("goodbye");
assertThat(bag.read(), Matchers.containsInAnyOrder("hello", "world", "goodbye"));
}
@Test
public void testBagClearBeforeRead() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
bag.clear();
bag.add("hello");
assertThat(bag.read(), Matchers.containsInAnyOrder("hello"));
// Shouldn't need to read from windmill for this.
Mockito.verifyZeroInteractions(mockReader);
}
@Test
public void testBagIsEmptyFalse() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
SettableFuture<Iterable<String>> future = SettableFuture.create();
when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()))
.thenReturn(future);
ReadableState<Boolean> result = bag.isEmpty().readLater();
Mockito.verify(mockReader).bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of());
waitAndSet(future, Arrays.asList("world"), 200);
assertThat(result.read(), Matchers.is(false));
}
@Test
public void testBagIsEmptyTrue() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
SettableFuture<Iterable<String>> future = SettableFuture.create();
when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()))
.thenReturn(future);
ReadableState<Boolean> result = bag.isEmpty().readLater();
Mockito.verify(mockReader).bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of());
waitAndSet(future, Arrays.<String>asList(), 200);
assertThat(result.read(), Matchers.is(true));
}
@Test
public void testBagIsEmptyAfterClear() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
bag.clear();
ReadableState<Boolean> result = bag.isEmpty();
Mockito.verify(mockReader, never())
.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of());
assertThat(result.read(), Matchers.is(true));
bag.add("hello");
assertThat(result.read(), Matchers.is(false));
}
@Test
public void testBagAddPersist() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
bag.add("hello");
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getBagUpdatesCount());
TagBag bagUpdates = commitBuilder.getBagUpdates(0);
assertEquals(key(NAMESPACE, "bag"), bagUpdates.getTag());
assertEquals(1, bagUpdates.getValuesCount());
assertEquals("hello", bagUpdates.getValues(0).toStringUtf8());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testBagClearPersist() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
bag.add("hello");
bag.clear();
bag.add("world");
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getBagUpdatesCount());
TagBag tagBag = commitBuilder.getBagUpdates(0);
assertEquals(key(NAMESPACE, "bag"), tagBag.getTag());
assertEquals(STATE_FAMILY, tagBag.getStateFamily());
assertTrue(tagBag.getDeleteAll());
assertEquals(1, tagBag.getValuesCount());
assertEquals("world", tagBag.getValues(0).toStringUtf8());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testBagPersistEmpty() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
bag.clear();
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
// 1 bag update = the clear
assertEquals(1, commitBuilder.getBagUpdatesCount());
}
@Test
public void testNewBagNoFetch() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTestNewKey.state(NAMESPACE, addr);
assertThat(bag.read(), Matchers.emptyIterable());
// Shouldn't need to read from windmill for this.
Mockito.verifyZeroInteractions(mockReader);
}
@Test
@SuppressWarnings("ArraysAsListPrimitiveArray")
public void testCombiningAddBeforeRead() throws Exception {
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
SettableFuture<Iterable<int[]>> future = SettableFuture.create();
when(mockReader.bagFuture(eq(COMBINING_KEY), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any()))
.thenReturn(future);
value.readLater();
value.add(5);
value.add(6);
waitAndSet(future, Arrays.asList(new int[] {8}, new int[] {10}), 200);
assertThat(value.read(), Matchers.equalTo(29));
// That get "compressed" the combiner. So, the underlying future should change:
future.set(Arrays.asList(new int[] {29}));
value.add(2);
assertThat(value.read(), Matchers.equalTo(31));
}
@Test
public void testCombiningClearBeforeRead() throws Exception {
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
value.clear();
value.readLater();
value.add(5);
value.add(6);
assertThat(value.read(), Matchers.equalTo(11));
value.add(2);
assertThat(value.read(), Matchers.equalTo(13));
// Shouldn't need to read from windmill for this because we immediately cleared..
Mockito.verifyZeroInteractions(mockReader);
}
@Test
@SuppressWarnings("ArraysAsListPrimitiveArray")
public void testCombiningIsEmpty() throws Exception {
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
SettableFuture<Iterable<int[]>> future = SettableFuture.create();
when(mockReader.bagFuture(eq(COMBINING_KEY), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any()))
.thenReturn(future);
ReadableState<Boolean> result = value.isEmpty().readLater();
ArgumentCaptor<ByteString> byteString = ArgumentCaptor.forClass(ByteString.class);
// Note that we do expect the third argument - the coder - to be equal to accumCoder, but that
// is possibly overspecified and currently trips an issue in the SDK where identical coders are
// not #equals().
//
// What matters is that a future is created, hence a Windmill RPC sent.
Mockito.verify(mockReader)
.bagFuture(byteString.capture(), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any());
assertThat(byteString.getValue(), byteStringEq(COMBINING_KEY));
waitAndSet(future, Arrays.asList(new int[] {29}), 200);
assertThat(result.read(), Matchers.is(false));
}
@Test
public void testCombiningIsEmptyAfterClear() throws Exception {
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
value.clear();
ReadableState<Boolean> result = value.isEmpty();
Mockito.verify(mockReader, never()).bagFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);
assertThat(result.read(), Matchers.is(true));
value.add(87);
assertThat(result.read(), Matchers.is(false));
}
@Test
public void testCombiningAddPersist() throws Exception {
disableCompactOnWrite();
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
value.add(5);
value.add(6);
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getBagUpdatesCount());
TagBag bagUpdates = commitBuilder.getBagUpdates(0);
assertEquals(COMBINING_KEY, bagUpdates.getTag());
assertEquals(1, bagUpdates.getValuesCount());
assertEquals(
11, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]);
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testCombiningAddPersistWithCompact() throws Exception {
forceCompactOnWrite();
Mockito.when(
mockReader.bagFuture(
org.mockito.Matchers.<ByteString>any(),
org.mockito.Matchers.<String>any(),
org.mockito.Matchers.<Coder<int[]>>any()))
.thenReturn(
Futures.<Iterable<int[]>>immediateFuture(
ImmutableList.of(new int[] {40}, new int[] {60})));
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
value.add(5);
value.add(6);
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getBagUpdatesCount());
TagBag bagUpdates = commitBuilder.getBagUpdates(0);
assertEquals(COMBINING_KEY, bagUpdates.getTag());
assertEquals(1, bagUpdates.getValuesCount());
assertTrue(bagUpdates.getDeleteAll());
assertEquals(
111, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]);
}
@Test
public void testCombiningClearPersist() throws Exception {
disableCompactOnWrite();
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
value.clear();
value.add(5);
value.add(6);
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getBagUpdatesCount());
TagBag tagBag = commitBuilder.getBagUpdates(0);
assertEquals(COMBINING_KEY, tagBag.getTag());
assertEquals(STATE_FAMILY, tagBag.getStateFamily());
assertTrue(tagBag.getDeleteAll());
assertEquals(1, tagBag.getValuesCount());
assertEquals(
11, CoderUtils.decodeFromByteArray(accumCoder, tagBag.getValues(0).toByteArray())[0]);
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testNewCombiningNoFetch() throws Exception {
GroupingState<Integer, Integer> value = underTestNewKey.state(NAMESPACE, COMBINING_ADDR);
assertThat(value.isEmpty().read(), Matchers.is(true));
assertThat(value.read(), Matchers.is(Sum.ofIntegers().identity()));
assertThat(value.isEmpty().read(), Matchers.is(false));
// Shouldn't need to read from windmill for this.
Mockito.verifyZeroInteractions(mockReader);
}
@Test
public void testWatermarkAddBeforeReadEarliest() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
SettableFuture<Instant> future = SettableFuture.create();
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future);
bag.readLater();
bag.add(new Instant(3000));
waitAndSet(future, new Instant(2000), 200);
assertThat(bag.read(), Matchers.equalTo(new Instant(2000)));
Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
// Adding another value doesn't create another future, but does update the result.
bag.add(new Instant(1000));
assertThat(bag.read(), Matchers.equalTo(new Instant(1000)));
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkAddBeforeReadLatest() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
SettableFuture<Instant> future = SettableFuture.create();
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future);
// Suggesting we will read it later should get a future from the underlying WindmillStateReader
bag.readLater();
// Actually reading it will request another future, and get the same one, from
// WindmillStateReader
bag.add(new Instant(3000));
waitAndSet(future, new Instant(2000), 200);
assertThat(bag.read(), Matchers.equalTo(new Instant(3000)));
Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
// Adding another value doesn't create another future, but does update the result.
bag.add(new Instant(3000));
assertThat(bag.read(), Matchers.equalTo(new Instant(3000)));
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkAddBeforeReadEndOfWindow() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
SettableFuture<Instant> future = SettableFuture.create();
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future);
// Requests a future once
bag.readLater();
bag.add(new Instant(3000));
waitAndSet(future, new Instant(3000), 200);
// read() requests a future again, receiving the same one
assertThat(bag.read(), Matchers.equalTo(new Instant(3000)));
Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
// Adding another value doesn't create another future, but does update the result.
bag.add(new Instant(3000));
assertThat(bag.read(), Matchers.equalTo(new Instant(3000)));
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkClearBeforeRead() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
bag.clear();
assertThat(bag.read(), Matchers.nullValue());
bag.add(new Instant(300));
assertThat(bag.read(), Matchers.equalTo(new Instant(300)));
// Shouldn't need to read from windmill because the value is already available.
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkPersistEarliest() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
bag.add(new Instant(1000));
bag.add(new Instant(2000));
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0));
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkPersistLatestEmpty() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
hold.add(new Instant(1000));
hold.add(new Instant(2000));
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY))
.thenReturn(Futures.<Instant>immediateFuture(null));
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0));
Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkPersistLatestWindmillWins() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
hold.add(new Instant(1000));
hold.add(new Instant(2000));
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY))
.thenReturn(Futures.<Instant>immediateFuture(new Instant(4000)));
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
assertEquals(TimeUnit.MILLISECONDS.toMicros(4000), watermarkHold.getTimestamps(0));
Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkPersistLatestLocalAdditionsWin() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
hold.add(new Instant(1000));
hold.add(new Instant(2000));
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY))
.thenReturn(Futures.<Instant>immediateFuture(new Instant(500)));
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0));
Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkPersistEndOfWindow() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
hold.add(new Instant(2000));
hold.add(new Instant(2000));
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0));
// Blind adds should not need to read the future.
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkClearPersist() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
hold.add(new Instant(500));
hold.clear();
hold.add(new Instant(1000));
hold.add(new Instant(2000));
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
Windmill.WatermarkHold clearAndUpdate = commitBuilder.getWatermarkHolds(0);
assertEquals(key(NAMESPACE, "watermark"), clearAndUpdate.getTag());
assertEquals(1, clearAndUpdate.getTimestampsCount());
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), clearAndUpdate.getTimestamps(0));
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testWatermarkPersistEmpty() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
bag.add(new Instant(500));
bag.clear();
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
// 1 bag update corresponds to deletion. There shouldn't be a bag update adding items.
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
}
@Test
public void testNewWatermarkNoFetch() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState bag = underTestNewKey.state(NAMESPACE, addr);
assertThat(bag.read(), Matchers.nullValue());
// Shouldn't need to read from windmill for this.
Mockito.verifyZeroInteractions(mockReader);
}
@Test
public void testValueSetBeforeRead() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTest.state(NAMESPACE, addr);
value.write("Hello");
assertEquals("Hello", value.read());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testValueClearBeforeRead() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTest.state(NAMESPACE, addr);
value.clear();
assertEquals(null, value.read());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testValueRead() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTest.state(NAMESPACE, addr);
SettableFuture<String> future = SettableFuture.create();
when(mockReader.valueFuture(key(NAMESPACE, "value"), STATE_FAMILY, StringUtf8Coder.of()))
.thenReturn(future);
waitAndSet(future, "World", 200);
assertEquals("World", value.read());
}
@Test
public void testValueSetPersist() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTest.state(NAMESPACE, addr);
value.write("Hi");
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getValueUpdatesCount());
TagValue valueUpdate = commitBuilder.getValueUpdates(0);
assertEquals(key(NAMESPACE, "value"), valueUpdate.getTag());
assertEquals("Hi", valueUpdate.getValue().getData().toStringUtf8());
assertTrue(valueUpdate.isInitialized());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testValueClearPersist() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTest.state(NAMESPACE, addr);
value.write("Hi");
value.clear();
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(1, commitBuilder.getValueUpdatesCount());
TagValue valueUpdate = commitBuilder.getValueUpdates(0);
assertEquals(key(NAMESPACE, "value"), valueUpdate.getTag());
assertEquals(0, valueUpdate.getValue().getData().size());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testValueNoChangePersist() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
underTest.state(NAMESPACE, addr);
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
assertEquals(0, commitBuilder.getValueUpdatesCount());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testNewValueNoFetch() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTestNewKey.state(NAMESPACE, addr);
assertEquals(null, value.read());
// Shouldn't need to read from windmill for this.
Mockito.verifyZeroInteractions(mockReader);
}
@Test
public void testCachedValue() throws Exception {
StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
ValueState<String> value = underTest.state(NAMESPACE, addr);
assertEquals(0, cache.getWeight());
value.write("Hi");
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(126, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, addr);
assertEquals("Hi", value.read());
value.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(124, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, addr);
assertEquals(null, value.read());
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testCachedBag() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
BagState<String> bag = underTest.state(NAMESPACE, addr);
assertEquals(0, cache.getWeight());
SettableFuture<Iterable<String>> future = SettableFuture.create();
when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()))
.thenReturn(future);
bag.readLater();
assertEquals(0, cache.getWeight());
bag.add("hello");
waitAndSet(future, weightedList("world"), 200);
Iterable<String> readResult1 = bag.read();
assertThat(readResult1, Matchers.containsInAnyOrder("hello", "world"));
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(134, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
bag.add("goodbye");
// Make sure that cached iterables have not changed after persist+add.
assertThat(readResult1, Matchers.containsInAnyOrder("hello", "world"));
Iterable<String> readResult2 = bag.read();
assertThat(readResult2, Matchers.containsInAnyOrder("hello", "world", "goodbye"));
bag.clear();
// Make sure that cached iterables have not changed after clear.
assertThat(readResult2, Matchers.containsInAnyOrder("hello", "world", "goodbye"));
bag.add("new");
// Make sure that cached iterables have not changed after clear+add.
assertThat(readResult2, Matchers.containsInAnyOrder("hello", "world", "goodbye"));
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(127, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
bag.add("new2");
assertThat(bag.read(), Matchers.containsInAnyOrder("new", "new2"));
bag.clear();
bag.add("new3");
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(128, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
assertThat(bag.read(), Matchers.containsInAnyOrder("new3"));
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
Mockito.verify(mockReader, times(2))
.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of());
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
public void testCachedWatermarkHold() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
SettableFuture<Instant> future = SettableFuture.create();
when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future);
assertEquals(0, cache.getWeight());
hold.readLater();
hold.add(new Instant(3000));
waitAndSet(future, new Instant(2000), 200);
assertThat(hold.read(), Matchers.equalTo(new Instant(2000)));
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(132, cache.getWeight());
resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
assertThat(hold.read(), Matchers.equalTo(new Instant(2000)));
hold.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(132, cache.getWeight());
resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
assertEquals(null, hold.read());
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
}
@Test
@SuppressWarnings("ArraysAsListPrimitiveArray")
public void testCachedCombining() throws Exception {
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
SettableFuture<Iterable<int[]>> future = SettableFuture.create();
when(mockReader.bagFuture(
eq(key(NAMESPACE, "combining")), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any()))
.thenReturn(future);
assertEquals(0, cache.getWeight());
value.readLater();
value.add(1);
waitAndSet(future, Arrays.asList(new int[] {2}), 200);
assertThat(value.read(), Matchers.equalTo(3));
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(125, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
assertThat(value.read(), Matchers.equalTo(3));
value.add(3);
assertThat(value.read(), Matchers.equalTo(6));
value.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
assertEquals(124, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
assertThat(value.read(), Matchers.equalTo(0));
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
// Note that we do expect the third argument - the coder - to be equal to accumCoder, but that
// is possibly overspecified and currently trips an issue in the SDK where identical coders are
// not #equals().
//
// What matters is the number of futures created, hence Windmill RPCs.
Mockito.verify(mockReader, times(2))
.bagFuture(eq(key(NAMESPACE, "combining")), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any());
Mockito.verifyNoMoreInteractions(mockReader);
}
private void disableCompactOnWrite() {
WindmillStateInternals.COMPACT_NOW.set(() -> false);
}
private void forceCompactOnWrite() {
WindmillStateInternals.COMPACT_NOW.set(() -> true);
}
}