blob: 10bf0d3ba6ffd577efc0fd8b49e0fe2bd8362ebd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
/** Tests for {@link StateInternals}. */
public abstract class StateInternalsTest {
// Interval window built from Instant(0) and Instant(10).
// NOTE(review): not referenced by any test visible in this file - confirm whether it is still needed.
private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
// Three distinct namespaces; the tests use them to check per-namespace separation and merging.
private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
// Tag for a String-valued ValueState encoded with StringUtf8Coder.
private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
// Tag for a combining state that sums integers via Sum.ofIntegers(); the accumulator type is int[].
private static final StateTag<CombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
StateTags.combiningValueFromInputInternal("sumInteger", VarIntCoder.of(), Sum.ofIntegers());
// Tag for a combining state driven by the context-aware SummingContextFn declared in this class.
private static final StateTag<CombiningState<Integer, Integer, Integer>>
SUM_INTEGER_CONTEXT_ADDR =
StateTags.combiningValueWithContext(
"sumIntegerWithContext", VarIntCoder.of(), new SummingContextFn());
// Tag for a bag of Strings.
private static final StateTag<BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
// Tag for a set of Strings.
private static final StateTag<SetState<String>> STRING_SET_ADDR =
StateTags.set("stringSet", StringUtf8Coder.of());
// Tag for a map from String keys to Integer values.
private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
// The three watermark hold tags below share the id "watermark" and differ only in the
// TimestampCombiner they are created with.
private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
// Two distinct tags because they have non-equals() coders: both use the id "badStringBag", but
// each gets its own StringCoderWithIdentityEquality instance.
private static final StateTag<BagState<String>> STRING_BAG_ADDR1 =
StateTags.bag("badStringBag", new StringCoderWithIdentityEquality());
private static final StateTag<BagState<String>> STRING_BAG_ADDR2 =
StateTags.bag("badStringBag", new StringCoderWithIdentityEquality());
// The StateInternals instance exercised by every test; assigned in setUp().
private StateInternals underTest;
/**
 * Stores the {@link StateInternals} returned by {@link #createStateInternals()} in
 * {@code underTest} before each test runs.
 */
@Before
public void setUp() {
  underTest = createStateInternals();
}
/**
 * Supplies the {@link StateInternals} implementation that the tests in this class exercise.
 * Called from {@link #setUp()}.
 */
protected abstract StateInternals createStateInternals();
/**
 * Checks {@link ValueState}: the state initially reads {@code null}, each write replaces the
 * stored value, and {@code clear()} brings it back to {@code null}.
 */
@Test
public void testValue() throws Exception {
  ValueState<String> state = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);

  // Looking the tag up again in the same namespace must give an equal state object, while a
  // different namespace must not.
  ValueState<String> sameNamespace = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
  assertThat(sameNamespace, equalTo(state));
  ValueState<String> otherNamespace = underTest.state(NAMESPACE_2, STRING_VALUE_ADDR);
  assertThat(otherNamespace, not(equalTo(state)));

  assertThat(state.read(), Matchers.nullValue());

  // Every write overwrites whatever was stored before it.
  for (String written : Arrays.asList("hello", "world")) {
    state.write(written);
    assertThat(state.read(), equalTo(written));
  }

  state.clear();
  assertThat(state.read(), Matchers.nullValue());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(state));
}
/**
 * Checks {@link BagState}: the bag starts empty, accumulates added elements, and is empty again
 * after {@code clear()}.
 */
@Test
public void testBag() throws Exception {
  BagState<String> bag = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  BagState<String> sameNamespace = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
  assertThat(bag, equalTo(sameNamespace));
  BagState<String> otherNamespace = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
  assertThat(bag, not(equalTo(otherNamespace)));

  assertThat(bag.read(), Matchers.emptyIterable());

  bag.add("hello");
  assertThat(bag.read(), containsInAnyOrder("hello"));

  bag.add("world");
  assertThat(bag.read(), containsInAnyOrder("hello", "world"));

  bag.clear();
  assertThat(bag.read(), Matchers.emptyIterable());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), equalTo(bag));
}
/**
 * Checks that an {@code isEmpty()} handle taken from a {@link BagState} reports the bag's current
 * emptiness each time it is read, including after later adds and clears.
 */
