blob: 894f6789f7d3aa6f62f0d08598905deaf253fb80 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.state;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
/** Static methods for working with {@link StateSpec StateSpecs}. */
@Experimental(Kind.STATE)
public class StateSpecs {
private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault();
private StateSpecs() {}
/**
* Create a {@link StateSpec} for a single value of type {@code T}.
*
* <p>This method attempts to infer the accumulator coder automatically.
*
* @see #value(Coder)
*/
public static <T> StateSpec<ValueState<T>> value() {
return new ValueStateSpec<>(null);
}
/**
* Identical to {@link #value()}, but with a coder explicitly supplied.
*
* <p>If automatic coder inference fails, use this method.
*/
public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) {
checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead");
return new ValueStateSpec<>(valueCoder);
}
/**
* Create a {@link StateSpec} for a {@link CombiningState} which uses a {@link CombineFn} to
* automatically merge multiple values of type {@code InputT} into a single resulting {@code
* OutputT}.
*
* <p>This method attempts to infer the accumulator coder automatically.
*
* @see #combining(Coder, CombineFn)
*/
public static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningStateSpec<>(null, combineFn);
}
/**
* <b>For internal use only; no backwards compatibility guarantees</b>
*
* <p>Create a {@link StateSpec} for a {@link CombiningState} which uses a {@link
* CombineFnWithContext} to automatically merge multiple values of type {@code InputT} into a
* single resulting {@code OutputT}.
*
* <p>This method attempts to infer the accumulator coder automatically.
*
* @see #combining(Coder, CombineFnWithContext)
*/
@Internal
public static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new CombiningWithContextStateSpec<>(null, combineFn);
}
/**
* Identical to {@link #combining(CombineFn)}, but with an accumulator coder explicitly supplied.
*
* <p>If automatic coder inference fails, use this method.
*/
public static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
checkArgument(
accumCoder != null,
"accumCoder should not be null. "
+ "Consider using combining(CombineFn<> combineFn) instead.");
return combiningInternal(accumCoder, combineFn);
}
/**
* <b>For internal use only; no backwards compatibility guarantees</b>
*
* <p>Identical to {@link #combining(CombineFnWithContext)}, but with an accumulator coder
* explicitly supplied.
*
* <p>If automatic coder inference fails, use this method.
*/
@Internal
public static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combining(
Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return combiningInternal(accumCoder, combineFn);
}
/**
* Create a {@link StateSpec} for a {@link BagState}, optimized for adding values frequently and
* occasionally retrieving all the values that have been added.
*
* <p>This method attempts to infer the element coder automatically.
*
* @see #bag(Coder)
*/
public static <T> StateSpec<BagState<T>> bag() {
return new BagStateSpec<>(null);
}
/**
* Identical to {@link #bag()}, but with an element coder explicitly supplied.
*
* <p>If automatic coder inference fails, use this method.
*/
public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) {
return new BagStateSpec<>(elemCoder);
}
/**
* Create a {@link StateSpec} for a {@link SetState}, optimized for checking membership.
*
* <p>This method attempts to infer the element coder automatically.
*
* @see #set(Coder)
*/
public static <T> StateSpec<SetState<T>> set() {
return new SetStateSpec<>(null);
}
/**
* Identical to {@link #set()}, but with an element coder explicitly supplied.
*
* <p>If automatic coder inference fails, use this method.
*/
public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) {
return new SetStateSpec<>(elemCoder);
}
/**
* Create a {@link StateSpec} for a {@link SetState}, optimized for key lookups and writes.
*
* <p>This method attempts to infer the key and value coders automatically.
*
* @see #map(Coder, Coder)
*/
public static <K, V> StateSpec<MapState<K, V>> map() {
return new MapStateSpec<>(null, null);
}
/**
* Identical to {@link #map()}, but with key and value coders explicitly supplied.
*
* <p>If automatic coder inference fails, use this method.
*/
public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) {
return new MapStateSpec<>(keyCoder, valueCoder);
}
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Create a state spec for values that use a {@link CombineFn} to automatically merge multiple
* {@code InputT}s into a single {@code OutputT}.
*
* <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should
* only be used to initialize static values.
*/
@Internal
public static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningFromInputInternal(
Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
try {
Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder);
return combiningInternal(accumCoder, combineFn);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to determine accumulator coder for "
+ combineFn.getClass().getSimpleName()
+ " from "
+ inputCoder,
e);
}
}
private static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
return new CombiningStateSpec<>(accumCoder, combineFn);
}
private static <InputT, AccumT, OutputT>
StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal(
Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return new CombiningWithContextStateSpec<>(accumCoder, combineFn);
}
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Create a state spec for a watermark hold.
*/
@Internal
public static StateSpec<WatermarkHoldState> watermarkStateInternal(
TimestampCombiner timestampCombiner) {
return new WatermarkStateSpecInternal(timestampCombiner);
}
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Convert a combining state spec to a bag of accumulators.
*/
@Internal
public static <InputT, AccumT, OutputT> StateSpec<BagState<AccumT>> convertToBagSpecInternal(
StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
if (combiningSpec instanceof CombiningStateSpec) {
// Checked above; conversion to a bag spec depends on the provided spec being one of those
// created via the factory methods in this class.
@SuppressWarnings("unchecked")
CombiningStateSpec<InputT, AccumT, OutputT> typedSpec =
(CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
} else if (combiningSpec instanceof CombiningWithContextStateSpec) {
@SuppressWarnings("unchecked")
CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec =
(CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec;
return typedSpec.asBagSpec();
} else {
throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
}
}
/**
* A specification for a state cell holding a settable value of type {@code T}.
*
* <p>Includes the coder for {@code T}.
*/
private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> {
@Nullable private Coder<T> coder;
private ValueStateSpec(@Nullable Coder<T> coder) {
this.coder = coder;
}
@Override
public ValueState<T> bind(String id, StateBinder visitor) {
return visitor.bindValue(id, this, coder);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatchValue(coder);
}
@SuppressWarnings("unchecked")
@Override
public void offerCoders(Coder[] coders) {
if (this.coder == null && coders[0] != null) {
this.coder = (Coder<T>) coders[0];
}
}
@Override
public void finishSpecifying() {
if (coder == null) {
throw new IllegalStateException(
"Unable to infer a coder for ValueState and no Coder"
+ " was specified. Please set a coder by either invoking"
+ " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the"
+ " Pipeline's CoderRegistry.");
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof ValueStateSpec)) {
return false;
}
ValueStateSpec<?> that = (ValueStateSpec<?>) obj;
return Objects.equals(this.coder, that.coder);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), coder);
}
}
/**
* A specification for a state cell that is combined according to a {@link CombineFn}.
*
* <p>Includes the {@link CombineFn} and the coder for the accumulator type.
*/
private static class CombiningStateSpec<InputT, AccumT, OutputT>
implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
@Nullable private Coder<AccumT> accumCoder;
private final CombineFn<InputT, AccumT, OutputT> combineFn;
private CombiningStateSpec(
@Nullable Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
this.combineFn = combineFn;
this.accumCoder = accumCoder;
}
@Override
public CombiningState<InputT, AccumT, OutputT> bind(String id, StateBinder visitor) {
return visitor.bindCombining(id, this, accumCoder, combineFn);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatchCombining(combineFn, accumCoder);
}
@SuppressWarnings("unchecked")
@Override
public void offerCoders(Coder[] coders) {
if (this.accumCoder == null && coders[1] != null) {
this.accumCoder = (Coder<AccumT>) coders[1];
}
}
@Override
public void finishSpecifying() {
if (accumCoder == null) {
throw new IllegalStateException(
"Unable to infer a coder for"
+ " CombiningState and no Coder was specified."
+ " Please set a coder by either invoking"
+ " StateSpecs.combining(Coder<AccumT> accumCoder,"
+ " CombineFn<InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof CombiningStateSpec)) {
return false;
}
CombiningStateSpec<?, ?, ?> that = (CombiningStateSpec<?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), accumCoder);
}
private StateSpec<BagState<AccumT>> asBagSpec() {
return new BagStateSpec<>(accumCoder);
}
}
/**
* A specification for a state cell that is combined according to a {@link CombineFnWithContext}.
*
* <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
*/
private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
implements StateSpec<CombiningState<InputT, AccumT, OutputT>> {
@Nullable private Coder<AccumT> accumCoder;
private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
private CombiningWithContextStateSpec(
@Nullable Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
this.combineFn = combineFn;
this.accumCoder = accumCoder;
}
@Override
public CombiningState<InputT, AccumT, OutputT> bind(String id, StateBinder visitor) {
return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
throw new UnsupportedOperationException(
String.format(
"%s is for internal use only and does not support case dispatch",
getClass().getSimpleName()));
}
@SuppressWarnings("unchecked")
@Override
public void offerCoders(Coder[] coders) {
if (this.accumCoder == null && coders[2] != null) {
this.accumCoder = (Coder<AccumT>) coders[2];
}
}
@Override
public void finishSpecifying() {
if (accumCoder == null) {
throw new IllegalStateException(
"Unable to infer a coder for"
+ " CombiningWithContextState and no Coder was specified."
+ " Please set a coder by either invoking"
+ " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder,"
+ " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)"
+ " or by registering the coder in the Pipeline's CoderRegistry.");
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof CombiningWithContextStateSpec)) {
return false;
}
CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj;
return Objects.equals(this.accumCoder, that.accumCoder);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), accumCoder);
}
private StateSpec<BagState<AccumT>> asBagSpec() {
return new BagStateSpec<>(accumCoder);
}
}
/**
* A specification for a state cell supporting for bag-like access patterns (frequent additions,
* occasional reads of all the values).
*
* <p>Includes the coder for the element type {@code T}
*/
private static class BagStateSpec<T> implements StateSpec<BagState<T>> {
@Nullable private Coder<T> elemCoder;
private BagStateSpec(@Nullable Coder<T> elemCoder) {
this.elemCoder = elemCoder;
}
@Override
public BagState<T> bind(String id, StateBinder visitor) {
return visitor.bindBag(id, this, elemCoder);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatchBag(elemCoder);
}
@SuppressWarnings("unchecked")
@Override
public void offerCoders(Coder[] coders) {
if (this.elemCoder == null && coders[0] != null) {
this.elemCoder = (Coder<T>) coders[0];
}
}
@Override
public void finishSpecifying() {
if (elemCoder == null) {
throw new IllegalStateException(
"Unable to infer a coder for BagState and no Coder"
+ " was specified. Please set a coder by either invoking"
+ " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the"
+ " Pipeline's CoderRegistry.");
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof BagStateSpec)) {
return false;
}
BagStateSpec<?> that = (BagStateSpec<?>) obj;
return Objects.equals(this.elemCoder, that.elemCoder);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), elemCoder);
}
}
private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> {
@Nullable private Coder<K> keyCoder;
@Nullable private Coder<V> valueCoder;
private MapStateSpec(@Nullable Coder<K> keyCoder, @Nullable Coder<V> valueCoder) {
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}
@Override
public MapState<K, V> bind(String id, StateBinder visitor) {
return visitor.bindMap(id, this, keyCoder, valueCoder);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatchMap(keyCoder, valueCoder);
}
@SuppressWarnings("unchecked")
@Override
public void offerCoders(Coder[] coders) {
if (this.keyCoder == null && coders[0] != null) {
this.keyCoder = (Coder<K>) coders[0];
}
if (this.valueCoder == null && coders[1] != null) {
this.valueCoder = (Coder<V>) coders[1];
}
}
@Override
public void finishSpecifying() {
if (keyCoder == null || valueCoder == null) {
throw new IllegalStateException(
"Unable to infer a coder for MapState and no Coder"
+ " was specified. Please set a coder by either invoking"
+ " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the"
+ " coder in the Pipeline's CoderRegistry.");
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof MapStateSpec)) {
return false;
}
MapStateSpec<?, ?> that = (MapStateSpec<?, ?>) obj;
return Objects.equals(this.keyCoder, that.keyCoder)
&& Objects.equals(this.valueCoder, that.valueCoder);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), keyCoder, valueCoder);
}
}
/**
* A specification for a state cell supporting for set-like access patterns.
*
* <p>Includes the coder for the element type {@code T}
*/
private static class SetStateSpec<T> implements StateSpec<SetState<T>> {
@Nullable private Coder<T> elemCoder;
private SetStateSpec(@Nullable Coder<T> elemCoder) {
this.elemCoder = elemCoder;
}
@Override
public SetState<T> bind(String id, StateBinder visitor) {
return visitor.bindSet(id, this, elemCoder);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatchSet(elemCoder);
}
@SuppressWarnings("unchecked")
@Override
public void offerCoders(Coder[] coders) {
if (this.elemCoder == null && coders[0] != null) {
this.elemCoder = (Coder<T>) coders[0];
}
}
@Override
public void finishSpecifying() {
if (elemCoder == null) {
throw new IllegalStateException(
"Unable to infer a coder for SetState and no Coder"
+ " was specified. Please set a coder by either invoking"
+ " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the"
+ " Pipeline's CoderRegistry.");
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof SetStateSpec)) {
return false;
}
SetStateSpec<?> that = (SetStateSpec<?>) obj;
return Objects.equals(this.elemCoder, that.elemCoder);
}
@Override
public int hashCode() {
return Objects.hash(getClass(), elemCoder);
}
}
/**
* A specification for a state cell tracking a combined watermark hold.
*
* <p>Includes the {@link TimestampCombiner} according to which the output times are combined.
*/
private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> {
/**
* When multiple output times are added to hold the watermark, this determines how they are
* combined, and also the behavior when merging windows. Does not contribute to equality/hash
* since we have at most one watermark hold spec per computation.
*/
private final TimestampCombiner timestampCombiner;
private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
this.timestampCombiner = timestampCombiner;
}
@Override
public WatermarkHoldState bind(String id, StateBinder visitor) {
return visitor.bindWatermark(id, this, timestampCombiner);
}
@Override
public <ResultT> ResultT match(Cases<ResultT> cases) {
throw new UnsupportedOperationException(
String.format(
"%s is for internal use only and does not support case dispatch",
getClass().getSimpleName()));
}
@Override
public void offerCoders(Coder[] coders) {}
@Override
public void finishSpecifying() {
// Currently an empty implementation as there are no coders to validate.
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
// All instance of WatermarkHoldState are considered equal
return obj instanceof WatermarkStateSpecInternal;
}
@Override
public int hashCode() {
return Objects.hash(getClass());
}
}
}