blob: 8dd84c17f9584f65bb6b52deb659a1cb6a24950a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Equivalence;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
/** Static utility methods for creating {@link StateTag} instances. */
@Experimental(Kind.STATE)
public class StateTags {
/**
 * Coder registry obtained from {@link CoderRegistry#createDefault()}.
 *
 * <p>NOTE(review): nothing in this class appears to read this field; confirm whether it is still
 * needed.
 */
private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault();

/**
 * An {@link Equivalence} over {@link StateTag}s that looks only at {@link StateTag#getId()}: two
 * tags are equivalent when their ids are equal, and a tag hashes to the hash code of its id.
 */
public static final Equivalence<StateTag> ID_EQUIVALENCE =
    new Equivalence<StateTag>() {
      @Override
      protected boolean doEquivalent(StateTag left, StateTag right) {
        String leftId = left.getId();
        String rightId = right.getId();
        return leftId.equals(rightId);
      }

      @Override
      protected int doHash(StateTag tag) {
        String tagId = tag.getId();
        return tagId.hashCode();
      }
    };
/**
 * Adapts a tag-based {@link StateTag.StateBinder} to the spec-based {@link StateBinder} interface.
 *
 * <p>Each {@code bind*} method of the returned binder builds a {@link StateTag} from the supplied
 * id and spec via {@link #tagForSpec} and forwards that tag, together with the remaining
 * arguments, to the corresponding method of {@code binder}.
 *
 * @param binder the tag-based binder that receives every forwarded call
 * @return a spec-based binder that delegates to {@code binder}
 * @deprecated for migration purposes only
 */
@Deprecated
private static StateBinder adaptTagBinder(final StateTag.StateBinder binder) {
  return new StateBinder() {
    @Override
    public <T> ValueState<T> bindValue(
        String stateId, StateSpec<ValueState<T>> stateSpec, Coder<T> valueCoder) {
      StateTag<ValueState<T>> tag = tagForSpec(stateId, stateSpec);
      return binder.bindValue(tag, valueCoder);
    }

    @Override
    public <T> BagState<T> bindBag(
        String stateId, StateSpec<BagState<T>> stateSpec, Coder<T> elementCoder) {
      StateTag<BagState<T>> tag = tagForSpec(stateId, stateSpec);
      return binder.bindBag(tag, elementCoder);
    }

    @Override
    public <T> SetState<T> bindSet(
        String stateId, StateSpec<SetState<T>> stateSpec, Coder<T> elementCoder) {
      StateTag<SetState<T>> tag = tagForSpec(stateId, stateSpec);
      return binder.bindSet(tag, elementCoder);
    }

    @Override
    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
        String stateId,
        StateSpec<MapState<KeyT, ValueT>> stateSpec,
        Coder<KeyT> keyCoder,
        Coder<ValueT> valueCoder) {
      StateTag<MapState<KeyT, ValueT>> tag = tagForSpec(stateId, stateSpec);
      return binder.bindMap(tag, keyCoder, valueCoder);
    }

    @Override
    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
        String stateId,
        StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec,
        Coder<AccumT> accumulatorCoder,
        CombineFn<InputT, AccumT, OutputT> fn) {
      StateTag<CombiningState<InputT, AccumT, OutputT>> tag = tagForSpec(stateId, stateSpec);
      return binder.bindCombiningValue(tag, accumulatorCoder, fn);
    }

    @Override
    public <InputT, AccumT, OutputT>
        CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
            String stateId,
            StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec,
            Coder<AccumT> accumulatorCoder,
            CombineFnWithContext<InputT, AccumT, OutputT> fn) {
      StateTag<CombiningState<InputT, AccumT, OutputT>> tag = tagForSpec(stateId, stateSpec);
      return binder.bindCombiningValueWithContext(tag, accumulatorCoder, fn);
    }

    @Override
    public WatermarkHoldState bindWatermark(
        String stateId, StateSpec<WatermarkHoldState> stateSpec, TimestampCombiner combiner) {
      StateTag<WatermarkHoldState> tag = tagForSpec(stateId, stateSpec);
      return binder.bindWatermark(tag, combiner);
    }
  };
}
/**
 * The namespace a state id belongs to. Each kind carries a one-character prefix that {@code
 * StructuredId#appendTo} writes ahead of the raw id.
 */