@Test
public void testBagIsEmpty() throws Exception {
  BagState<String> bag = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
  assertThat(bag.isEmpty().read(), equalTo(true));

  // Take the handle first, mutate afterwards, and only then read it.
  ReadableState<Boolean> emptiness = bag.isEmpty();

  bag.add("hello");
  assertThat(emptiness.read(), equalTo(false));

  bag.clear();
  assertThat(emptiness.read(), equalTo(true));
}
/**
 * Merges two bags with {@code StateMerging.mergeBags} where the result bag is also one of the
 * sources: the result must contain every element and the other source must be left empty.
 */
@Test
public void testMergeBagIntoSource() throws Exception {
  BagState<String> target = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
  BagState<String> other = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);

  target.add("Hello");
  other.add("World");
  target.add("!");

  StateMerging.mergeBags(Arrays.asList(target, other), target);

  // The target now holds the contents of both bags; the non-target source was drained.
  assertThat(target.read(), containsInAnyOrder("Hello", "World", "!"));
  assertThat(other.read(), Matchers.emptyIterable());
}
/**
 * Merges bags with {@code StateMerging.mergeBags} into a third, initially untouched bag (which is
 * also listed among the sources): the result must contain every element and both populated
 * sources must be left empty.
 */
@Test
public void testMergeBagIntoNewNamespace() throws Exception {
  BagState<String> first = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
  BagState<String> second = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
  BagState<String> merged = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);

  first.add("Hello");
  second.add("World");
  first.add("!");

  StateMerging.mergeBags(Arrays.asList(first, second, merged), merged);

  // The result holds the contents of both populated bags, which are now empty themselves.
  assertThat(merged.read(), containsInAnyOrder("Hello", "World", "!"));
  assertThat(first.read(), Matchers.emptyIterable());
  assertThat(second.read(), Matchers.emptyIterable());
}
/**
 * Checks {@link SetState}: add (including duplicates), {@code addIfAbsent}, remove, contains,
 * {@code readLater}, and clear.
 */
@Test
public void testSet() throws Exception {
  SetState<String> set = underTest.state(NAMESPACE_1, STRING_SET_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  SetState<String> sameNamespace = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
  assertThat(set, equalTo(sameNamespace));
  SetState<String> otherNamespace = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
  assertThat(set, not(equalTo(otherNamespace)));

  // Starts out empty.
  assertThat(set.read(), Matchers.emptyIterable());
  assertFalse(set.contains("A").read());

  // Adding "A" twice keeps a single copy; addIfAbsent reads false for an element already present.
  for (String element : Arrays.asList("A", "B", "A")) {
    set.add(element);
  }
  assertFalse(set.addIfAbsent("B").read());
  assertThat(set.read(), containsInAnyOrder("A", "B"));

  // Removing a present element drops it; removing an absent one leaves the set unchanged.
  set.remove("A");
  assertThat(set.read(), containsInAnyOrder("B"));
  set.remove("C");
  assertThat(set.read(), containsInAnyOrder("B"));

  // Membership queries.
  assertFalse(set.contains("A").read());
  assertTrue(set.contains("B").read());

  set.add("C");
  set.add("D");

  // readLater() must hand back something that reads the same contents.
  assertThat(set.readLater().read(), containsInAnyOrder("B", "C", "D"));
  SetState<String> prefetched = set.readLater();
  assertThat(prefetched.read(), hasItems("C", "D"));
  assertFalse(prefetched.contains("A").read());

  // Clearing empties the set.
  set.clear();
  assertThat(set.read(), Matchers.emptyIterable());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), equalTo(set));
}
/**
 * Checks that an {@code isEmpty()} handle taken from a {@link SetState} reports the set's current
 * emptiness each time it is read, including after later adds and clears.
 */
