blob: a63d9a0417ceaffc62066ce70d1fe1f7b4bc4483 [file] [log] [blame]
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.runners.worker;
import static com.google.cloud.dataflow.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill.TagList;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill.TagValue;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.CombiningValueState;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaceForTest;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
 * Tests for {@link WindmillStateInternals}.
 *
 * <p>Every test drives a {@link WindmillStateInternals} backed by a mocked {@link
 * WindmillStateReader}. The assertions therefore pin down both the values observed through the
 * state API and which reads are issued against the reader.
 */
@RunWith(JUnit4.class)
public class WindmillStateInternalsTest {
  private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns");
  private static final String STATE_FAMILY = "family";

  private static final StateTag<CombiningValueState<Integer, Integer>> COMBINING_ADDR =
      StateTags.combiningValueFromInputInternal(
          "combining", VarIntCoder.of(), new Sum.SumIntegerFn());

  // Tags under which the states used below are addressed when state families are enabled.
  private static final ByteString COMBINING_KEY = key(NAMESPACE, "combining");
  private static final ByteString BAG_KEY = key(NAMESPACE, "bag");
  private static final ByteString WATERMARK_KEY = key(NAMESPACE, "watermark");
  private static final ByteString VALUE_KEY = key(NAMESPACE, "value");

  private final Coder<int[]> accumCoder =
      new Sum.SumIntegerFn().getAccumulatorCoder(null, VarIntCoder.of());

  @Mock
  private WindmillStateReader mockReader;

  private WindmillStateInternals underTest;

  @Mock
  private Supplier<StateSampler.ScopedState> readStateSupplier;

  /** Builds the expected tag for {@code addrId} in {@code namespace} with no prefix. */
  private static ByteString key(StateNamespace namespace, String addrId) {
    return key("", namespace, addrId);
  }

  /**
   * Builds the expected tag for {@code addrId} in {@code namespace}.
   *
   * @param prefix text prepended to the tag; the "no state families" tests pass the state family
   *     name here
   */
  private static ByteString key(String prefix, StateNamespace namespace, String addrId) {
    return ByteString.copyFromUtf8(prefix + namespace.stringKey() + "+u" + addrId);
  }