private enum StateKind {
  /** Kind assigned by {@link #makeSystemTagInternal}; prefix {@code 's'}. */
  SYSTEM('s'),
  /** Kind used when a {@code StructuredId} is built from a raw id alone; prefix {@code 'u'}. */
  USER('u');

  private final char prefix;

  StateKind(char idPrefix) {
    prefix = idPrefix;
  }
}
/** Not instantiable: this class only exposes static members. */
private StateTags() {
  // Intentionally empty.
}
/**
 * Marks a tag that can be re-issued under a chosen {@link StateKind}; {@link
 * #makeSystemTagInternal} requires its argument to implement this interface.
 */
private interface SystemStateTag<StateT extends State> {
  /** Returns a tag for the same state type under {@code targetKind}. */
  StateTag<StateT> asKind(StateKind targetKind);
}
/**
 * Builds a {@link StateTag} that pairs the given id with the given spec.
 *
 * @param id the raw identifier of the state; it is stored under the user kind
 * @param spec the specification the tag carries
 * @return a new tag wrapping {@code id} and {@code spec}
 */
public static <StateT extends State> StateTag<StateT> tagForSpec(
    String id, StateSpec<StateT> spec) {
  StructuredId userId = new StructuredId(id);
  return new SimpleStateTag<>(userId, spec);
}
/**
 * Builds a tag for a single value of type {@code T}.
 *
 * @param id the raw identifier of the state
 * @param valueCoder the coder handed to {@link StateSpecs#value}
 * @return a new tag whose spec is {@code StateSpecs.value(valueCoder)}
 */
public static <T> StateTag<ValueState<T>> value(String id, Coder<T> valueCoder) {
  StructuredId userId = new StructuredId(id);
  StateSpec<ValueState<T>> valueSpec = StateSpecs.value(valueCoder);
  return new SimpleStateTag<>(userId, valueSpec);
}
/**
 * Builds a tag for state that uses a {@link CombineFn} to automatically merge multiple {@code
 * InputT}s into a single {@code OutputT}.
 *
 * @param id the raw identifier of the state
 * @param accumCoder the coder for the accumulator type
 * @param combineFn the function that performs the merging
 * @return a new tag whose spec is {@code StateSpecs.combining(accumCoder, combineFn)}
 */
public static <InputT, AccumT, OutputT>
    StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValue(
        String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
  StructuredId userId = new StructuredId(id);
  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec =
      StateSpecs.combining(accumCoder, combineFn);
  return new SimpleStateTag<>(userId, combiningSpec);
}
/**
 * Builds a tag for state that uses a {@link CombineFnWithContext} to automatically merge multiple
 * {@code InputT}s into a single {@code OutputT}.
 *
 * @param id the raw identifier of the state
 * @param accumCoder the coder for the accumulator type
 * @param combineFn the context-aware function that performs the merging
 * @return a new tag whose spec is {@code StateSpecs.combining(accumCoder, combineFn)}
 */
public static <InputT, AccumT, OutputT>
    StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValueWithContext(
        String id,
        Coder<AccumT> accumCoder,
        CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
  StructuredId userId = new StructuredId(id);
  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec =
      StateSpecs.combining(accumCoder, combineFn);
  return new SimpleStateTag<>(userId, combiningSpec);
}
/**
 * Builds a tag for state that uses a {@link CombineFn} to automatically merge multiple {@code
 * InputT}s into a single {@code OutputT}.
 *
 * <p>The {@code Coder<AccumT>} is determined from the given {@code Coder<InputT>}; this method
 * should only be used to initialize static values.
 *
 * @param id the raw identifier of the state
 * @param inputCoder the coder for the input type, from which the accumulator coder is derived
 * @param combineFn the function that performs the merging
 * @return a new tag whose spec comes from {@link StateSpecs#combiningFromInputInternal}
 */