@Test
public void testSetIsEmpty() throws Exception {
  SetState<String> set = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
  assertThat(set.isEmpty().read(), equalTo(true));

  // Take the handle first, mutate afterwards, and only then read it.
  ReadableState<Boolean> emptiness = set.isEmpty();

  set.add("hello");
  assertThat(emptiness.read(), equalTo(false));

  set.clear();
  assertThat(emptiness.read(), equalTo(true));
}
/**
 * Merges two sets with {@code StateMerging.mergeSets} where the result set is also one of the
 * sources: the result must contain the union (the shared element once) and the other source must
 * be left empty.
 */
@Test
public void testMergeSetIntoSource() throws Exception {
  SetState<String> target = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
  SetState<String> other = underTest.state(NAMESPACE_2, STRING_SET_ADDR);

  // "Hello" is present in both sets on purpose.
  target.add("Hello");
  other.add("Hello");
  other.add("World");
  target.add("!");

  StateMerging.mergeSets(Arrays.asList(target, other), target);

  // The target now holds the union; the non-target source was drained.
  assertThat(target.read(), containsInAnyOrder("Hello", "World", "!"));
  assertThat(other.read(), Matchers.emptyIterable());
}
/**
 * Merges sets with {@code StateMerging.mergeSets} into a third, initially untouched set (which is
 * also listed among the sources): the result must contain the union and both populated sources
 * must be left empty.
 */
@Test
public void testMergeSetIntoNewNamespace() throws Exception {
  SetState<String> first = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
  SetState<String> second = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
  SetState<String> merged = underTest.state(NAMESPACE_3, STRING_SET_ADDR);

  // "Hello" is present in both populated sets on purpose.
  first.add("Hello");
  second.add("Hello");
  second.add("World");
  first.add("!");

  StateMerging.mergeSets(Arrays.asList(first, second, merged), merged);

  // The result holds the union of both populated sets, which are now empty themselves.
  assertThat(merged.read(), containsInAnyOrder("Hello", "World", "!"));
  assertThat(first.read(), Matchers.emptyIterable());
  assertThat(second.read(), Matchers.emptyIterable());
}
/**
 * Small {@link Map.Entry} implementation used by {@code testMap} to spell out expected entries.
 *
 * <p>{@code equals} accepts any {@link Map.Entry} whose key and value are equal to this entry's,
 * and {@code hashCode} is the XOR of the key and value hash codes.
 */
private static class MapEntry<K, V> implements Map.Entry<K, V> {
  private final K key;
  private V value;

  private MapEntry(K key, V value) {
    this.key = key;
    this.value = value;
  }

  /** Creates an entry holding the given key and value. */
  static <K, V> Map.Entry<K, V> of(K k, V v) {
    return new MapEntry<>(k, v);
  }

  @Override
  public final K getKey() {
    return key;
  }

  @Override
  public final V getValue() {
    return value;
  }

  /** Replaces the value and returns the one that was stored before. */
  @Override
  public final V setValue(V newValue) {
    V previous = value;
    value = newValue;
    return previous;
  }

  @Override
  public final boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof Map.Entry)) {
      return false;
    }
    Map.Entry<?, ?> other = (Map.Entry<?, ?>) o;
    return Objects.equals(key, other.getKey()) && Objects.equals(value, other.getValue());
  }

  @Override
  public final int hashCode() {
    return Objects.hashCode(key) ^ Objects.hashCode(value);
  }

  @Override
  public final String toString() {
    return key + "=" + value;
  }
}
/**
 * Checks {@link MapState}: put (including overwrite), {@code putIfAbsent}, remove, get, the
 * keys/values/entries views, {@code readLater}, and clear.
 */