  /** Initializes the mocks and creates the instance under test with state families enabled. */
  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);
    underTest = new WindmillStateInternals(STATE_FAMILY, true, mockReader, readStateSupplier);
  }

  /**
   * Completes {@code future} with {@code value} from a background thread after {@code millis}
   * milliseconds, so that the calling test can exercise a read that is still pending.
   *
   * <p>If the background thread is interrupted while sleeping, the future is failed instead of
   * being left incomplete; otherwise a reader waiting on it would never return.
   */
  private <T> void waitAndSet(final SettableFuture<T> future, final T value, final long millis) {
    Thread setter =
        new Thread(
            new Runnable() {
              @Override
              public void run() {
                try {
                  sleepMillis(millis);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  future.setException(new RuntimeException("Interrupted before setting", e));
                  return;
                }
                future.set(value);
              }
            });
    // A daemon thread cannot keep the JVM alive if a test fails before the delay elapses.
    setter.setDaemon(true);
    // start(), not run(): run() would execute the sleep synchronously on the test thread.
    setter.start();
  }

  /**
   * Decodes the first value of {@code listUpdates} with {@link #accumCoder} and returns the first
   * element of the decoded accumulator.
   */
  private int decodeFirstAccumulator(TagList listUpdates) throws Exception {
    ByteString data = listUpdates.getValues(0).getData();
    // The first byte is skipped before decoding (presumably a framing prefix -- TODO confirm).
    return CoderUtils.decodeFromByteArray(accumCoder, data.substring(1).toByteArray())[0];
  }

  /** Local adds are merged with the contents read from the reader, before and after the read. */
  @Test
  public void testBagAddBeforeRead() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    SettableFuture<Iterable<String>> future = SettableFuture.create();
    when(mockReader.listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of())).thenReturn(future);

    StateContents<Iterable<String>> result = bag.get();
    bag.add("hello");

    waitAndSet(future, Arrays.asList("world"), 200);
    assertThat(result.read(), Matchers.containsInAnyOrder("hello", "world"));

    bag.add("goodbye");
    assertThat(result.read(), Matchers.containsInAnyOrder("hello", "world", "goodbye"));
  }

  /** After a clear, the bag contents are served without touching the reader. */
  @Test
  public void testBagClearBeforeRead() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    bag.clear();
    bag.add("hello");
    assertThat(bag.get().read(), Matchers.containsInAnyOrder("hello"));

    // Shouldn't need to read from windmill for this.
    Mockito.verifyZeroInteractions(mockReader);
  }

  /** {@code isEmpty()} issues a list read and reports false when the read returns an element. */
  @Test
  public void testBagIsEmptyFalse() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    SettableFuture<Iterable<String>> future = SettableFuture.create();
    when(mockReader.listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of())).thenReturn(future);

    StateContents<Boolean> result = bag.isEmpty();
    Mockito.verify(mockReader).listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of());

    waitAndSet(future, Arrays.asList("world"), 200);
    assertThat(result.read(), Matchers.is(false));
  }

  /** {@code isEmpty()} issues a list read and reports true when the read returns nothing. */
  @Test
  public void testBagIsEmptyTrue() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    SettableFuture<Iterable<String>> future = SettableFuture.create();
    when(mockReader.listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of())).thenReturn(future);

    StateContents<Boolean> result = bag.isEmpty();
    Mockito.verify(mockReader).listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of());

    waitAndSet(future, Arrays.<String>asList(), 200);
    assertThat(result.read(), Matchers.is(true));
  }

  /** After a clear, {@code isEmpty()} needs no list read and tracks subsequent local adds. */
  @Test
  public void testBagIsEmptyAfterClear() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    bag.clear();
    StateContents<Boolean> result = bag.isEmpty();
    Mockito.verify(mockReader, never()).listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of());
    assertThat(result.read(), Matchers.is(true));

    bag.add("hello");
    assertThat(result.read(), Matchers.is(false));
  }

  /** A single add is persisted as one list update carrying the added element. */
  @Test
  public void testBagAddPersist() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    bag.add("hello");

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getListUpdatesCount());
    TagList listUpdates = commitBuilder.getListUpdates(0);
    assertEquals(BAG_KEY, listUpdates.getTag());
    assertEquals(1, listUpdates.getValuesCount());
    assertEquals("hello", listUpdates.getValues(0).getData().substring(1).toStringUtf8());

    // Blind adds should not need to read the future.
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Add, clear, add is persisted as a clearing update followed by the post-clear element. */
  @Test
  public void testBagClearPersist() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    bag.add("hello");
    bag.clear();
    bag.add("world");

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(2, commitBuilder.getListUpdatesCount());

    TagList listClear = commitBuilder.getListUpdates(0);
    assertEquals(BAG_KEY, listClear.getTag());
    assertEquals(Long.MAX_VALUE, listClear.getEndTimestamp());
    assertEquals(0, listClear.getValuesCount());

    TagList listUpdates = commitBuilder.getListUpdates(1);
    assertEquals(BAG_KEY, listUpdates.getTag());
    assertEquals(1, listUpdates.getValuesCount());
    assertEquals("world", listUpdates.getValues(0).getData().substring(1).toStringUtf8());

    // Clear should need to read the future.
    Mockito.verify(mockReader).listFuture(BAG_KEY, STATE_FAMILY, StringUtf8Coder.of());
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** A clear with no subsequent adds is persisted as a single list update. */
  @Test
  public void testBagPersistEmpty() throws Exception {
    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    bag.clear();

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    // 1 list update = the clear
    assertEquals(1, commitBuilder.getListUpdatesCount());
  }

  /** With state families disabled, the family name prefixes the tag and the family is empty. */
  @Test
  public void testBagNoStateFamilies() throws Exception {
    underTest = new WindmillStateInternals(STATE_FAMILY, false, mockReader, readStateSupplier);

    StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
    BagState<String> bag = underTest.state(NAMESPACE, addr);

    bag.add("hello");
    bag.clear();
    bag.add("world");

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    // Clear should need to read the future.
    Mockito.verify(mockReader)
        .listFuture(key(STATE_FAMILY, NAMESPACE, "bag"), "", StringUtf8Coder.of());
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Local adds are combined with the accumulators returned by the pending read. */
  @Test
  public void testCombiningAddBeforeRead() throws Exception {
    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

    SettableFuture<Iterable<int[]>> future = SettableFuture.create();
    when(mockReader.listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder)).thenReturn(future);

    StateContents<Integer> result = value.get();
    value.add(5);
    value.add(6);

    waitAndSet(future, Arrays.asList(new int[] {8}, new int[] {10}), 200);
    assertThat(result.read(), Matchers.equalTo(29));

    // That get "compressed" the combiner. So, the underlying future should change:
    future.set(Arrays.asList(new int[] {29}));

    value.add(2);
    assertThat(result.read(), Matchers.equalTo(31));
  }

  /** After a clear, the combined value is computed from local adds only. */
  @Test
  public void testCombiningClearBeforeRead() throws Exception {
    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

    value.clear();
    StateContents<Integer> result = value.get();

    value.add(5);
    value.add(6);
    assertThat(result.read(), Matchers.equalTo(11));

    value.add(2);
    assertThat(result.read(), Matchers.equalTo(13));

    // Shouldn't need to read from windmill for this because we immediately cleared..
    Mockito.verifyZeroInteractions(mockReader);
  }

  /** {@code isEmpty()} issues a list read and reports false when an accumulator is returned. */
  @Test
  public void testCombiningIsEmpty() throws Exception {
    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

    SettableFuture<Iterable<int[]>> future = SettableFuture.create();
    when(mockReader.listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder)).thenReturn(future);

    StateContents<Boolean> result = value.isEmpty();
    Mockito.verify(mockReader).listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);

    waitAndSet(future, Arrays.asList(new int[] {29}), 200);
    assertThat(result.read(), Matchers.is(false));
  }

  /** After a clear, {@code isEmpty()} needs no list read and tracks subsequent local adds. */
  @Test
  public void testCombiningIsEmptyAfterClear() throws Exception {
    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

    value.clear();
    StateContents<Boolean> result = value.isEmpty();
    Mockito.verify(mockReader, never()).listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);
    assertThat(result.read(), Matchers.is(true));

    value.add(87);
    assertThat(result.read(), Matchers.is(false));
  }

  /** Without compaction, two adds are persisted as one update holding their sum. */
  @Test
  public void testCombiningAddPersist() throws Exception {
    disableCompactOnWrite();

    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
    value.add(5);
    value.add(6);

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getListUpdatesCount());
    TagList listUpdates = commitBuilder.getListUpdates(0);
    assertEquals(COMBINING_KEY, listUpdates.getTag());
    assertEquals(1, listUpdates.getValuesCount());
    assertEquals(11, decodeFirstAccumulator(listUpdates));

    // Blind adds should not need to read the future.
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /**
   * With compaction forced, the persisted state is an empty update followed by one update that
   * holds the sum of the stored accumulators (40 + 60) and the local adds (5 + 6).
   */
  @Test
  public void testCombiningAddPersistWithCompact() throws Exception {
    forceCompactOnWrite();

    when(mockReader.listFuture(
            org.mockito.Matchers.<ByteString>any(),
            org.mockito.Matchers.<String>any(),
            org.mockito.Matchers.<Coder<int[]>>any()))
        .thenReturn(
            Futures.<Iterable<int[]>>immediateFuture(
                ImmutableList.of(new int[] {40}, new int[] {60})));

    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
    value.add(5);
    value.add(6);

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(2, commitBuilder.getListUpdatesCount());
    assertEquals(0, commitBuilder.getListUpdates(0).getValuesCount());

    TagList listUpdates = commitBuilder.getListUpdates(1);
    assertEquals(COMBINING_KEY, listUpdates.getTag());
    assertEquals(1, listUpdates.getValuesCount());
    assertEquals(111, decodeFirstAccumulator(listUpdates));
  }

  /** Clear followed by adds is persisted as a clearing update plus one update with the sum. */
  @Test
  public void testCombiningClearPersist() throws Exception {
    disableCompactOnWrite();

    CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);
    value.clear();
    value.add(5);
    value.add(6);

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(2, commitBuilder.getListUpdatesCount());

    TagList listClear = commitBuilder.getListUpdates(0);
    assertEquals(COMBINING_KEY, listClear.getTag());
    assertEquals(Long.MAX_VALUE, listClear.getEndTimestamp());
    assertEquals(0, listClear.getValuesCount());

    TagList listUpdates = commitBuilder.getListUpdates(1);
    assertEquals(COMBINING_KEY, listUpdates.getTag());
    assertEquals(1, listUpdates.getValuesCount());
    assertEquals(11, decodeFirstAccumulator(listUpdates));

    // Unlike a blind add, persisting the clear is expected to issue exactly one list read.
    Mockito.verify(mockReader).listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** With the earliest-timestamp function, the minimum of stored and added instants is read. */
  @Test
  public void testWatermarkAddBeforeReadEarliest() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    SettableFuture<Instant> future = SettableFuture.create();
    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY)).thenReturn(future);

    StateContents<Instant> result = hold.get();
    hold.add(new Instant(3000));

    waitAndSet(future, new Instant(2000), 200);
    assertThat(result.read(), Matchers.equalTo(new Instant(2000)));

    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);
    Mockito.verifyNoMoreInteractions(mockReader);

    // Adding another value doesn't create another future, but does update the result.
    hold.add(new Instant(1000));
    assertThat(result.read(), Matchers.equalTo(new Instant(1000)));
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** With the latest-timestamp function, the maximum of stored and added instants is read. */
  @Test
  public void testWatermarkAddBeforeReadLatest() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtLatestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    SettableFuture<Instant> future = SettableFuture.create();
    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY)).thenReturn(future);

    StateContents<Instant> result = hold.get();
    hold.add(new Instant(3000));

    waitAndSet(future, new Instant(2000), 200);
    assertThat(result.read(), Matchers.equalTo(new Instant(3000)));

    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);
    Mockito.verifyNoMoreInteractions(mockReader);

    // Adding another value doesn't create another future, but does update the result.
    hold.add(new Instant(3000));
    assertThat(result.read(), Matchers.equalTo(new Instant(3000)));
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** With the end-of-window function, identical stored and added instants read back unchanged. */
  @Test
  public void testWatermarkAddBeforeReadEndOfWindow() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    SettableFuture<Instant> future = SettableFuture.create();
    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY)).thenReturn(future);

    StateContents<Instant> result = hold.get();
    hold.add(new Instant(3000));

    waitAndSet(future, new Instant(3000), 200);
    assertThat(result.read(), Matchers.equalTo(new Instant(3000)));

    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);
    Mockito.verifyNoMoreInteractions(mockReader);

    // Adding another value doesn't create another future, but does update the result.
    hold.add(new Instant(3000));
    assertThat(result.read(), Matchers.equalTo(new Instant(3000)));
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** After a clear, the hold reads as null until a local add, with no reader interaction. */
  @Test
  public void testWatermarkClearBeforeRead() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.clear();
    assertThat(hold.get().read(), Matchers.nullValue());

    hold.add(new Instant(300));
    assertThat(hold.get().read(), Matchers.equalTo(new Instant(300)));

    // Shouldn't need to read from windmill because the value is already available.
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** {@code isEmpty()} issues a watermark read and reports false when an instant is returned. */
  @Test
  public void testWatermarkIsEmptyWindmillHasData() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    SettableFuture<Instant> future = SettableFuture.create();
    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY)).thenReturn(future);

    StateContents<Boolean> result = hold.isEmpty();
    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);

    waitAndSet(future, new Instant(1000), 200);
    assertThat(result.read(), Matchers.is(false));
  }

  /** {@code isEmpty()} issues a watermark read and reports true when the read yields null. */
  @Test
  public void testWatermarkIsEmpty() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    SettableFuture<Instant> future = SettableFuture.create();
    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY)).thenReturn(future);

    StateContents<Boolean> result = hold.isEmpty();
    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);

    waitAndSet(future, null, 200);
    assertThat(result.read(), Matchers.is(true));
  }

  /** After a clear, {@code isEmpty()} needs no watermark read and tracks subsequent adds. */
  @Test
  public void testWatermarkIsEmptyAfterClear() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.clear();
    StateContents<Boolean> result = hold.isEmpty();
    Mockito.verify(mockReader, never())
        .watermarkFuture(key(NAMESPACE, addr.getId()), STATE_FAMILY);
    assertThat(result.read(), Matchers.is(true));

    hold.add(new Instant(1000));
    assertThat(result.read(), Matchers.is(false));
  }

  /** With the earliest-timestamp function, the smaller added instant is persisted (in micros). */
  @Test
  public void testWatermarkPersistEarliest() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(1000));
    hold.add(new Instant(2000));

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
    assertEquals(WATERMARK_KEY, watermarkHold.getTag());
    assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0));

    // Blind adds should not need to read the future.
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Latest-timestamp persist with no stored hold writes the largest locally added instant. */
  @Test
  public void testWatermarkPersistLatestEmpty() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtLatestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(1000));
    hold.add(new Instant(2000));

    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY))
        .thenReturn(Futures.<Instant>immediateFuture(null));

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
    assertEquals(WATERMARK_KEY, watermarkHold.getTag());
    assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0));

    // With the latest-timestamp function, persisting is expected to read the stored hold once.
    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Latest-timestamp persist writes the stored hold when it is later than all local adds. */
  @Test
  public void testWatermarkPersistLatestWindmillWins() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtLatestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(1000));
    hold.add(new Instant(2000));

    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY))
        .thenReturn(Futures.<Instant>immediateFuture(new Instant(4000)));

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
    assertEquals(WATERMARK_KEY, watermarkHold.getTag());
    assertEquals(TimeUnit.MILLISECONDS.toMicros(4000), watermarkHold.getTimestamps(0));

    // With the latest-timestamp function, persisting is expected to read the stored hold once.
    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Latest-timestamp persist writes the largest local add when the stored hold is earlier. */
  @Test
  public void testWatermarkPersistLatestLocalAdditionsWin() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtLatestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(1000));
    hold.add(new Instant(2000));

    when(mockReader.watermarkFuture(WATERMARK_KEY, STATE_FAMILY))
        .thenReturn(Futures.<Instant>immediateFuture(new Instant(500)));

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
    assertEquals(WATERMARK_KEY, watermarkHold.getTag());
    assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0));

    // With the latest-timestamp function, persisting is expected to read the stored hold once.
    Mockito.verify(mockReader).watermarkFuture(WATERMARK_KEY, STATE_FAMILY);
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** With the end-of-window function, two identical adds are persisted as one timestamp. */
  @Test
  public void testWatermarkPersistEndOfWindow() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(2000));
    hold.add(new Instant(2000));

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
    assertEquals(WATERMARK_KEY, watermarkHold.getTag());
    assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0));

    // Blind adds should not need to read the future.
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Add, clear, add is persisted as a single hold carrying only the earliest post-clear add. */
  @Test
  public void testWatermarkClearPersist() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(500));
    hold.clear();
    hold.add(new Instant(1000));
    hold.add(new Instant(2000));

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
    Windmill.WatermarkHold clearAndUpdate = commitBuilder.getWatermarkHolds(0);
    assertEquals(WATERMARK_KEY, clearAndUpdate.getTag());
    assertEquals(1, clearAndUpdate.getTimestampsCount());
    assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), clearAndUpdate.getTimestamps(0));

    // Because the hold was cleared locally, persisting should not need to read the stored hold.
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Add followed by clear is persisted as exactly one watermark hold. */
  @Test
  public void testWatermarkPersistEmpty() throws Exception {
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);

    hold.add(new Instant(500));
    hold.clear();

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    // 1 watermark hold corresponds to the clear. There shouldn't be a second hold adding items.
    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
  }

  /** Earliest-timestamp hold without state families: family-prefixed tag, empty family. */
  @Test
  public void testWatermarkNoStateFamiliesEarliest() throws Exception {
    underTest = new WindmillStateInternals(STATE_FAMILY, false, mockReader, readStateSupplier);

    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);
    hold.get();

    Mockito.verify(mockReader).watermarkFuture(key(STATE_FAMILY, NAMESPACE, "watermark"), "");
  }

  /** Latest-timestamp hold without state families: family-prefixed tag, empty family. */
  @Test
  public void testWatermarkNoStateFamiliesLatest() throws Exception {
    underTest = new WindmillStateInternals(STATE_FAMILY, false, mockReader, readStateSupplier);

    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal(
            "watermark", OutputTimeFns.outputAtLatestInputTimestamp());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);
    hold.get();

    Mockito.verify(mockReader).watermarkFuture(key(STATE_FAMILY, NAMESPACE, "watermark"), "");
  }

  /** End-of-window hold without state families: family-prefixed tag, empty family. */
  @Test
  public void testWatermarkNoStateFamiliesEndOfWindow() throws Exception {
    underTest = new WindmillStateInternals(STATE_FAMILY, false, mockReader, readStateSupplier);

    // Uses the end-of-window function, which is the case this test is named for.
    StateTag<WatermarkStateInternal> addr =
        StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
    WatermarkStateInternal hold = underTest.state(NAMESPACE, addr);
    hold.get();

    Mockito.verify(mockReader).watermarkFuture(key(STATE_FAMILY, NAMESPACE, "watermark"), "");
  }

  /** A value set locally is read back without touching the reader. */
  @Test
  public void testValueSetBeforeRead() throws Exception {
    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    ValueState<String> value = underTest.state(NAMESPACE, addr);

    value.set("Hello");

    assertEquals("Hello", value.get().read());
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** A value cleared locally reads back as null without touching the reader. */
  @Test
  public void testValueClearBeforeRead() throws Exception {
    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    ValueState<String> value = underTest.state(NAMESPACE, addr);

    value.clear();

    assertThat(value.get().read(), Matchers.nullValue());
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** An unmodified value is read from the reader's value future. */
  @Test
  public void testValueRead() throws Exception {
    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    ValueState<String> value = underTest.state(NAMESPACE, addr);

    SettableFuture<String> future = SettableFuture.create();
    when(mockReader.valueFuture(VALUE_KEY, STATE_FAMILY, StringUtf8Coder.of()))
        .thenReturn(future);
    waitAndSet(future, "World", 200);

    assertEquals("World", value.get().read());
  }

  /** A set is persisted as one value update carrying the encoded value. */
  @Test
  public void testValueSetPersist() throws Exception {
    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    ValueState<String> value = underTest.state(NAMESPACE, addr);

    value.set("Hi");

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getValueUpdatesCount());
    TagValue valueUpdate = commitBuilder.getValueUpdates(0);
    assertEquals(VALUE_KEY, valueUpdate.getTag());
    assertEquals("Hi", valueUpdate.getValue().getData().toStringUtf8());
    assertTrue(valueUpdate.isInitialized());

    // Setting a value requires a read to prevent blind writes.
    Mockito.verify(mockReader).valueFuture(VALUE_KEY, STATE_FAMILY, StringUtf8Coder.of());
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Set followed by clear is persisted as one value update with empty data. */
  @Test
  public void testValueClearPersist() throws Exception {
    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    ValueState<String> value = underTest.state(NAMESPACE, addr);

    value.set("Hi");
    value.clear();

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(1, commitBuilder.getValueUpdatesCount());
    TagValue valueUpdate = commitBuilder.getValueUpdates(0);
    assertEquals(VALUE_KEY, valueUpdate.getTag());
    assertEquals(0, valueUpdate.getValue().getData().size());

    // Clearing a value requires a read to prevent blind writes.
    Mockito.verify(mockReader).valueFuture(VALUE_KEY, STATE_FAMILY, StringUtf8Coder.of());
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** A value state that was never modified produces no value update on persist. */
  @Test
  public void testValueNoChangePersist() throws Exception {
    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    underTest.state(NAMESPACE, addr);

    Windmill.WorkItemCommitRequest.Builder commitBuilder =
        Windmill.WorkItemCommitRequest.newBuilder();
    underTest.persist(commitBuilder);

    assertEquals(0, commitBuilder.getValueUpdatesCount());

    // No changes shouldn't require getting any futures
    Mockito.verify(mockReader).startBatchAndBlock();
    Mockito.verifyNoMoreInteractions(mockReader);
  }

  /** Value read without state families: family-prefixed tag, empty family. */
  @Test
  public void testValueNoStateFamilies() throws Exception {
    underTest = new WindmillStateInternals(STATE_FAMILY, false, mockReader, readStateSupplier);

    StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of());
    ValueState<String> value = underTest.state(NAMESPACE, addr);

    SettableFuture<String> future = SettableFuture.create();
    when(mockReader.valueFuture(key(STATE_FAMILY, NAMESPACE, "value"), "", StringUtf8Coder.of()))
        .thenReturn(future);
    waitAndSet(future, "World", 200);

    assertEquals("World", value.get().read());
  }

  /** Makes {@code WindmillStateInternals.COMPACT_NOW} always answer {@code false}. */
  private void disableCompactOnWrite() {
    setCompactOnWrite(false);
  }

  /** Makes {@code WindmillStateInternals.COMPACT_NOW} always answer {@code true}. */
  private void forceCompactOnWrite() {
    setCompactOnWrite(true);
  }

  /**
   * Installs a supplier in {@code WindmillStateInternals.COMPACT_NOW} that always returns {@code
   * compact}.
   *
   * <p>NOTE(review): the previous supplier is never restored, so the override outlives the test
   * that installed it -- confirm whether a reset after each test is needed.
   */
  private static void setCompactOnWrite(final boolean compact) {
    WindmillStateInternals.COMPACT_NOW.set(
        new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return compact;
          }
        });
  }
}