public static <InputT, AccumT, OutputT>
    StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValueFromInputInternal(
        String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
  StructuredId userId = new StructuredId(id);
  StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec =
      StateSpecs.combiningFromInputInternal(inputCoder, combineFn);
  return new SimpleStateTag<>(userId, combiningSpec);
}
/**
 * Builds a tag for bag state, which is optimized for adding values frequently and occasionally
 * retrieving all the values that have been added.
 *
 * @param id the raw identifier of the state
 * @param elemCoder the coder for the elements of the bag
 * @return a new tag whose spec is {@code StateSpecs.bag(elemCoder)}
 */
public static <T> StateTag<BagState<T>> bag(String id, Coder<T> elemCoder) {
  StructuredId userId = new StructuredId(id);
  StateSpec<BagState<T>> bagSpec = StateSpecs.bag(elemCoder);
  return new SimpleStateTag<>(userId, bagSpec);
}
/**
 * Builds a tag for state that supports {@link java.util.Set}-like access patterns.
 *
 * @param id the raw identifier of the state
 * @param elemCoder the coder for the elements of the set
 * @return a new tag whose spec is {@code StateSpecs.set(elemCoder)}
 */
public static <T> StateTag<SetState<T>> set(String id, Coder<T> elemCoder) {
  StructuredId userId = new StructuredId(id);
  StateSpec<SetState<T>> setSpec = StateSpecs.set(elemCoder);
  return new SimpleStateTag<>(userId, setSpec);
}
/**
 * Builds a tag for state that supports {@link java.util.Map}-like access patterns.
 *
 * @param id the raw identifier of the state
 * @param keyCoder the coder for the map keys
 * @param valueCoder the coder for the map values
 * @return a new tag whose spec is {@code StateSpecs.map(keyCoder, valueCoder)}
 */
public static <K, V> StateTag<MapState<K, V>> map(
    String id, Coder<K> keyCoder, Coder<V> valueCoder) {
  StructuredId userId = new StructuredId(id);
  StateSpec<MapState<K, V>> mapSpec = StateSpecs.map(keyCoder, valueCoder);
  return new SimpleStateTag<>(userId, mapSpec);
}
/**
 * Builds a tag for state that holds the watermark.
 *
 * <p>The type parameter {@code W} is used neither by the parameters nor by the return type.
 *
 * @param id the raw identifier of the state
 * @param timestampCombiner the combiner handed to {@link StateSpecs#watermarkStateInternal}
 * @return a new tag whose spec is {@code StateSpecs.watermarkStateInternal(timestampCombiner)}
 */
public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkStateInternal(
    String id, TimestampCombiner timestampCombiner) {
  StructuredId userId = new StructuredId(id);
  StateSpec<WatermarkHoldState> holdSpec = StateSpecs.watermarkStateInternal(timestampCombiner);
  return new SimpleStateTag<>(userId, holdSpec);
}
/**
 * Converts an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
 * collide with any user tags.
 *
 * @param tag the tag to convert; it must implement the private {@code SystemStateTag} interface
 * @return the result of asking {@code tag} for its {@code SYSTEM}-kind counterpart
 * @throws IllegalArgumentException if {@code tag} does not implement {@code SystemStateTag}
 */
public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
    StateTag<StateT> tag) {
  if (tag instanceof SystemStateTag) {
    // The instanceof check above makes this cast safe.
    @SuppressWarnings("unchecked")
    SystemStateTag<StateT> convertible = (SystemStateTag<StateT>) tag;
    return convertible.asKind(StateKind.SYSTEM);
  }
  throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
}
/**
 * Derives a bag tag from a combining tag.
 *
 * <p>The new tag reuses the id reported by {@code combiningTag.getId()} and takes its spec from
 * {@link StateSpecs#convertToBagSpecInternal}. The id is wrapped in a fresh structured id, so the
 * result is always of the user kind.
 *
 * @param combiningTag the combining tag to derive the bag tag from
 * @return a new bag tag over the accumulator type, with the same raw id
 */
