| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static org.apache.beam.runners.dataflow.worker.DataflowMatchers.ByteStringMatcher.byteStringEq; |
| import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.collect.Iterables; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.AbstractMap; |
| import java.util.AbstractMap.SimpleEntry; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.core.StateNamespace; |
| import org.apache.beam.runners.core.StateNamespaceForTest; |
| import org.apache.beam.runners.core.StateTag; |
| import org.apache.beam.runners.core.StateTags; |
| import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; |
| import org.apache.beam.runners.dataflow.worker.WindmillStateInternals.IdTracker; |
| import org.apache.beam.runners.dataflow.worker.WindmillStateInternals.WindmillOrderedList; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagBag; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagSortedListUpdateRequest; |
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagValue; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.Coder.Context; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.state.BagState; |
| import org.apache.beam.sdk.state.CombiningState; |
| import org.apache.beam.sdk.state.GroupingState; |
| import org.apache.beam.sdk.state.MapState; |
| import org.apache.beam.sdk.state.OrderedListState; |
| import org.apache.beam.sdk.state.ReadableState; |
| import org.apache.beam.sdk.state.ValueState; |
| import org.apache.beam.sdk.state.WatermarkHoldState; |
| import org.apache.beam.sdk.transforms.Sum; |
| import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.RangeSet; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; |
| import org.hamcrest.Matchers; |
| import org.joda.time.Instant; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Mock; |
| import org.mockito.Mockito; |
| import org.mockito.MockitoAnnotations; |
| |
| /** Tests for {@link WindmillStateInternals}. */ |
| @RunWith(JUnit4.class) |
| @SuppressWarnings({ |
| "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) |
| }) |
| public class WindmillStateInternalsTest { |
| |
| private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns"); |
| private static final String STATE_FAMILY = "family"; |
| |
| private static final StateTag<CombiningState<Integer, int[], Integer>> COMBINING_ADDR = |
| StateTags.combiningValueFromInputInternal("combining", VarIntCoder.of(), Sum.ofIntegers()); |
| private static final ByteString COMBINING_KEY = key(NAMESPACE, "combining"); |
| private final Coder<int[]> accumCoder = |
| Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()); |
| private long workToken = 0; |
| |
| DataflowWorkerHarnessOptions options; |
| |
| @Mock private WindmillStateReader mockReader; |
| |
| private WindmillStateInternals<String> underTest; |
| private WindmillStateInternals<String> underTestNewKey; |
| private WindmillStateCache cache; |
| |
| @Mock private Supplier<Closeable> readStateSupplier; |
| |
| private static ByteString key(StateNamespace namespace, String addrId) { |
| return ByteString.copyFromUtf8(namespace.stringKey() + "+u" + addrId); |
| } |
| |
| private static ByteString systemKey(StateNamespace namespace, String addrId) { |
| return ByteString.copyFromUtf8(namespace.stringKey() + "+s" + addrId); |
| } |
| |
| @Before |
| public void setUp() { |
| MockitoAnnotations.initMocks(this); |
| options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); |
| cache = new WindmillStateCache(options.getWorkerCacheMb()); |
| resetUnderTest(); |
| } |
| |
| public void resetUnderTest() { |
| workToken++; |
| underTest = |
| new WindmillStateInternals<>( |
| "dummyKey", |
| STATE_FAMILY, |
| mockReader, |
| false, |
| cache |
| .forComputation("comp") |
| .forKey( |
| WindmillComputationKey.create( |
| "comp", ByteString.copyFrom("dummyKey", Charsets.UTF_8), 123), |
| 17L, |
| workToken) |
| .forFamily(STATE_FAMILY), |
| readStateSupplier); |
| underTestNewKey = |
| new WindmillStateInternals<String>( |
| "dummyNewKey", |
| STATE_FAMILY, |
| mockReader, |
| true, |
| cache |
| .forComputation("comp") |
| .forKey( |
| WindmillComputationKey.create( |
| "comp", ByteString.copyFrom("dummyNewKey", Charsets.UTF_8), 123), |
| 17L, |
| workToken) |
| .forFamily(STATE_FAMILY), |
| readStateSupplier); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| // Make sure no WindmillStateReader (a per-WorkItem object) escapes into the cache |
| // (a global object). |
| WindmillStateTestUtils.assertNoReference(cache, WindmillStateReader.class); |
| } |
| |
| private <T> void waitAndSet(final SettableFuture<T> future, final T value, final long millis) { |
| new Thread( |
| () -> { |
| try { |
| sleepMillis(millis); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("Interrupted before setting", e); |
| } |
| future.set(value); |
| }) |
| .run(); |
| } |
| |
| private WindmillStateReader.WeightedList<String> weightedList(String... elems) { |
| WindmillStateReader.WeightedList<String> result = |
| new WindmillStateReader.WeightedList<>(new ArrayList<String>(elems.length)); |
| for (String elem : elems) { |
| result.addWeighted(elem, elem.length()); |
| } |
| return result; |
| } |
| |
| private <K> ByteString protoKeyFromUserKey(@Nullable K tag, Coder<K> keyCoder) |
| throws IOException { |
| ByteString.Output keyStream = ByteString.newOutput(); |
| key(NAMESPACE, "map").writeTo(keyStream); |
| if (tag != null) { |
| keyCoder.encode(tag, keyStream, Context.OUTER); |
| } |
| return keyStream.toByteString(); |
| } |
| |
| private <K> K userKeyFromProtoKey(ByteString tag, Coder<K> keyCoder) throws IOException { |
| ByteString keyBytes = tag.substring(key(NAMESPACE, "map").size()); |
| return keyCoder.decode(keyBytes.newInput(), Context.OUTER); |
| } |
| |
| @Test |
| public void testMapAddBeforeGet() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag = "tag"; |
| SettableFuture<Integer> future = SettableFuture.create(); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future); |
| |
| ReadableState<Integer> result = mapState.get("tag"); |
| result = result.readLater(); |
| waitAndSet(future, 1, 200); |
| assertEquals(1, (int) result.read()); |
| mapState.put("tag", 2); |
| assertEquals(2, (int) result.read()); |
| } |
| |
| @Test |
| public void testMapAddClearBeforeGet() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag = "tag"; |
| |
| SettableFuture<Iterable<Map.Entry<ByteString, Integer>>> prefixFuture = SettableFuture.create(); |
| when(mockReader.valuePrefixFuture( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(prefixFuture); |
| |
| ReadableState<Integer> result = mapState.get("tag"); |
| result = result.readLater(); |
| waitAndSet( |
| prefixFuture, |
| ImmutableList.of( |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag, StringUtf8Coder.of()), 1)), |
| 50); |
| assertFalse(mapState.isEmpty().read()); |
| mapState.clear(); |
| assertTrue(mapState.isEmpty().read()); |
| assertNull(mapState.get("tag").read()); |
| mapState.put("tag", 2); |
| assertFalse(mapState.isEmpty().read()); |
| assertEquals(2, (int) result.read()); |
| } |
| |
| @Test |
| public void testMapLocalAddOverridesStorage() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag = "tag"; |
| |
| SettableFuture<Integer> future = SettableFuture.create(); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future); |
| SettableFuture<Iterable<Map.Entry<ByteString, Integer>>> prefixFuture = SettableFuture.create(); |
| when(mockReader.valuePrefixFuture( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(prefixFuture); |
| |
| waitAndSet(future, 1, 50); |
| waitAndSet( |
| prefixFuture, |
| ImmutableList.of( |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag, StringUtf8Coder.of()), 1)), |
| 50); |
| mapState.put(tag, 42); |
| assertEquals(42, (int) mapState.get(tag).read()); |
| assertThat( |
| mapState.entries().read(), |
| Matchers.containsInAnyOrder(new AbstractMap.SimpleEntry<>(tag, 42))); |
| } |
| |
| @Test |
| public void testMapLocalRemoveOverridesStorage() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| |
| SettableFuture<Integer> future = SettableFuture.create(); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag1, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future); |
| SettableFuture<Iterable<Map.Entry<ByteString, Integer>>> prefixFuture = SettableFuture.create(); |
| when(mockReader.valuePrefixFuture( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(prefixFuture); |
| |
| waitAndSet(future, 1, 50); |
| waitAndSet( |
| prefixFuture, |
| ImmutableList.of( |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag1, StringUtf8Coder.of()), 1), |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag2, StringUtf8Coder.of()), 2)), |
| 50); |
| mapState.remove(tag1); |
| assertNull(mapState.get(tag1).read()); |
| assertThat( |
| mapState.entries().read(), |
| Matchers.containsInAnyOrder(new AbstractMap.SimpleEntry<>(tag2, 2))); |
| |
| mapState.remove(tag2); |
| assertTrue(mapState.isEmpty().read()); |
| } |
| |
| @Test |
| public void testMapLocalClearOverridesStorage() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| |
| SettableFuture<Integer> future1 = SettableFuture.create(); |
| SettableFuture<Integer> future2 = SettableFuture.create(); |
| |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag1, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future1); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag2, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future2); |
| SettableFuture<Iterable<Map.Entry<ByteString, Integer>>> prefixFuture = SettableFuture.create(); |
| when(mockReader.valuePrefixFuture( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(prefixFuture); |
| |
| waitAndSet(future1, 1, 50); |
| waitAndSet(future2, 2, 50); |
| waitAndSet( |
| prefixFuture, |
| ImmutableList.of( |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag1, StringUtf8Coder.of()), 1), |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag2, StringUtf8Coder.of()), 2)), |
| 50); |
| mapState.clear(); |
| assertNull(mapState.get(tag1).read()); |
| assertNull(mapState.get(tag2).read()); |
| assertThat(mapState.entries().read(), Matchers.emptyIterable()); |
| assertTrue(mapState.isEmpty().read()); |
| } |
| |
| @Test |
| public void testMapAddBeforeRead() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| final String tag3 = "tag3"; |
| SettableFuture<Iterable<Map.Entry<ByteString, Integer>>> prefixFuture = SettableFuture.create(); |
| when(mockReader.valuePrefixFuture( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(prefixFuture); |
| |
| ReadableState<Iterable<Map.Entry<String, Integer>>> result = mapState.entries(); |
| result = result.readLater(); |
| |
| mapState.put(tag1, 1); |
| waitAndSet( |
| prefixFuture, |
| ImmutableList.of( |
| new AbstractMap.SimpleEntry<>(protoKeyFromUserKey(tag2, StringUtf8Coder.of()), 2)), |
| 200); |
| Iterable<Map.Entry<String, Integer>> readData = result.read(); |
| assertThat( |
| readData, |
| Matchers.containsInAnyOrder( |
| new AbstractMap.SimpleEntry<>(tag1, 1), new AbstractMap.SimpleEntry<>(tag2, 2))); |
| |
| mapState.put(tag3, 3); |
| assertThat( |
| result.read(), |
| Matchers.containsInAnyOrder( |
| new AbstractMap.SimpleEntry<>(tag1, 1), |
| new AbstractMap.SimpleEntry<>(tag2, 2), |
| new AbstractMap.SimpleEntry<>(tag3, 3))); |
| } |
| |
| @Test |
| public void testMapPutIfAbsentSucceeds() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| SettableFuture<Integer> future = SettableFuture.create(); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag1, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future); |
| waitAndSet(future, null, 50); |
| |
| assertNull(mapState.putIfAbsent(tag1, 42).read()); |
| assertEquals(42, (int) mapState.get(tag1).read()); |
| } |
| |
| @Test |
| public void testMapPutIfAbsentFails() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| mapState.put(tag1, 1); |
| assertEquals(1, (int) mapState.putIfAbsent(tag1, 42).read()); |
| assertEquals(1, (int) mapState.get(tag1).read()); |
| |
| final String tag2 = "tag2"; |
| SettableFuture<Integer> future = SettableFuture.create(); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag2, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future); |
| waitAndSet(future, 2, 50); |
| assertEquals(2, (int) mapState.putIfAbsent(tag2, 42).read()); |
| assertEquals(2, (int) mapState.get(tag2).read()); |
| } |
| |
| @Test |
| public void testMapNegativeCache() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag = "tag"; |
| SettableFuture<Integer> future = SettableFuture.create(); |
| when(mockReader.valueFuture( |
| protoKeyFromUserKey(tag, StringUtf8Coder.of()), STATE_FAMILY, VarIntCoder.of())) |
| .thenReturn(future); |
| waitAndSet(future, null, 200); |
| assertNull(mapState.get(tag).read()); |
| future.set(42); |
| assertNull(mapState.get(tag).read()); |
| } |
| |
| private <K, V> Map.Entry<K, V> fromTagValue( |
| TagValue tagValue, Coder<K> keyCoder, Coder<V> valueCoder) { |
| try { |
| V value = |
| !tagValue.getValue().getData().isEmpty() |
| ? valueCoder.decode(tagValue.getValue().getData().newInput()) |
| : null; |
| return new AbstractMap.SimpleEntry<>(userKeyFromProtoKey(tagValue.getTag(), keyCoder), value); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Test |
| public void testMapAddPersist() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| mapState.put(tag1, 1); |
| mapState.put(tag2, 2); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(2, commitBuilder.getValueUpdatesCount()); |
| assertThat( |
| commitBuilder.getValueUpdatesList().stream() |
| .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of())) |
| .collect(Collectors.toList()), |
| Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, 1), new SimpleEntry<>(tag2, 2))); |
| } |
| |
| @Test |
| public void testMapRemovePersist() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| mapState.remove(tag1); |
| mapState.remove(tag2); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(2, commitBuilder.getValueUpdatesCount()); |
| assertThat( |
| commitBuilder.getValueUpdatesList().stream() |
| .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of())) |
| .collect(Collectors.toList()), |
| Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, null), new SimpleEntry<>(tag2, null))); |
| } |
| |
| @Test |
| public void testMapClearPersist() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| mapState.put(tag1, 1); |
| mapState.put(tag2, 2); |
| mapState.clear(); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(0, commitBuilder.getValueUpdatesCount()); |
| assertEquals(1, commitBuilder.getTagValuePrefixDeletesCount()); |
| System.err.println(commitBuilder); |
| assertEquals(STATE_FAMILY, commitBuilder.getTagValuePrefixDeletes(0).getStateFamily()); |
| assertEquals( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), |
| commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix()); |
| } |
| |
| @Test |
| public void testMapComplexPersist() throws Exception { |
| StateTag<MapState<String, Integer>> addr = |
| StateTags.map("map", StringUtf8Coder.of(), VarIntCoder.of()); |
| MapState<String, Integer> mapState = underTest.state(NAMESPACE, addr); |
| |
| final String tag1 = "tag1"; |
| final String tag2 = "tag2"; |
| final String tag3 = "tag3"; |
| final String tag4 = "tag4"; |
| |
| mapState.put(tag1, 1); |
| mapState.clear(); |
| mapState.put(tag2, 2); |
| mapState.put(tag3, 3); |
| mapState.remove(tag2); |
| mapState.remove(tag4); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| assertEquals(1, commitBuilder.getTagValuePrefixDeletesCount()); |
| assertEquals(STATE_FAMILY, commitBuilder.getTagValuePrefixDeletes(0).getStateFamily()); |
| assertEquals( |
| protoKeyFromUserKey(null, StringUtf8Coder.of()), |
| commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix()); |
| assertThat( |
| commitBuilder.getValueUpdatesList().stream() |
| .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of())) |
| .collect(Collectors.toList()), |
| Matchers.containsInAnyOrder( |
| new SimpleEntry<>(tag3, 3), |
| new SimpleEntry<>(tag2, null), |
| new SimpleEntry<>(tag4, null))); |
| |
| // Once persist has been called, calling persist again should be a noop. |
| commitBuilder = Windmill.WorkItemCommitRequest.newBuilder(); |
| assertEquals(0, commitBuilder.getTagValuePrefixDeletesCount()); |
| assertEquals(0, commitBuilder.getValueUpdatesCount()); |
| } |
| |
| public static final Range<Long> FULL_ORDERED_LIST_RANGE = |
| Range.closedOpen(WindmillOrderedList.MIN_TS_MICROS, WindmillOrderedList.MAX_TS_MICROS); |
| |
| @Test |
| public void testOrderedListAddBeforeRead() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Iterable<TimestampedValue<String>>> future = SettableFuture.create(); |
| when(mockReader.orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of())) |
| .thenReturn(future); |
| |
| orderedList.readLater(); |
| |
| final TimestampedValue<String> helloValue = |
| TimestampedValue.of("hello", Instant.ofEpochMilli(100)); |
| final TimestampedValue<String> worldValue = |
| TimestampedValue.of("world", Instant.ofEpochMilli(75)); |
| final TimestampedValue<String> goodbyeValue = |
| TimestampedValue.of("goodbye", Instant.ofEpochMilli(50)); |
| |
| orderedList.add(helloValue); |
| waitAndSet(future, Arrays.asList(worldValue), 200); |
| assertThat(orderedList.read(), Matchers.contains(worldValue, helloValue)); |
| |
| orderedList.add(goodbyeValue); |
| assertThat(orderedList.read(), Matchers.contains(goodbyeValue, worldValue, helloValue)); |
| } |
| |
| @Test |
| public void testOrderedListClearBeforeRead() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr); |
| |
| final TimestampedValue<String> helloElement = TimestampedValue.of("hello", Instant.EPOCH); |
| orderedListState.clear(); |
| orderedListState.add(helloElement); |
| assertThat(orderedListState.read(), Matchers.containsInAnyOrder(helloElement)); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| public void testOrderedListIsEmptyFalse() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Iterable<TimestampedValue<String>>> future = SettableFuture.create(); |
| when(mockReader.orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of())) |
| .thenReturn(future); |
| ReadableState<Boolean> result = orderedList.isEmpty().readLater(); |
| Mockito.verify(mockReader) |
| .orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of()); |
| |
| waitAndSet(future, Arrays.asList(TimestampedValue.of("world", Instant.EPOCH)), 200); |
| assertThat(result.read(), Matchers.is(false)); |
| } |
| |
| @Test |
| public void testOrderedListIsEmptyTrue() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Iterable<TimestampedValue<String>>> future = SettableFuture.create(); |
| when(mockReader.orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of())) |
| .thenReturn(future); |
| ReadableState<Boolean> result = orderedList.isEmpty().readLater(); |
| Mockito.verify(mockReader) |
| .orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of()); |
| |
| waitAndSet(future, Collections.emptyList(), 200); |
| assertThat(result.read(), Matchers.is(true)); |
| } |
| |
| @Test |
| public void testOrderedListIsEmptyAfterClear() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr); |
| |
| orderedList.clear(); |
| ReadableState<Boolean> result = orderedList.isEmpty(); |
| Mockito.verify(mockReader, never()) |
| .orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of()); |
| assertThat(result.read(), Matchers.is(true)); |
| |
| orderedList.add(TimestampedValue.of("hello", Instant.EPOCH)); |
| assertThat(result.read(), Matchers.is(false)); |
| } |
| |
| @Test |
| public void testOrderedListAddPersist() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create(); |
| orderedListFuture.set(null); |
| SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture = |
| SettableFuture.create(); |
| deletionsFuture.set(null); |
| when(mockReader.valueFuture( |
| systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR), |
| STATE_FAMILY, |
| IdTracker.IDS_AVAILABLE_CODER)) |
| .thenReturn(orderedListFuture); |
| when(mockReader.valueFuture( |
| systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR), |
| STATE_FAMILY, |
| IdTracker.SUBRANGE_DELETIONS_CODER)) |
| .thenReturn(deletionsFuture); |
| |
| orderedList.add(TimestampedValue.of("hello", Instant.ofEpochMilli(1))); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getSortedListUpdatesCount()); |
| TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0); |
| assertEquals(key(NAMESPACE, "orderedList"), updates.getTag()); |
| assertEquals(1, updates.getInsertsCount()); |
| assertEquals(1, updates.getInserts(0).getEntriesCount()); |
| |
| assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8()); |
| assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey()); |
| assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId()); |
| } |
| |
| @Test |
| public void testOrderedListClearPersist() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr); |
| |
| orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(1))); |
| orderedListState.clear(); |
| orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(2))); |
| orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(2))); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getSortedListUpdatesCount()); |
| TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0); |
| assertEquals(STATE_FAMILY, updates.getStateFamily()); |
| assertEquals(key(NAMESPACE, "orderedList"), updates.getTag()); |
| assertEquals(1, updates.getInsertsCount()); |
| assertEquals(2, updates.getInserts(0).getEntriesCount()); |
| |
| assertEquals("world", updates.getInserts(0).getEntries(0).getValue().toStringUtf8()); |
| assertEquals("world", updates.getInserts(0).getEntries(1).getValue().toStringUtf8()); |
| assertEquals(2000, updates.getInserts(0).getEntries(0).getSortKey()); |
| assertEquals(2000, updates.getInserts(0).getEntries(1).getSortKey()); |
| assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId()); |
| assertEquals(IdTracker.MIN_ID + 1, updates.getInserts(0).getEntries(1).getId()); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testOrderedListDeleteRangePersist() { |
| SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create(); |
| orderedListFuture.set(null); |
| SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture = |
| SettableFuture.create(); |
| deletionsFuture.set(null); |
| when(mockReader.valueFuture( |
| systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR), |
| STATE_FAMILY, |
| IdTracker.IDS_AVAILABLE_CODER)) |
| .thenReturn(orderedListFuture); |
| when(mockReader.valueFuture( |
| systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR), |
| STATE_FAMILY, |
| IdTracker.SUBRANGE_DELETIONS_CODER)) |
| .thenReturn(deletionsFuture); |
| |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr); |
| |
| orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(1))); |
| orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(2))); |
| orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(2))); |
| orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(3))); |
| orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(4))); |
| orderedListState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4)); |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getSortedListUpdatesCount()); |
| TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0); |
| assertEquals(STATE_FAMILY, updates.getStateFamily()); |
| assertEquals(key(NAMESPACE, "orderedList"), updates.getTag()); |
| assertEquals(1, updates.getInsertsCount()); |
| assertEquals(2, updates.getInserts(0).getEntriesCount()); |
| |
| assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8()); |
| assertEquals("world", updates.getInserts(0).getEntries(1).getValue().toStringUtf8()); |
| assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey()); |
| assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey()); |
| assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId()); |
| assertEquals(IdTracker.MIN_ID + 1, updates.getInserts(0).getEntries(1).getId()); |
| } |
| |
| @Test |
| public void testOrderedListMergePendingAdds() { |
| SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create(); |
| orderedListFuture.set(null); |
| SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture = |
| SettableFuture.create(); |
| deletionsFuture.set(null); |
| when(mockReader.valueFuture( |
| systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR), |
| STATE_FAMILY, |
| IdTracker.IDS_AVAILABLE_CODER)) |
| .thenReturn(orderedListFuture); |
| when(mockReader.valueFuture( |
| systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR), |
| STATE_FAMILY, |
| IdTracker.SUBRANGE_DELETIONS_CODER)) |
| .thenReturn(deletionsFuture); |
| |
| SettableFuture<Iterable<TimestampedValue<String>>> fromStorage = SettableFuture.create(); |
| when(mockReader.orderedListFuture( |
| FULL_ORDERED_LIST_RANGE, |
| key(NAMESPACE, "orderedList"), |
| STATE_FAMILY, |
| StringUtf8Coder.of())) |
| .thenReturn(fromStorage); |
| |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr); |
| |
| orderedListState.add(TimestampedValue.of("second", Instant.ofEpochMilli(1))); |
| orderedListState.add(TimestampedValue.of("third", Instant.ofEpochMilli(2))); |
| orderedListState.add(TimestampedValue.of("fourth", Instant.ofEpochMilli(2))); |
| orderedListState.add(TimestampedValue.of("eighth", Instant.ofEpochMilli(10))); |
| orderedListState.add(TimestampedValue.of("ninth", Instant.ofEpochMilli(15))); |
| |
| fromStorage.set( |
| ImmutableList.of( |
| TimestampedValue.of("first", Instant.ofEpochMilli(-1)), |
| TimestampedValue.of("fifth", Instant.ofEpochMilli(5)), |
| TimestampedValue.of("sixth", Instant.ofEpochMilli(5)), |
| TimestampedValue.of("seventh", Instant.ofEpochMilli(5)), |
| TimestampedValue.of("tenth", Instant.ofEpochMilli(20)))); |
| |
| TimestampedValue[] expected = |
| Iterables.toArray( |
| ImmutableList.of( |
| TimestampedValue.of("first", Instant.ofEpochMilli(-1)), |
| TimestampedValue.of("second", Instant.ofEpochMilli(1)), |
| TimestampedValue.of("third", Instant.ofEpochMilli(2)), |
| TimestampedValue.of("fourth", Instant.ofEpochMilli(2)), |
| TimestampedValue.of("fifth", Instant.ofEpochMilli(5)), |
| TimestampedValue.of("sixth", Instant.ofEpochMilli(5)), |
| TimestampedValue.of("seventh", Instant.ofEpochMilli(5)), |
| TimestampedValue.of("eighth", Instant.ofEpochMilli(10)), |
| TimestampedValue.of("ninth", Instant.ofEpochMilli(15)), |
| TimestampedValue.of("tenth", Instant.ofEpochMilli(20))), |
| TimestampedValue.class); |
| |
| TimestampedValue[] read = Iterables.toArray(orderedListState.read(), TimestampedValue.class); |
| assertArrayEquals(expected, read); |
| } |
| |
| @Test |
| public void testOrderedListPersistEmpty() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr); |
| |
| orderedListState.clear(); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| // 1 bag update = the clear |
| assertEquals(1, commitBuilder.getSortedListUpdatesCount()); |
| TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0); |
| assertEquals(1, updates.getDeletesCount()); |
| assertEquals(WindmillOrderedList.MIN_TS_MICROS, updates.getDeletes(0).getRange().getStart()); |
| assertEquals(WindmillOrderedList.MAX_TS_MICROS, updates.getDeletes(0).getRange().getLimit()); |
| } |
| |
| @Test |
| public void testNewOrderedListNoFetch() throws Exception { |
| StateTag<OrderedListState<String>> addr = |
| StateTags.orderedList("orderedList", StringUtf8Coder.of()); |
| OrderedListState<String> orderedList = underTestNewKey.state(NAMESPACE, addr); |
| |
| assertThat(orderedList.read(), Matchers.emptyIterable()); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| // test ordered list cleared before read |
| // test fetch + add + read |
| // test ids |
| |
| @Test |
| public void testBagAddBeforeRead() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Iterable<String>> future = SettableFuture.create(); |
| when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of())) |
| .thenReturn(future); |
| |
| bag.readLater(); |
| |
| bag.add("hello"); |
| waitAndSet(future, Arrays.asList("world"), 200); |
| assertThat(bag.read(), Matchers.containsInAnyOrder("hello", "world")); |
| |
| bag.add("goodbye"); |
| assertThat(bag.read(), Matchers.containsInAnyOrder("hello", "world", "goodbye")); |
| } |
| |
| @Test |
| public void testBagClearBeforeRead() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| bag.clear(); |
| bag.add("hello"); |
| assertThat(bag.read(), Matchers.containsInAnyOrder("hello")); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| public void testBagIsEmptyFalse() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Iterable<String>> future = SettableFuture.create(); |
| when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of())) |
| .thenReturn(future); |
| ReadableState<Boolean> result = bag.isEmpty().readLater(); |
| Mockito.verify(mockReader).bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()); |
| |
| waitAndSet(future, Arrays.asList("world"), 200); |
| assertThat(result.read(), Matchers.is(false)); |
| } |
| |
| @Test |
| public void testBagIsEmptyTrue() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Iterable<String>> future = SettableFuture.create(); |
| when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of())) |
| .thenReturn(future); |
| ReadableState<Boolean> result = bag.isEmpty().readLater(); |
| Mockito.verify(mockReader).bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()); |
| |
| waitAndSet(future, Arrays.<String>asList(), 200); |
| assertThat(result.read(), Matchers.is(true)); |
| } |
| |
| @Test |
| public void testBagIsEmptyAfterClear() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| bag.clear(); |
| ReadableState<Boolean> result = bag.isEmpty(); |
| Mockito.verify(mockReader, never()) |
| .bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()); |
| assertThat(result.read(), Matchers.is(true)); |
| |
| bag.add("hello"); |
| assertThat(result.read(), Matchers.is(false)); |
| } |
| |
| @Test |
| public void testBagAddPersist() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| bag.add("hello"); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getBagUpdatesCount()); |
| |
| TagBag bagUpdates = commitBuilder.getBagUpdates(0); |
| assertEquals(key(NAMESPACE, "bag"), bagUpdates.getTag()); |
| assertEquals(1, bagUpdates.getValuesCount()); |
| assertEquals("hello", bagUpdates.getValues(0).toStringUtf8()); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testBagClearPersist() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| bag.add("hello"); |
| bag.clear(); |
| bag.add("world"); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getBagUpdatesCount()); |
| |
| TagBag tagBag = commitBuilder.getBagUpdates(0); |
| assertEquals(key(NAMESPACE, "bag"), tagBag.getTag()); |
| assertEquals(STATE_FAMILY, tagBag.getStateFamily()); |
| assertTrue(tagBag.getDeleteAll()); |
| assertEquals(1, tagBag.getValuesCount()); |
| assertEquals("world", tagBag.getValues(0).toStringUtf8()); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testBagPersistEmpty() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| bag.clear(); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| // 1 bag update = the clear |
| assertEquals(1, commitBuilder.getBagUpdatesCount()); |
| } |
| |
| @Test |
| public void testNewBagNoFetch() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTestNewKey.state(NAMESPACE, addr); |
| |
| assertThat(bag.read(), Matchers.emptyIterable()); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| @SuppressWarnings("ArraysAsListPrimitiveArray") |
| public void testCombiningAddBeforeRead() throws Exception { |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| SettableFuture<Iterable<int[]>> future = SettableFuture.create(); |
| when(mockReader.bagFuture(eq(COMBINING_KEY), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any())) |
| .thenReturn(future); |
| |
| value.readLater(); |
| |
| value.add(5); |
| value.add(6); |
| waitAndSet(future, Arrays.asList(new int[] {8}, new int[] {10}), 200); |
| assertThat(value.read(), Matchers.equalTo(29)); |
| |
| // That get "compressed" the combiner. So, the underlying future should change: |
| future.set(Arrays.asList(new int[] {29})); |
| |
| value.add(2); |
| assertThat(value.read(), Matchers.equalTo(31)); |
| } |
| |
| @Test |
| public void testCombiningClearBeforeRead() throws Exception { |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| value.clear(); |
| value.readLater(); |
| |
| value.add(5); |
| value.add(6); |
| assertThat(value.read(), Matchers.equalTo(11)); |
| |
| value.add(2); |
| assertThat(value.read(), Matchers.equalTo(13)); |
| |
| // Shouldn't need to read from windmill for this because we immediately cleared.. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| @SuppressWarnings("ArraysAsListPrimitiveArray") |
| public void testCombiningIsEmpty() throws Exception { |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| SettableFuture<Iterable<int[]>> future = SettableFuture.create(); |
| when(mockReader.bagFuture(eq(COMBINING_KEY), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any())) |
| .thenReturn(future); |
| ReadableState<Boolean> result = value.isEmpty().readLater(); |
| ArgumentCaptor<ByteString> byteString = ArgumentCaptor.forClass(ByteString.class); |
| |
| // Note that we do expect the third argument - the coder - to be equal to accumCoder, but that |
| // is possibly overspecified and currently trips an issue in the SDK where identical coders are |
| // not #equals(). |
| // |
| // What matters is that a future is created, hence a Windmill RPC sent. |
| Mockito.verify(mockReader) |
| .bagFuture(byteString.capture(), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any()); |
| assertThat(byteString.getValue(), byteStringEq(COMBINING_KEY)); |
| |
| waitAndSet(future, Arrays.asList(new int[] {29}), 200); |
| assertThat(result.read(), Matchers.is(false)); |
| } |
| |
| @Test |
| public void testCombiningIsEmptyAfterClear() throws Exception { |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| value.clear(); |
| ReadableState<Boolean> result = value.isEmpty(); |
| Mockito.verify(mockReader, never()).bagFuture(COMBINING_KEY, STATE_FAMILY, accumCoder); |
| assertThat(result.read(), Matchers.is(true)); |
| |
| value.add(87); |
| assertThat(result.read(), Matchers.is(false)); |
| } |
| |
| @Test |
| public void testCombiningAddPersist() throws Exception { |
| disableCompactOnWrite(); |
| |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| value.add(5); |
| value.add(6); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getBagUpdatesCount()); |
| |
| TagBag bagUpdates = commitBuilder.getBagUpdates(0); |
| assertEquals(COMBINING_KEY, bagUpdates.getTag()); |
| assertEquals(1, bagUpdates.getValuesCount()); |
| assertEquals( |
| 11, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testCombiningAddPersistWithCompact() throws Exception { |
| forceCompactOnWrite(); |
| |
| Mockito.when( |
| mockReader.bagFuture( |
| org.mockito.Matchers.<ByteString>any(), |
| org.mockito.Matchers.<String>any(), |
| org.mockito.Matchers.<Coder<int[]>>any())) |
| .thenReturn( |
| Futures.<Iterable<int[]>>immediateFuture( |
| ImmutableList.of(new int[] {40}, new int[] {60}))); |
| |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| value.add(5); |
| value.add(6); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getBagUpdatesCount()); |
| TagBag bagUpdates = commitBuilder.getBagUpdates(0); |
| assertEquals(COMBINING_KEY, bagUpdates.getTag()); |
| assertEquals(1, bagUpdates.getValuesCount()); |
| assertTrue(bagUpdates.getDeleteAll()); |
| assertEquals( |
| 111, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]); |
| } |
| |
| @Test |
| public void testCombiningClearPersist() throws Exception { |
| disableCompactOnWrite(); |
| |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| value.clear(); |
| value.add(5); |
| value.add(6); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getBagUpdatesCount()); |
| |
| TagBag tagBag = commitBuilder.getBagUpdates(0); |
| assertEquals(COMBINING_KEY, tagBag.getTag()); |
| assertEquals(STATE_FAMILY, tagBag.getStateFamily()); |
| assertTrue(tagBag.getDeleteAll()); |
| assertEquals(1, tagBag.getValuesCount()); |
| assertEquals( |
| 11, CoderUtils.decodeFromByteArray(accumCoder, tagBag.getValues(0).toByteArray())[0]); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testNewCombiningNoFetch() throws Exception { |
| GroupingState<Integer, Integer> value = underTestNewKey.state(NAMESPACE, COMBINING_ADDR); |
| |
| assertThat(value.isEmpty().read(), Matchers.is(true)); |
| assertThat(value.read(), Matchers.is(Sum.ofIntegers().identity())); |
| assertThat(value.isEmpty().read(), Matchers.is(false)); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkAddBeforeReadEarliest() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| WatermarkHoldState bag = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Instant> future = SettableFuture.create(); |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future); |
| |
| bag.readLater(); |
| |
| bag.add(new Instant(3000)); |
| waitAndSet(future, new Instant(2000), 200); |
| assertThat(bag.read(), Matchers.equalTo(new Instant(2000))); |
| |
| Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| |
| // Adding another value doesn't create another future, but does update the result. |
| bag.add(new Instant(1000)); |
| assertThat(bag.read(), Matchers.equalTo(new Instant(1000))); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkAddBeforeReadLatest() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); |
| WatermarkHoldState bag = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Instant> future = SettableFuture.create(); |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future); |
| |
| // Suggesting we will read it later should get a future from the underlying WindmillStateReader |
| bag.readLater(); |
| |
| // Actually reading it will request another future, and get the same one, from |
| // WindmillStateReader |
| bag.add(new Instant(3000)); |
| waitAndSet(future, new Instant(2000), 200); |
| assertThat(bag.read(), Matchers.equalTo(new Instant(3000))); |
| |
| Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| |
| // Adding another value doesn't create another future, but does update the result. |
| bag.add(new Instant(3000)); |
| assertThat(bag.read(), Matchers.equalTo(new Instant(3000))); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkAddBeforeReadEndOfWindow() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); |
| WatermarkHoldState bag = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Instant> future = SettableFuture.create(); |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future); |
| |
| // Requests a future once |
| bag.readLater(); |
| |
| bag.add(new Instant(3000)); |
| waitAndSet(future, new Instant(3000), 200); |
| // read() requests a future again, receiving the same one |
| assertThat(bag.read(), Matchers.equalTo(new Instant(3000))); |
| |
| Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| |
| // Adding another value doesn't create another future, but does update the result. |
| bag.add(new Instant(3000)); |
| assertThat(bag.read(), Matchers.equalTo(new Instant(3000))); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkClearBeforeRead() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| |
| WatermarkHoldState bag = underTest.state(NAMESPACE, addr); |
| |
| bag.clear(); |
| assertThat(bag.read(), Matchers.nullValue()); |
| |
| bag.add(new Instant(300)); |
| assertThat(bag.read(), Matchers.equalTo(new Instant(300))); |
| |
| // Shouldn't need to read from windmill because the value is already available. |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkPersistEarliest() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| WatermarkHoldState bag = underTest.state(NAMESPACE, addr); |
| |
| bag.add(new Instant(1000)); |
| bag.add(new Instant(2000)); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| |
| Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); |
| assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); |
| assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0)); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkPersistLatestEmpty() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); |
| WatermarkHoldState hold = underTest.state(NAMESPACE, addr); |
| |
| hold.add(new Instant(1000)); |
| hold.add(new Instant(2000)); |
| |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)) |
| .thenReturn(Futures.<Instant>immediateFuture(null)); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| |
| Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); |
| assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); |
| assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0)); |
| |
| Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkPersistLatestWindmillWins() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); |
| WatermarkHoldState hold = underTest.state(NAMESPACE, addr); |
| |
| hold.add(new Instant(1000)); |
| hold.add(new Instant(2000)); |
| |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)) |
| .thenReturn(Futures.<Instant>immediateFuture(new Instant(4000))); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| |
| Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); |
| assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); |
| assertEquals(TimeUnit.MILLISECONDS.toMicros(4000), watermarkHold.getTimestamps(0)); |
| |
| Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkPersistLatestLocalAdditionsWin() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); |
| WatermarkHoldState hold = underTest.state(NAMESPACE, addr); |
| |
| hold.add(new Instant(1000)); |
| hold.add(new Instant(2000)); |
| |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)) |
| .thenReturn(Futures.<Instant>immediateFuture(new Instant(500))); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| |
| Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); |
| assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); |
| assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0)); |
| |
| Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkPersistEndOfWindow() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); |
| WatermarkHoldState hold = underTest.state(NAMESPACE, addr); |
| |
| hold.add(new Instant(2000)); |
| hold.add(new Instant(2000)); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| |
| Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); |
| assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); |
| assertEquals(TimeUnit.MILLISECONDS.toMicros(2000), watermarkHold.getTimestamps(0)); |
| |
| // Blind adds should not need to read the future. |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkClearPersist() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| WatermarkHoldState hold = underTest.state(NAMESPACE, addr); |
| |
| hold.add(new Instant(500)); |
| hold.clear(); |
| hold.add(new Instant(1000)); |
| hold.add(new Instant(2000)); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| |
| Windmill.WatermarkHold clearAndUpdate = commitBuilder.getWatermarkHolds(0); |
| assertEquals(key(NAMESPACE, "watermark"), clearAndUpdate.getTag()); |
| assertEquals(1, clearAndUpdate.getTimestampsCount()); |
| assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), clearAndUpdate.getTimestamps(0)); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testWatermarkPersistEmpty() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| WatermarkHoldState bag = underTest.state(NAMESPACE, addr); |
| |
| bag.add(new Instant(500)); |
| bag.clear(); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| // 1 bag update corresponds to deletion. There shouldn't be a bag update adding items. |
| assertEquals(1, commitBuilder.getWatermarkHoldsCount()); |
| } |
| |
| @Test |
| public void testNewWatermarkNoFetch() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| |
| WatermarkHoldState bag = underTestNewKey.state(NAMESPACE, addr); |
| assertThat(bag.read(), Matchers.nullValue()); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| public void testValueSetBeforeRead() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTest.state(NAMESPACE, addr); |
| |
| value.write("Hello"); |
| |
| assertEquals("Hello", value.read()); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testValueClearBeforeRead() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTest.state(NAMESPACE, addr); |
| |
| value.clear(); |
| |
| assertEquals(null, value.read()); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testValueRead() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<String> future = SettableFuture.create(); |
| when(mockReader.valueFuture(key(NAMESPACE, "value"), STATE_FAMILY, StringUtf8Coder.of())) |
| .thenReturn(future); |
| waitAndSet(future, "World", 200); |
| |
| assertEquals("World", value.read()); |
| } |
| |
| @Test |
| public void testValueSetPersist() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTest.state(NAMESPACE, addr); |
| |
| value.write("Hi"); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getValueUpdatesCount()); |
| TagValue valueUpdate = commitBuilder.getValueUpdates(0); |
| assertEquals(key(NAMESPACE, "value"), valueUpdate.getTag()); |
| assertEquals("Hi", valueUpdate.getValue().getData().toStringUtf8()); |
| assertTrue(valueUpdate.isInitialized()); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testValueClearPersist() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTest.state(NAMESPACE, addr); |
| |
| value.write("Hi"); |
| value.clear(); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(1, commitBuilder.getValueUpdatesCount()); |
| TagValue valueUpdate = commitBuilder.getValueUpdates(0); |
| assertEquals(key(NAMESPACE, "value"), valueUpdate.getTag()); |
| assertEquals(0, valueUpdate.getValue().getData().size()); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testValueNoChangePersist() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| underTest.state(NAMESPACE, addr); |
| |
| Windmill.WorkItemCommitRequest.Builder commitBuilder = |
| Windmill.WorkItemCommitRequest.newBuilder(); |
| underTest.persist(commitBuilder); |
| |
| assertEquals(0, commitBuilder.getValueUpdatesCount()); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testNewValueNoFetch() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTestNewKey.state(NAMESPACE, addr); |
| |
| assertEquals(null, value.read()); |
| |
| // Shouldn't need to read from windmill for this. |
| Mockito.verifyZeroInteractions(mockReader); |
| } |
| |
| @Test |
| public void testCachedValue() throws Exception { |
| StateTag<ValueState<String>> addr = StateTags.value("value", StringUtf8Coder.of()); |
| ValueState<String> value = underTest.state(NAMESPACE, addr); |
| |
| assertEquals(0, cache.getWeight()); |
| |
| value.write("Hi"); |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(132, cache.getWeight()); |
| |
| resetUnderTest(); |
| value = underTest.state(NAMESPACE, addr); |
| assertEquals("Hi", value.read()); |
| value.clear(); |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(130, cache.getWeight()); |
| |
| resetUnderTest(); |
| value = underTest.state(NAMESPACE, addr); |
| assertEquals(null, value.read()); |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testCachedBag() throws Exception { |
| StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of()); |
| BagState<String> bag = underTest.state(NAMESPACE, addr); |
| |
| assertEquals(0, cache.getWeight()); |
| |
| SettableFuture<Iterable<String>> future = SettableFuture.create(); |
| when(mockReader.bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of())) |
| .thenReturn(future); |
| |
| bag.readLater(); |
| |
| assertEquals(0, cache.getWeight()); |
| |
| bag.add("hello"); |
| waitAndSet(future, weightedList("world"), 200); |
| Iterable<String> readResult1 = bag.read(); |
| assertThat(readResult1, Matchers.containsInAnyOrder("hello", "world")); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(140, cache.getWeight()); |
| |
| resetUnderTest(); |
| bag = underTest.state(NAMESPACE, addr); |
| bag.add("goodbye"); |
| |
| // Make sure that cached iterables have not changed after persist+add. |
| assertThat(readResult1, Matchers.containsInAnyOrder("hello", "world")); |
| |
| Iterable<String> readResult2 = bag.read(); |
| assertThat(readResult2, Matchers.containsInAnyOrder("hello", "world", "goodbye")); |
| bag.clear(); |
| // Make sure that cached iterables have not changed after clear. |
| assertThat(readResult2, Matchers.containsInAnyOrder("hello", "world", "goodbye")); |
| bag.add("new"); |
| // Make sure that cached iterables have not changed after clear+add. |
| assertThat(readResult2, Matchers.containsInAnyOrder("hello", "world", "goodbye")); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(133, cache.getWeight()); |
| |
| resetUnderTest(); |
| bag = underTest.state(NAMESPACE, addr); |
| bag.add("new2"); |
| assertThat(bag.read(), Matchers.containsInAnyOrder("new", "new2")); |
| bag.clear(); |
| bag.add("new3"); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(134, cache.getWeight()); |
| |
| resetUnderTest(); |
| bag = underTest.state(NAMESPACE, addr); |
| assertThat(bag.read(), Matchers.containsInAnyOrder("new3")); |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| Mockito.verify(mockReader, times(2)) |
| .bagFuture(key(NAMESPACE, "bag"), STATE_FAMILY, StringUtf8Coder.of()); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| public void testCachedWatermarkHold() throws Exception { |
| StateTag<WatermarkHoldState> addr = |
| StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); |
| WatermarkHoldState hold = underTest.state(NAMESPACE, addr); |
| |
| SettableFuture<Instant> future = SettableFuture.create(); |
| when(mockReader.watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY)).thenReturn(future); |
| |
| assertEquals(0, cache.getWeight()); |
| |
| hold.readLater(); |
| |
| hold.add(new Instant(3000)); |
| waitAndSet(future, new Instant(2000), 200); |
| assertThat(hold.read(), Matchers.equalTo(new Instant(2000))); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(138, cache.getWeight()); |
| |
| resetUnderTest(); |
| hold = underTest.state(NAMESPACE, addr); |
| assertThat(hold.read(), Matchers.equalTo(new Instant(2000))); |
| hold.clear(); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(138, cache.getWeight()); |
| |
| resetUnderTest(); |
| hold = underTest.state(NAMESPACE, addr); |
| assertEquals(null, hold.read()); |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| Mockito.verify(mockReader, times(2)).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| @Test |
| @SuppressWarnings("ArraysAsListPrimitiveArray") |
| public void testCachedCombining() throws Exception { |
| GroupingState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| |
| SettableFuture<Iterable<int[]>> future = SettableFuture.create(); |
| when(mockReader.bagFuture( |
| eq(key(NAMESPACE, "combining")), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any())) |
| .thenReturn(future); |
| |
| assertEquals(0, cache.getWeight()); |
| |
| value.readLater(); |
| |
| value.add(1); |
| waitAndSet(future, Arrays.asList(new int[] {2}), 200); |
| assertThat(value.read(), Matchers.equalTo(3)); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(131, cache.getWeight()); |
| |
| resetUnderTest(); |
| value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| assertThat(value.read(), Matchers.equalTo(3)); |
| value.add(3); |
| assertThat(value.read(), Matchers.equalTo(6)); |
| value.clear(); |
| |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| assertEquals(130, cache.getWeight()); |
| |
| resetUnderTest(); |
| value = underTest.state(NAMESPACE, COMBINING_ADDR); |
| assertThat(value.read(), Matchers.equalTo(0)); |
| underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); |
| |
| // Note that we do expect the third argument - the coder - to be equal to accumCoder, but that |
| // is possibly overspecified and currently trips an issue in the SDK where identical coders are |
| // not #equals(). |
| // |
| // What matters is the number of futures created, hence Windmill RPCs. |
| Mockito.verify(mockReader, times(2)) |
| .bagFuture(eq(key(NAMESPACE, "combining")), eq(STATE_FAMILY), Mockito.<Coder<int[]>>any()); |
| Mockito.verifyNoMoreInteractions(mockReader); |
| } |
| |
| private void disableCompactOnWrite() { |
| WindmillStateInternals.COMPACT_NOW.set(() -> false); |
| } |
| |
| private void forceCompactOnWrite() { |
| WindmillStateInternals.COMPACT_NOW.set(() -> true); |
| } |
| } |