blob: 3bd983446338838d950dde69124c62cdf372e367 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.samza.context.TaskContext;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.joda.time.Instant;
/** {@link StateInternals} that uses Samza local {@link KeyValueStore} to manage state. */
public class SamzaStoreStateInternals<K> implements StateInternals {
static final String BEAM_STORE = "beamStore";
private static ThreadLocal<SoftReference<ByteArrayOutputStream>> threadLocalBaos =
new ThreadLocal<>();
// the stores include both beamStore for system states as well as stores for user state
private final Map<String, KeyValueStore<byte[], byte[]>> stores;
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
private final String stageId;
private SamzaStoreStateInternals(
Map<String, KeyValueStore<byte[], byte[]>> stores,
@Nullable K key,
@Nullable byte[] keyBytes,
String stageId,
int batchGetSize) {
this.stores = stores;
this.key = key;
this.keyBytes = keyBytes;
this.batchGetSize = batchGetSize;
this.stageId = stageId;
}
@SuppressWarnings("unchecked")
static KeyValueStore<byte[], byte[]> getBeamStore(TaskContext context) {
return (KeyValueStore<byte[], byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
}
static Factory createStateInternalFactory(
String id,
Coder<?> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
DoFnSignature signature) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
final Coder stateKeyCoder;
if (keyCoder != null) {
signature
.stateDeclarations()
.keySet()
.forEach(
stateId ->
stores.put(stateId, (KeyValueStore<byte[], byte[]>) context.getStore(stateId)));
stateKeyCoder = keyCoder;
} else {
stateKeyCoder = VoidCoder.of();
}
return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
}
@Override
public K getKey() {
return key;
}
@Override
public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag) {
return state(stateNamespace, stateTag, StateContexts.nullContext());
}
@Override
public <V extends State> V state(
StateNamespace namespace, StateTag<V> address, StateContext<?> stateContext) {
return address.bind(
new StateTag.StateBinder() {
@Override
public <T> ValueState<T> bindValue(StateTag<ValueState<T>> spec, Coder<T> coder) {
return new SamzaValueState<>(namespace, address, coder);
}
@Override
public <T> BagState<T> bindBag(StateTag<BagState<T>> spec, Coder<T> elemCoder) {
return new SamzaBagState<>(namespace, address, elemCoder);
}
@Override
public <T> SetState<T> bindSet(StateTag<SetState<T>> spec, Coder<T> elemCoder) {
return new SamzaSetStateImpl<>(namespace, address, elemCoder);
}
@Override
public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
StateTag<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
return new SamzaMapStateImpl<>(namespace, address, mapKeyCoder, mapValueCoder);
}
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
return new SamzaAccumulatorCombiningState<>(namespace, address, accumCoder, combineFn);
}
@Override
public <InputT, AccumT, OutputT>
CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
StateTag<CombiningState<InputT, AccumT, OutputT>> spec,
Coder<AccumT> accumCoder,
CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
throw new UnsupportedOperationException(
String.format("%s is not supported", CombiningState.class.getSimpleName()));
}
@Override
public WatermarkHoldState bindWatermark(
StateTag<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
return new SamzaWatermarkHoldState(namespace, address, timestampCombiner);
}
});
}
/** Reuse the ByteArrayOutputStream buffer. */
private static ByteArrayOutputStream getThreadLocalBaos() {
final SoftReference<ByteArrayOutputStream> refBaos = threadLocalBaos.get();
ByteArrayOutputStream baos = refBaos == null ? null : refBaos.get();
if (baos == null) {
baos = new ByteArrayOutputStream();
threadLocalBaos.set(new SoftReference<>(baos));
}
baos.reset();
return baos;
}
/** Factory class to create {@link SamzaStoreStateInternals}. */
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<byte[], byte[]>> stores;
private final Coder<K> keyCoder;
private final int batchGetSize;
public Factory(
String stageId,
Map<String, KeyValueStore<byte[], byte[]>> stores,
Coder<K> keyCoder,
int batchGetSize) {
this.stageId = stageId;
this.stores = stores;
this.keyCoder = keyCoder;
this.batchGetSize = batchGetSize;
}
@Override
public StateInternals stateInternalsForKey(@Nullable K key) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);
try {
if (key != null) {
keyCoder.encode(key, baos);
}
final byte[] keyBytes = baos.toByteArray();
baos.reset();
dos.write(keyBytes.length);
dos.write(keyBytes);
} catch (IOException e) {
throw new RuntimeException("Cannot encode key for state store", e);
}
return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), stageId, batchGetSize);
}
}
/** An internal State interface that holds underlying KeyValueIterators. */
interface KeyValueIteratorState {
void closeIterators();
}
private abstract class AbstractSamzaState<T> {
private final Coder<T> coder;
private final byte[] encodedStoreKey;
private final String namespace;
protected final KeyValueStore<byte[], byte[]> store;
protected AbstractSamzaState(
StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
this.coder = coder;
this.namespace = namespace.stringKey();
final KeyValueStore<byte[], byte[]> userStore = stores.get(address.getId());
this.store = userStore != null ? userStore : stores.get(BEAM_STORE);
final ByteArrayOutputStream baos = getThreadLocalBaos();
try (DataOutputStream dos = new DataOutputStream(baos)) {
dos.write(keyBytes);
dos.writeUTF(namespace.stringKey());
if (userStore == null) {
// for system state, we need to differentiate based on the following:
dos.writeUTF(stageId);
dos.writeUTF(address.getId());
}
} catch (IOException e) {
throw new RuntimeException(
"Could not encode full address for state: " + address.getId(), e);
}
this.encodedStoreKey = baos.toByteArray();
}
protected void clearInternal() {
store.delete(getEncodedStoreKey());
}
protected void writeInternal(T value) {
store.put(getEncodedStoreKey(), encodeValue(value));
}
protected T readInternal() {
final byte[] valueBytes = store.get(getEncodedStoreKey());
return decodeValue(valueBytes);
}
protected ReadableState<Boolean> isEmptyInternal() {
return new ReadableState<Boolean>() {
@Override
public Boolean read() {
return store.get(getEncodedStoreKey()) == null;
}
@Override
public ReadableState<Boolean> readLater() {
return this;
}
};
}
protected byte[] getEncodedStoreKey() {
return encodedStoreKey;
}
protected byte[] encodeValue(T value) {
final ByteArrayOutputStream baos = getThreadLocalBaos();
try {
coder.encode(value, baos);
} catch (IOException e) {
throw new RuntimeException("Could not encode state value: " + value, e);
}
return baos.toByteArray();
}
protected T decodeValue(byte[] valueBytes) {
if (valueBytes != null) {
try {
return coder.decode(new ByteArrayInputStream(valueBytes));
} catch (IOException e) {
throw new RuntimeException("Could not decode state", e);
}
}
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
@SuppressWarnings("unchecked")
final AbstractSamzaState<?> that = (AbstractSamzaState<?>) o;
return Arrays.equals(encodedStoreKey, that.encodedStoreKey);
}
@Override
public int hashCode() {
int result = namespace.hashCode();
result = 31 * result + Arrays.hashCode(encodedStoreKey);
return result;
}
}
private class SamzaValueState<T> extends AbstractSamzaState<T> implements ValueState<T> {
private SamzaValueState(
StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
super(namespace, address, coder);
}
@Override
public void write(T input) {
writeInternal(input);
}
@Override
public T read() {
return readInternal();
}
@Override
public ValueState<T> readLater() {
return this;
}
@Override
public void clear() {
clearInternal();
}
}
private class SamzaBagState<T> extends AbstractSamzaState<T> implements BagState<T> {
private SamzaBagState(
StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
super(namespace, address, coder);
}
@Override
public void add(T value) {
synchronized (store) {
final int size = getSize();
final byte[] encodedKey = encodeKey(size);
store.put(encodedKey, encodeValue(value));
store.put(getEncodedStoreKey(), Ints.toByteArray(size + 1));
}
}
@Override
public ReadableState<Boolean> isEmpty() {
synchronized (store) {
return isEmptyInternal();
}
}
@Override
@Nonnull
public List<T> read() {
synchronized (store) {
final int size = getSize();
if (size == 0) {
return Collections.emptyList();
}
final List<T> values = new ArrayList<>(size);
final List<byte[]> keys = new ArrayList<>(size);
int start = 0;
while (start < size) {
final int end = Math.min(size, start + batchGetSize);
for (int i = start; i < end; i++) {
keys.add(encodeKey(i));
}
store.getAll(keys).values().forEach(value -> values.add(decodeValue(value)));
start += batchGetSize;
keys.clear();
}
return values;
}
}
@Override
public BagState<T> readLater() {
return this;
}
@Override
public void clear() {
synchronized (store) {
final int size = getSize();
if (size != 0) {
final List<byte[]> keys = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
keys.add(encodeKey(i));
}
store.deleteAll(keys);
store.delete(getEncodedStoreKey());
}
}
}
private int getSize() {
final byte[] sizeBytes = store.get(getEncodedStoreKey());
return sizeBytes == null ? 0 : Ints.fromByteArray(sizeBytes);
}
private byte[] encodeKey(int size) {
final ByteArrayOutputStream baos = getThreadLocalBaos();
try (DataOutputStream dos = new DataOutputStream(baos)) {
dos.write(getEncodedStoreKey());
dos.writeInt(size);
return baos.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private class SamzaSetStateImpl<T> implements SamzaSetState<T>, KeyValueIteratorState {
private final SamzaMapStateImpl<T, Boolean> mapState;
private SamzaSetStateImpl(
StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
mapState = new SamzaMapStateImpl<>(namespace, address, coder, BooleanCoder.of());
}
@Override
public ReadableState<Boolean> contains(T t) {
return mapState.get(t);
}
@Override
@Nullable
public ReadableState<Boolean> addIfAbsent(T t) {
return mapState.putIfAbsent(t, true);
}
@Override
public void remove(T t) {
mapState.remove(t);
}
@Override
public void add(T value) {
mapState.put(value, true);
}
@Override
public ReadableState<Boolean> isEmpty() {
return new ReadableState<Boolean>() {
@Override
public Boolean read() {
return Iterables.isEmpty(mapState.entries().read());
}
@Override
public ReadableState<Boolean> readLater() {
return this;
}
};
}
@Override
public Iterable<T> read() {
return mapState.keys().read();
}
@Override
public SetState<T> readLater() {
return this;
}
@Override
public void clear() {
mapState.clear();
}
@Override
public ReadableState<Iterator<T>> readIterator() {
final Iterator<Map.Entry<T, Boolean>> iter = mapState.readIterator().read();
return new ReadableState<Iterator<T>>() {
@Nullable
@Override
public Iterator<T> read() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public T next() {
return iter.next().getKey();
}
};
}
@Override
public ReadableState<Iterator<T>> readLater() {
return this;
}
};
}
@Override
public void closeIterators() {
mapState.closeIterators();
}
}
private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
private final Coder<KeyT> keyCoder;
private final int storeKeySize;
private final List<KeyValueIterator<byte[], byte[]>> openIterators =
Collections.synchronizedList(new ArrayList<>());
private int maxKeySize;
protected SamzaMapStateImpl(
StateNamespace namespace,
StateTag<? extends State> address,
Coder<KeyT> keyCoder,
Coder<ValueT> valueCoder) {
super(namespace, address, valueCoder);
this.keyCoder = keyCoder;
this.storeKeySize = getEncodedStoreKey().length;
// initial max key size is around 100k, so we can restore timer keys
this.maxKeySize = this.storeKeySize + 100_000;
}
@Override
public void put(KeyT key, ValueT value) {
final byte[] encodedKey = encodeKey(key);
maxKeySize = Math.max(maxKeySize, encodedKey.length);
store.put(encodedKey, encodeValue(value));
}
@Override
@Nullable
public ReadableState<ValueT> putIfAbsent(KeyT key, ValueT value) {
final byte[] encodedKey = encodeKey(key);
final ValueT current = decodeValue(store.get(encodedKey));
if (current == null) {
put(key, value);
}
return current == null ? null : ReadableStates.immediate(current);
}
@Override
public void remove(KeyT key) {
store.delete(encodeKey(key));
}
@Override
public ReadableState<ValueT> get(KeyT key) {
ValueT value = decodeValue(store.get(encodeKey(key)));
return ReadableStates.immediate(value);
}
@Override
public ReadableState<Iterable<KeyT>> keys() {
return new ReadableState<Iterable<KeyT>>() {
@Override
public Iterable<KeyT> read() {
return createIterable(entry -> decodeKey(entry.getKey()));
}
@Override
public ReadableState<Iterable<KeyT>> readLater() {
return this;
}
};
}
@Override
public ReadableState<Iterable<ValueT>> values() {
return new ReadableState<Iterable<ValueT>>() {
@Override
public Iterable<ValueT> read() {
return createIterable(entry -> decodeValue(entry.getValue()));
}
@Override
public ReadableState<Iterable<ValueT>> readLater() {
return this;
}
};
}
@Override
public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>() {
@Override
public Iterable<Map.Entry<KeyT, ValueT>> read() {
return createIterable(
entry ->
new AbstractMap.SimpleEntry<>(
decodeKey(entry.getKey()), decodeValue(entry.getValue())));
}
@Override
public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
return this;
}
};
}
@Override
public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
final byte[] maxKey = createMaxKey();
final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
openIterators.add(kvIter);
return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>() {
@Nullable
@Override
public Iterator<Map.Entry<KeyT, ValueT>> read() {
return new Iterator<Map.Entry<KeyT, ValueT>>() {
@Override
public boolean hasNext() {
boolean hasNext = kvIter.hasNext();
if (!hasNext) {
kvIter.close();
openIterators.remove(kvIter);
}
return hasNext;
}
@Override
public Map.Entry<KeyT, ValueT> next() {
Entry<byte[], byte[]> entry = kvIter.next();
return new AbstractMap.SimpleEntry<>(
decodeKey(entry.getKey()), decodeValue(entry.getValue()));
}
};
}
@Override
public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readLater() {
return this;
}
};
}
/**
* Since we are not able to track the instances of the iterators created here and close them
* properly, we need to load the content into memory.
*/
private <OutputT> Iterable<OutputT> createIterable(
SerializableFunction<org.apache.samza.storage.kv.Entry<byte[], byte[]>, OutputT> fn) {
final byte[] maxKey = createMaxKey();
final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
final List<Entry<byte[], byte[]>> iterable = ImmutableList.copyOf(kvIter);
kvIter.close();
return new Iterable<OutputT>() {
@Override
public Iterator<OutputT> iterator() {
final Iterator<Entry<byte[], byte[]>> iter = iterable.iterator();
return new Iterator<OutputT>() {
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public OutputT next() {
return fn.apply(iter.next());
}
};
}
};
}
@Override
public void clear() {
final byte[] maxKey = createMaxKey();
final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
while (kvIter.hasNext()) {
store.delete(kvIter.next().getKey());
}
kvIter.close();
}
private byte[] encodeKey(KeyT key) {
try {
final ByteArrayOutputStream baos = getThreadLocalBaos();
baos.write(getEncodedStoreKey());
keyCoder.encode(key, baos);
return baos.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private KeyT decodeKey(byte[] keyBytes) {
try {
final byte[] realKey = Arrays.copyOfRange(keyBytes, storeKeySize, keyBytes.length);
return keyCoder.decode(new ByteArrayInputStream(realKey));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private byte[] createMaxKey() {
byte[] maxKey = new byte[maxKeySize];
Arrays.fill(maxKey, (byte) 0xff);
final byte[] encodedKey = getEncodedStoreKey();
System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
return maxKey;
}
@Override
public void closeIterators() {
openIterators.forEach(KeyValueIterator::close);
openIterators.clear();
}
}
private class SamzaAccumulatorCombiningState<InT, AccumT, OutT> extends AbstractSamzaState<AccumT>
implements CombiningState<InT, AccumT, OutT> {
private final Combine.CombineFn<InT, AccumT, OutT> combineFn;
protected SamzaAccumulatorCombiningState(
StateNamespace namespace,
StateTag<? extends State> address,
Coder<AccumT> coder,
Combine.CombineFn<InT, AccumT, OutT> combineFn) {
super(namespace, address, coder);
this.combineFn = combineFn;
}
@Override
public void clear() {
clearInternal();
}
@Override
public void add(InT value) {
final AccumT accum = getAccum();
final AccumT current = combineFn.addInput(accum, value);
writeInternal(current);
}
@Override
public ReadableState<Boolean> isEmpty() {
return isEmptyInternal();
}
@Override
public AccumT getAccum() {
final AccumT accum = readInternal();
return accum != null ? accum : combineFn.createAccumulator();
}
@Override
public void addAccum(AccumT accum) {
final AccumT currentAccum = getAccum();
final AccumT mergedAccum = mergeAccumulators(Arrays.asList(currentAccum, accum));
writeInternal(mergedAccum);
}
@Override
public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
return combineFn.mergeAccumulators(accumulators);
}
@Override
public CombiningState<InT, AccumT, OutT> readLater() {
return this;
}
@Override
@Nonnull
public OutT read() {
AccumT accum = getAccum();
OutT output = combineFn.extractOutput(accum);
if (combineFn instanceof UpdatingCombineFn) {
AccumT updatedAccum =
((UpdatingCombineFn<InT, AccumT, OutT>) combineFn).updateAfterFiring(accum);
writeInternal(updatedAccum);
}
return output;
}
}
private class SamzaWatermarkHoldState extends AbstractSamzaState<Instant>
implements WatermarkHoldState {
private final TimestampCombiner timestampCombiner;
public <V extends State> SamzaWatermarkHoldState(
StateNamespace namespace, StateTag<V> address, TimestampCombiner timestampCombiner) {
super(namespace, address, InstantCoder.of());
this.timestampCombiner = timestampCombiner;
}
@Override
public void add(Instant value) {
final Instant currentValue = readInternal();
final Instant combinedValue =
currentValue == null ? value : timestampCombiner.combine(currentValue, value);
if (!combinedValue.equals(currentValue)) {
writeInternal(combinedValue);
}
}
@Override
public ReadableState<Boolean> isEmpty() {
return isEmptyInternal();
}
@Override
public Instant read() {
return readInternal();
}
@Override
public TimestampCombiner getTimestampCombiner() {
return this.timestampCombiner;
}
@Override
public WatermarkHoldState readLater() {
return this;
}
@Override
public void clear() {
clearInternal();
}
}
}