public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> convertToBagTagInternal(
    StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
  StructuredId bagId = new StructuredId(combiningTag.getId());
  StateSpec<BagState<AccumT>> bagSpec =
      StateSpecs.convertToBagSpecInternal(combiningTag.getSpec());
  return new SimpleStateTag<>(bagId, bagSpec);
}
/**
 * A state id made of a {@link StateKind} and a raw string id. Instances are immutable; equality
 * and hashing take both parts into account.
 */
private static class StructuredId implements Serializable {
  private final StateKind kind;
  private final String rawId;

  /** Creates an id of the {@link StateKind#USER} kind. */
  private StructuredId(String rawId) {
    this(StateKind.USER, rawId);
  }

  private StructuredId(StateKind kind, String rawId) {
    this.kind = kind;
    this.rawId = rawId;
  }

  /** Returns a new id with the same raw id under {@code newKind}. */
  public StructuredId asKind(StateKind newKind) {
    return new StructuredId(newKind, rawId);
  }

  /** Appends the kind's prefix character and then the raw id to {@code sb}. */
  public void appendTo(Appendable sb) throws IOException {
    Appendable afterPrefix = sb.append(kind.prefix);
    afterPrefix.append(rawId);
  }

  /** Returns the raw id, without any kind prefix. */
  public String getRawId() {
    return rawId;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(getClass())
        .add("id", rawId)
        .add("kind", kind)
        .toString();
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj instanceof StructuredId) {
      StructuredId other = (StructuredId) obj;
      // Enum constants are singletons, so reference comparison matches equals().
      return kind == other.kind && Objects.equals(rawId, other.rawId);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hash(kind, rawId);
  }
}
/**
 * A basic {@link StateTag} implementation that manages the structured ids.
 *
 * <p>Two tags are equal when both their raw id (as returned by {@link #getId()}) and their {@link
 * StateSpec} are equal. The kind of the structured id is not part of the comparison, because
 * {@link #getId()} exposes only the raw id. Callers that want to compare tags by id alone can use
 * {@link StateTags#ID_EQUIVALENCE}.
 */
private static class SimpleStateTag<StateT extends State>
    implements StateTag<StateT>, SystemStateTag<StateT> {
  private final StateSpec<StateT> spec;
  private final StructuredId id;

  /**
   * @param id the structured id (kind plus raw id) of this tag
   * @param spec the specification of the state this tag addresses
   */
  public SimpleStateTag(StructuredId id, StateSpec<StateT> spec) {
    this.id = id;
    this.spec = spec;
  }

  /**
   * Binds this tag's spec under its raw id, adapting the tag-based {@code binder} to the
   * spec-based binder interface.
   *
   * @deprecated use {@link StateSpec#bind} method via {@link #getSpec} for now.
   */
  @Override
  @Deprecated
  public StateT bind(StateTag.StateBinder binder) {
    return spec.bind(this.id.getRawId(), adaptTagBinder(binder));
  }

  /** Returns the raw id, without the kind prefix. */
  @Override
  public String getId() {
    return id.getRawId();
  }

  @Override
  public StateSpec<StateT> getSpec() {
    return spec;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(getClass()).add("id", id).toString();
  }

  /** Appends the kind prefix followed by the raw id to {@code sb}. */
  @Override
  public void appendTo(Appendable sb) throws IOException {
    id.appendTo(sb);
  }

  /** Returns a new tag with the same raw id and spec under the given {@code kind}. */
  @Override
  public StateTag<StateT> asKind(StateKind kind) {
    return new SimpleStateTag<>(id.asKind(kind), spec);
  }

  /**
   * Compares raw id and spec. Comparing the id alone would make tags with the same id but
   * different specs (for example a combining tag and the bag tag derived from it) equal.
   */
  @Override
  public boolean equals(Object other) {
    if (other == this) {
      return true;
    }
    if (!(other instanceof SimpleStateTag)) {
      return false;
    }
    SimpleStateTag<?> otherTag = (SimpleStateTag<?>) other;
    return Objects.equals(this.getId(), otherTag.getId())
        && Objects.equals(this.getSpec(), otherTag.getSpec());
  }

  /** Hashes the same components that {@link #equals} compares, plus the runtime class. */
  @Override
  public int hashCode() {
    return Objects.hash(getClass(), this.getId(), this.getSpec());
  }
}
}