@Test
public void testMap() throws Exception {
  MapState<String, Integer> map = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  MapState<String, Integer> sameNamespace = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
  assertThat(map, equalTo(sameNamespace));
  MapState<String, Integer> otherNamespace = underTest.state(NAMESPACE_2, STRING_MAP_ADDR);
  assertThat(map, not(equalTo(otherNamespace)));

  // Expected entries that several assertions below share.
  Map.Entry<String, Integer> entryB = MapEntry.of("B", 2);
  Map.Entry<String, Integer> entryD = MapEntry.of("D", 4);
  Map.Entry<String, Integer> entryE = MapEntry.of("E", 5);

  // put: a second put on "A" overwrites; putIfAbsent on an existing key reads the existing value
  // and leaves the mapping untouched.
  assertThat(map.entries().read(), Matchers.emptyIterable());
  map.put("A", 1);
  map.put("B", 2);
  map.put("A", 11);
  assertThat(map.putIfAbsent("B", 22).read(), equalTo(2));
  assertThat(map.entries().read(), containsInAnyOrder(MapEntry.of("A", 11), entryB));

  // remove: a present key disappears; removing an absent key changes nothing.
  map.remove("A");
  assertThat(map.entries().read(), containsInAnyOrder(entryB));
  map.remove("C");
  assertThat(map.entries().read(), containsInAnyOrder(entryB));

  // get: null for a missing key, the stored value otherwise.
  assertNull(map.get("A").read());
  assertThat(map.get("B").read(), equalTo(2));
  map.put("C", 3);
  map.put("D", 4);
  assertThat(map.get("C").read(), equalTo(3));

  // keys / values / entries after one more put and one more remove.
  map.put("E", 5);
  map.remove("C");
  assertThat(map.keys().read(), containsInAnyOrder("B", "D", "E"));
  assertThat(map.values().read(), containsInAnyOrder(2, 4, 5));
  assertThat(map.entries().read(), containsInAnyOrder(entryB, entryD, entryE));

  // readLater() must hand back something that reads the same results.
  assertThat(map.get("B").readLater().read(), equalTo(2));
  assertNull(map.get("A").readLater().read());
  assertThat(map.entries().readLater().read(), containsInAnyOrder(entryB, entryD, entryE));

  // clear empties the map.
  map.clear();
  assertThat(map.entries().read(), Matchers.emptyIterable());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), equalTo(map));
}
/**
 * Checks the integer-summing combining state: it reads 0 initially, reads the running sum after
 * each add, and reads 0 again after {@code clear()}.
 */
@Test
public void testCombiningValue() throws Exception {
  GroupingState<Integer, Integer> sum = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  CombiningState<Integer, int[], Integer> sameNamespace =
      underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
  assertEquals(sum, sameNamespace);
  CombiningState<Integer, int[], Integer> otherNamespace =
      underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
  assertFalse(sum.equals(otherNamespace));

  assertThat(sum.read(), equalTo(0));

  sum.add(2);
  assertThat(sum.read(), equalTo(2));

  sum.add(3);
  assertThat(sum.read(), equalTo(5));

  sum.clear();
  assertThat(sum.read(), equalTo(0));

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), equalTo(sum));
}
/**
 * Checks that an {@code isEmpty()} handle taken from the summing combining state reports the
 * state's current emptiness each time it is read, including after later adds and clears.
 */
@Test
public void testCombiningIsEmpty() throws Exception {
  GroupingState<Integer, Integer> sum = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
  assertThat(sum.isEmpty().read(), equalTo(true));

  // Take the handle first, mutate afterwards, and only then read it.
  ReadableState<Boolean> emptiness = sum.isEmpty();

  sum.add(5);
  assertThat(emptiness.read(), equalTo(false));

  sum.clear();
  assertThat(emptiness.read(), equalTo(true));
}
/**
 * Merges two summing states with {@code StateMerging.mergeCombiningValues} where the result is
 * also one of the sources: the result must read the total and the other source must read 0.
 */
@Test
public void testMergeCombiningValueIntoSource() throws Exception {
  CombiningState<Integer, int[], Integer> target = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
  CombiningState<Integer, int[], Integer> other = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);

  // An accumulator is available even before anything has been added.
  assertThat(target.getAccum(), notNullValue());
  assertThat(other.getAccum(), notNullValue());

  target.add(5);
  other.add(10);
  target.add(6);

  // Before merging, each state only reflects its own inputs.
  assertThat(target.read(), equalTo(11));
  assertThat(other.read(), equalTo(10));

  StateMerging.mergeCombiningValues(Arrays.asList(target, other), target);

  // After merging, the target carries the combined sum and the other source is reset.
  assertThat(target.read(), equalTo(21));
  assertThat(other.read(), equalTo(0));
}
/**
 * Merges two summing states with {@code StateMerging.mergeCombiningValues} into a third state
 * that is not among the sources: the result must read the total and both sources must read 0.
 */
@Test
public void testMergeCombiningValueIntoNewNamespace() throws Exception {
  CombiningState<Integer, int[], Integer> first = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
  CombiningState<Integer, int[], Integer> second = underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
  CombiningState<Integer, int[], Integer> merged = underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);

  // An accumulator is available even before anything has been added.
  assertThat(first.getAccum(), notNullValue());
  assertThat(second.getAccum(), notNullValue());
  assertThat(merged.getAccum(), notNullValue());

  first.add(5);
  second.add(10);
  first.add(6);

  StateMerging.mergeCombiningValues(Arrays.asList(first, second), merged);

  // After merging, both sources are reset and the result carries the combined sum.
  assertThat(first.read(), equalTo(0));
  assertThat(second.read(), equalTo(0));
  assertThat(merged.read(), equalTo(21));
}
/**
 * Same scenario as merging summing states into one of the sources, but using the tag built from
 * the context-aware {@code SummingContextFn}.
 */
@Test
public void testMergeCombiningWithContextValueIntoSource() throws Exception {
  CombiningState<Integer, Integer, Integer> target =
      underTest.state(NAMESPACE_1, SUM_INTEGER_CONTEXT_ADDR);
  CombiningState<Integer, Integer, Integer> other =
      underTest.state(NAMESPACE_2, SUM_INTEGER_CONTEXT_ADDR);

  // An accumulator is available even before anything has been added.
  assertThat(target.getAccum(), notNullValue());
  assertThat(other.getAccum(), notNullValue());

  target.add(5);
  other.add(10);
  target.add(6);

  // Before merging, each state only reflects its own inputs.
  assertThat(target.read(), equalTo(11));
  assertThat(other.read(), equalTo(10));

  StateMerging.mergeCombiningValues(Arrays.asList(target, other), target);

  // After merging, the target carries the combined sum and the other source is reset.
  assertThat(target.read(), equalTo(21));
  assertThat(other.read(), equalTo(0));
}
/**
 * Same scenario as merging summing states into a third state, but using the tag built from the
 * context-aware {@code SummingContextFn}.
 */
@Test
public void testMergeCombiningWithContextValueIntoNewNamespace() throws Exception {
  CombiningState<Integer, Integer, Integer> first =
      underTest.state(NAMESPACE_1, SUM_INTEGER_CONTEXT_ADDR);
  CombiningState<Integer, Integer, Integer> second =
      underTest.state(NAMESPACE_2, SUM_INTEGER_CONTEXT_ADDR);
  CombiningState<Integer, Integer, Integer> merged =
      underTest.state(NAMESPACE_3, SUM_INTEGER_CONTEXT_ADDR);

  // An accumulator is available even before anything has been added.
  assertThat(first.getAccum(), notNullValue());
  assertThat(second.getAccum(), notNullValue());
  assertThat(merged.getAccum(), notNullValue());

  first.add(5);
  second.add(10);
  first.add(6);

  StateMerging.mergeCombiningValues(Arrays.asList(first, second), merged);

  // After merging, both sources are reset and the result carries the combined sum.
  assertThat(first.read(), equalTo(0));
  assertThat(second.read(), equalTo(0));
  assertThat(merged.read(), equalTo(21));
}
/**
 * Checks a watermark hold created with {@code TimestampCombiner.EARLIEST}: after each add the
 * hold reads the smallest instant added so far, and it reads {@code null} when empty.
 */
@Test
public void testWatermarkEarliestState() throws Exception {
  WatermarkHoldState hold = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  WatermarkHoldState sameNamespace = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
  assertEquals(hold, sameNamespace);
  WatermarkHoldState otherNamespace = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
  assertFalse(hold.equals(otherNamespace));

  assertThat(hold.read(), Matchers.nullValue());

  // Each row is {millis to add, millis expected from the following read}.
  long[][] steps = {{2000, 2000}, {3000, 2000}, {1000, 1000}};
  for (long[] step : steps) {
    hold.add(new Instant(step[0]));
    assertThat(hold.read(), equalTo(new Instant(step[1])));
  }

  hold.clear();
  assertThat(hold.read(), Matchers.nullValue());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), equalTo(hold));
}
/**
 * Checks a watermark hold created with {@code TimestampCombiner.LATEST}: after each add the hold
 * reads the largest instant added so far, and it reads {@code null} when empty.
 */
@Test
public void testWatermarkLatestState() throws Exception {
  WatermarkHoldState hold = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  WatermarkHoldState sameNamespace = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
  assertEquals(hold, sameNamespace);
  WatermarkHoldState otherNamespace = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
  assertFalse(hold.equals(otherNamespace));

  assertThat(hold.read(), Matchers.nullValue());

  // Each row is {millis to add, millis expected from the following read}.
  long[][] steps = {{2000, 2000}, {3000, 3000}, {1000, 3000}};
  for (long[] step : steps) {
    hold.add(new Instant(step[0]));
    assertThat(hold.read(), equalTo(new Instant(step[1])));
  }

  hold.clear();
  assertThat(hold.read(), Matchers.nullValue());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), equalTo(hold));
}
/**
 * Checks a watermark hold created with {@code TimestampCombiner.END_OF_WINDOW}: a single added
 * instant is read back, and the hold reads {@code null} when empty.
 */
@Test
public void testWatermarkEndOfWindowState() throws Exception {
  WatermarkHoldState hold = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);

  // Equal state object within a namespace, unequal across namespaces.
  WatermarkHoldState sameNamespace = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
  assertEquals(hold, sameNamespace);
  WatermarkHoldState otherNamespace = underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR);
  assertFalse(hold.equals(otherNamespace));

  assertThat(hold.read(), Matchers.nullValue());

  Instant added = new Instant(2000);
  hold.add(added);
  assertThat(hold.read(), equalTo(new Instant(2000)));

  hold.clear();
  assertThat(hold.read(), Matchers.nullValue());

  // After clearing, the lookup still resolves to an equal state object.
  assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), equalTo(hold));
}
/**
 * Checks that an {@code isEmpty()} handle taken from a watermark hold reports the hold's current
 * emptiness each time it is read, including after later adds and clears.
 */
@Test
public void testWatermarkStateIsEmpty() throws Exception {
  WatermarkHoldState hold = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
  assertThat(hold.isEmpty().read(), equalTo(true));

  // Take the handle first, mutate afterwards, and only then read it.
  ReadableState<Boolean> emptiness = hold.isEmpty();

  hold.add(new Instant(1000));
  assertThat(emptiness.read(), equalTo(false));

  hold.clear();
  assertThat(emptiness.read(), equalTo(true));
}
/**
 * Checks the readable results returned by {@link SetState}: a {@code contains} handle obtained
 * before an element is added is expected to still read {@code false} afterwards, and
 * {@code addIfAbsent} inserts the element without its result having to be read.
 */
@Test
public void testSetReadable() throws Exception {
  SetState<String> set = underTest.state(NAMESPACE_1, STRING_SET_ADDR);

  // contains: the handle is taken before "A" is added and read only afterwards.
  ReadableState<Boolean> containsA = set.contains("A");
  set.add("A");
  assertFalse(containsA.read());

  // addIfAbsent: the returned handle is deliberately ignored; "B" must be present anyway.
  set.addIfAbsent("B");
  assertTrue(set.contains("B").read());
}
/**
 * Checks the readable results returned by {@link MapState}: keys/values/entries handles obtained
 * before a put must observe the new entry when read afterwards, a {@code get} handle obtained
 * before its key is put is expected to still read {@code null}, and {@code putIfAbsent} stores
 * the value without its result having to be read.
 */
@Test
public void testMapReadable() throws Exception {
  MapState<String, Integer> map = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);

  // Iterable views: taken while the map is empty, read after a put. They must reflect the put.
  ReadableState<Iterable<String>> keyView = map.keys();
  ReadableState<Iterable<Integer>> valueView = map.values();
  ReadableState<Iterable<Map.Entry<String, Integer>>> entryView = map.entries();
  map.put("A", 1);
  assertFalse(Iterables.isEmpty(keyView.read()));
  assertFalse(Iterables.isEmpty(valueView.read()));
  assertFalse(Iterables.isEmpty(entryView.read()));

  // get: the handle is taken before "B" is put and read only afterwards.
  ReadableState<Integer> lookupB = map.get("B");
  map.put("B", 2);
  assertNull(lookupB.read());

  // putIfAbsent: the returned handle is deliberately ignored; "C" must be stored anyway.
  map.putIfAbsent("C", 3);
  assertThat(map.get("C").read(), equalTo(3));
}
/**
 * Checks that two bag tags with the same id but coders that are not {@code equals()} to each
 * other still address the same state: an element added through one tag must be readable through
 * the other in the same namespace.
 */
@Test
public void testBagWithBadCoderEquality() throws Exception {
  // Sanity check: two instances of the coder really are unequal. This models a user who fails to
  // override equals() or inherit from CustomCoder for StructuredCoder.
  StringCoderWithIdentityEquality oneCoder = new StringCoderWithIdentityEquality();
  StringCoderWithIdentityEquality anotherCoder = new StringCoderWithIdentityEquality();
  assertThat(oneCoder, not(equalTo(anotherCoder)));

  BagState<String> viaFirstTag = underTest.state(NAMESPACE_1, STRING_BAG_ADDR1);
  viaFirstTag.add("hello");

  BagState<String> viaSecondTag = underTest.state(NAMESPACE_1, STRING_BAG_ADDR2);
  assertThat(viaSecondTag.read(), containsInAnyOrder("hello"));
}
/**
 * A {@link Coder} for strings that delegates encoding and decoding to {@link StringUtf8Coder} but
 * compares by reference identity, so two separately constructed instances are never equal.
 *
 * <p>Used by {@code testBagWithBadCoderEquality} to model a user-defined coder that does not
 * provide a value-based {@code equals()}.
 */
private static class StringCoderWithIdentityEquality extends Coder<String> {
  private final StringUtf8Coder realCoder = StringUtf8Coder.of();

  @Override
  public void encode(String value, OutputStream outStream) throws CoderException, IOException {
    realCoder.encode(value, outStream);
  }

  @Override
  public String decode(InputStream inStream) throws CoderException, IOException {
    return realCoder.decode(inStream);
  }

  /**
   * Returns an empty list: this coder has no component coders. An empty list is returned instead
   * of {@code null} so that callers can iterate the result without a null check.
   */
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Collections.emptyList();
  }

  /** No-op: performs no check and never throws. */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {}

  /** Identity comparison only; this is the deliberately "bad" equality the test relies on. */
  @Override
  public boolean equals(Object other) {
    return other == this;
  }

  /** Delegates to the superclass implementation. */
  @Override
  public int hashCode() {
    return super.hashCode();
  }
}
/**
 * Context-aware combine function that sums integers. The context argument is only passed along
 * to {@code createAccumulator}; it does not influence the result.
 */
private static class SummingContextFn
    extends CombineWithContext.CombineFnWithContext<Integer, Integer, Integer> {

  /** Starts every accumulation at zero. */
  @Override
  public Integer createAccumulator(CombineWithContext.Context c) {
    return 0;
  }

  /** Adds one input to the running total. */
  @Override
  public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
    int updated = accumulator + input;
    return updated;
  }

  /** Adds up all partial totals, starting from a fresh accumulator. */
  @Override
  public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
    int total = createAccumulator(c);
    for (int partial : accumulators) {
      total += partial;
    }
    return total;
  }

  /** The accumulator already is the output. */
  @Override
  public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
    return accumulator;
  }
}
}